9

Por favor, vea esta pregunta para obtener información básica:Es compatible con CorrelationManager.LogicalOperationStack Parallel.For, Tareas, hilos, etc

How do Tasks in the Task Parallel Library affect ActivityID?

Esa pregunta se refiere a cómo las tareas afectan Trace.CorrelationManager.ActivityId. @Greg Samson respondió su propia pregunta con un programa de prueba que muestra que ActivityId es confiable en el contexto de Tareas. El programa de prueba establece un ActivityId al comienzo del delegado de tareas, duerme para simular el trabajo, luego verifica el ActivityId al final para asegurarse de que tiene el mismo valor (es decir, que no ha sido modificado por otro hilo). El programa se ejecuta con éxito.

Mientras investigaba otras opciones de "contexto" para operaciones de subprocesamiento, tareas y operaciones paralelas (en última instancia, para proporcionar un mejor contexto para el registro), me encontré con un problema extraño con Trace.CorrelationManager.LogicalOperationStack (de todos modos me resultaba extraño). He copiado mi "respuesta" a su pregunta a continuación.

Creo que describe adecuadamente el problema con el que me encontré (Trace.CorrelationManager.LogicalOperationStack aparentemente se corrompe, o algo así, cuando se usa en el contexto de Parallel.For, pero solo si el Parallel.For mismo está encerrado en una operación lógica).

Aquí están mis preguntas:

  1. caso Trace.CorrelationManager.LogicalOperationStack ser utilizable con Parallel.For? De ser así, ¿debería marcar la diferencia si una operación lógica ya está en efecto con el Paralelo. ¿Para cuándo se inició?

  2. ¿Existe una forma "correcta" de utilizar LogicalOperationStack con Parallel.For? ¿Podría codificar este programa de muestra de manera diferente para que "funcione"? Por "trabajos", quiero decir que LogicalOperationStack siempre tiene el número esperado de entradas y las entradas mismas son las entradas esperadas.

He hecho algunas pruebas adicionales utilizando hilos e hilos ThreadPool, pero habría que volver atrás y vuelva a intentar esas pruebas para ver si me encontré con problemas similares.

Diré que parece que los subprocesos Tarea/Paralelo y los subprocesos ThreadPool HACEN "heredar" los valores Trace.CorrelationManager.ActivityId y Trace.CorrelationManager.LogicalOperationStack del subproceso padre. Esto es esperado ya que el CorrelationManager almacena estos valores usando el método LogicalSetData de CallContext (a diferencia de SetData).

Una vez más, por favor refiérase a esta pregunta para obtener el contexto original de la "respuesta" que he publicado a continuación:

How do Tasks in the Task Parallel Library affect ActivityID?

Véase también esta pregunta similar (que hasta ahora no ha sido contestada) en paralelo foro Extensiones de Microsoft:

http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

[Iniciar pegar]

Perdone que publique esto como una respuesta, ya que no es realmente una respuesta a su pregunta, sin embargo, está relacionado con su pregunta, ya que trata con el comportamiento de CorrelationManager y los hilos/tareas/etc.He estado buscando utilizando los métodos LogicalOperationStack de CorrelationManager (y StartLogicalOperation/StopLogicalOperation) para proporcionar contexto adicional en escenarios de subprocesamiento múltiple.

Tomé su ejemplo y lo modifiqué ligeramente para agregar la capacidad de realizar trabajos en paralelo usando Parallel.For. Además, uso StartLogicalOperation/StopLogicalOperation para colocar (internamente) DoLongRunningWork. Conceptualmente, DoLongRunningWork hace algo como esto cada vez que se ejecuta:

DoLongRunningWork 
    StartLogicalOperation 
    Thread.Sleep(3000) 
    StopLogicalOperation 

He encontrado que si añado estas operaciones lógicas a su código (más o menos como es), todos los operatins lógicas permanecen en sincronía (siempre el número esperado de operaciones en la pila y los valores de las operaciones en la pila son siempre los esperados).

En algunas de mis propias pruebas descubrí que este no era siempre el caso. La pila de operaciones lógicas se estaba "corrompiendo". La mejor explicación que podría surgir es que la "fusión" de la información de CallContext en el contexto de subproceso "principal" cuando el subproceso "secundario" sale causaba que la "vieja" información de contexto de subproceso secundario (operación lógica) " heredado "por otro hilo de niño hermano.

El problema también podría estar relacionado con el hecho de que Parallel.For aparentemente usa el hilo principal (al menos en el código de ejemplo, como está escrito) como uno de los "hilos de trabajo" (o lo que sea que se les llame en el dominio paralelo). Cada vez que se ejecuta DoLongRunningWork, se inicia una nueva operación lógica (al principio) y se detiene (al final) (es decir, se inserta en LogicalOperationStack y se abre de nuevo). Si el hilo principal ya tiene una operación lógica en efecto y si DoLongRunningWork se ejecuta EN EL HILO PRINCIPAL, entonces se inicia una nueva operación lógica, por lo que el LogicalOperationStack del hilo principal ahora tiene DOS operaciones. Cualquier ejecución posterior de DoLongRunningWork (siempre que esta "iteración" de DoLongRunningWork se esté ejecutando en el hilo principal) heredará (al parecer) el LogicalOperationStack del hilo principal (que ahora tiene dos operaciones en él, en lugar de solo una operación esperada).

Me llevó mucho tiempo descubrir por qué el comportamiento de LogicalOperationStack era diferente en mi ejemplo que en mi versión modificada de su ejemplo. Finalmente vi que en mi código había puesto entre corchetes todo el programa en una operación lógica, mientras que en mi versión modificada de su programa de prueba no lo hice. La implicación es que en mi programa de prueba, cada vez que se realizaba mi "trabajo" (análogo a DoLongRunningWork), ya había una operación lógica en vigor. En mi versión modificada de su programa de prueba, no había puesto entre corchetes todo el programa en una operación lógica.

Entonces, cuando modifiqué su programa de prueba para poner el programa completo entre corchetes en una operación lógica Y si estoy usando Parallel.For, encontré exactamente el mismo problema.

Usando el modelo conceptual anterior, este se ejecutará correctamente:

Parallel.For 
    DoLongRunningWork 
    StartLogicalOperation 
    Sleep(3000) 
    StopLogicalOperation 

Aunque esto va a valer el tiempo debido a una aparentemente por LogicalOperationStack sincronización:

StartLogicalOperation 
Parallel.For 
    DoLongRunningWork 
    StartLogicalOperation 
    Sleep(3000) 
    StopLogicalOperation 
StopLogicalOperation 

Aquí es mi programa de ejemplo. Es similar a la tuya ya que tiene un método DoLongRunningWork que manipula ActivityId y LogicalOperationStack. También tengo dos sabores de patear DoLongRunningWork. Un sabor usa Tareas que uno usa Paralelo.Para. Cada sabor también puede ejecutarse de manera que toda la operación paralelizada se encerra en una operación lógica o no. Entonces, hay un total de 4 formas de ejecutar la operación paralela. Para probar cada uno, simplemente elimine el comentario del método "Usar ..." deseado, recompile y ejecute.UseTasks, UseTasks(true), y UseParallelFor deberían completarse todos. UseParallelFor(true) se confirmará en algún momento porque LogicalOperationStack no tiene el número esperado de entradas.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Diagnostics; 
using System.Threading; 
using System.Threading.Tasks; 

namespace CorrelationManagerParallelTest 
{ 
    class Program 
    {  
    static void Main(string[] args)  
    { 
     //UseParallelFor(true) will assert because LogicalOperationStack will not have expected 
     //number of entries, all others will run to completion. 

     UseTasks(); //Equivalent to original test program with only the parallelized 
         //operation bracketed in logical operation. 
     ////UseTasks(true); //Bracket entire UseTasks method in logical operation 
     ////UseParallelFor(); //Equivalent to original test program, but use Parallel.For 
          //rather than Tasks. Bracket only the parallelized 
          //operation in logical operation. 
     ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation 
    }  

    private static List<int> threadIds = new List<int>();  
    private static object locker = new object();  

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId; 

    private static int mainThreadUsedInDelegate = 0; 

    // baseCount is the expected number of entries in the LogicalOperationStack 
    // at the time that DoLongRunningWork starts. If the entire operation is bracketed 
    // externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise, 
    // it will be 0. 
    private static void DoLongRunningWork(int baseCount)  
    { 
     lock (locker) 
     { 
     //Keep a record of the managed thread used.    
     if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId)) 
      threadIds.Add(Thread.CurrentThread.ManagedThreadId); 

     if (Thread.CurrentThread.ManagedThreadId == mainThreadId) 
     { 
      mainThreadUsedInDelegate++; 
     } 
     }   

     Guid lo1 = Guid.NewGuid(); 
     Trace.CorrelationManager.StartLogicalOperation(lo1); 

     Guid g1 = Guid.NewGuid();   
     Trace.CorrelationManager.ActivityId = g1; 

     Thread.Sleep(3000);   

     Guid g2 = Trace.CorrelationManager.ActivityId; 
     Debug.Assert(g1.Equals(g2)); 

     //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation 
     //in effect when the Parallel.For operation was started. 
     Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1)); 
     Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1)); 

     Trace.CorrelationManager.StopLogicalOperation(); 
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false) 
    { 
     int totalThreads = 100; 
     TaskCreationOptions taskCreationOpt = TaskCreationOptions.None; 
     Task task = null; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StartLogicalOperation(); 
     } 

     Task[] allTasks = new Task[totalThreads]; 
     for (int i = 0; i < totalThreads; i++) 
     { 
     task = Task.Factory.StartNew(() => 
     { 
      DoLongRunningWork(encloseInLogicalOperation ? 1 : 0); 
     }, taskCreationOpt); 
     allTasks[i] = task; 
     } 
     Task.WaitAll(allTasks); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StopLogicalOperation(); 
     } 

     stopwatch.Stop(); 
     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate)); 

     Console.ReadKey(); 
    } 

    private static void UseParallelFor(bool encloseInLogicalOperation = false) 
    { 
     int totalThreads = 100; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StartLogicalOperation(); 
     } 

     Parallel.For(0, totalThreads, i => 
     { 
     DoLongRunningWork(encloseInLogicalOperation ? 1 : 0); 
     }); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StopLogicalOperation(); 
     } 

     stopwatch.Stop(); 
     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate)); 

     Console.ReadKey(); 
    } 

    } 
} 

Todo este asunto de si LogicalOperationStack se puede utilizar con Parallel.For (y/u otro roscado/Tarea construye) o la forma en que se puede utilizar probablemente merece su propia pregunta. Tal vez publique una pregunta. Mientras tanto, me pregunto si tienes alguna idea al respecto (o, me pregunto si habías considerado usar LogicalOperationStack ya que ActivityId parece estar seguro).

[PASTA DE FIN]

¿Alguien tiene alguna idea sobre este tema?

+0

Cualquier idea o idea sobre LogicalOperationStack and Parallel.For? – wageoghe

+0

Gracias por la exhaustiva investigación que me salva de encontrar al culpable :) –

Respuesta

5

[Comenzar la actualización]

también hice esta pregunta en Microsoft's Parallel Extensions for .Net support forum y, finalmente, recibí un answer from Stephen Toub. Resulta que hay un bug in the LogicalCallContext que está causando que LogicalOperationStack esté dañado. También hay una buena descripción (en un seguimiento por Stephen a una respuesta que hice a su respuesta) que da una breve visión de cómo Parallel.For funciona con respecto a repartir tareas y por qué eso hace Parallel.Por ser susceptible al error.

En mi respuesta a continuación, especulo que LogicalOperationStack no es compatible con Parallel.For porque Parallel.For usa el hilo principal como uno de los hilos "worker". Basado en la explicación de Stephen, mi especulación era incorrecta. Parallel.For usa el hilo principal como uno de los hilos "worker", pero no se usa simplemente "tal cual". La primera Tarea se ejecuta en el hilo principal, pero se ejecuta de tal manera que es como si se ejecutara en un hilo nuevo. Lea la descripción de Stephen para más información.

[Fin Actualización]

De lo que puedo decir, la respuesta es la siguiente:

Tanto ActivityId y LogicalOperationStack se almacenan a través de CallContext.LogicalSetData. Eso significa que estos valores se "fluirán" a cualquier subproceso "secundario". Eso es genial, por ejemplo, establecer ActivityId en el punto de entrada en un servidor multiproceso (por ejemplo, una llamada de servicio) y todos los hilos que finalmente se inician desde ese punto de entrada pueden ser parte de la misma "actividad". De manera similar, las operaciones lógicas (a través de LogicalOperationStack) también fluyen a los hilos hijo.

Con respecto a Trace.CorrelationManager.ActivityId:

ActivityId parece ser compatible con todos los modelos de roscado que he probado con:. El uso de roscas directamente, usando ThreadPool, utilizando tareas, utilizando paralelo *. En todos los casos, ActivityId tiene el valor esperado.

Con respecto a Trace.CorrelationManager.LogicalOperationStack:

LogicalOperationStack parece ser compatible con la mayoría de los modelos de roscado, pero no con Paralelo *.. Al usar hilos directamente, ThreadPool y Tasks, LogicalOperationStack (como se manipuló en el código de muestra provisto en mi pregunta) mantiene su integridad. En todo momento, el contenido de LogicalOperationStack es el esperado.

LogicalOperationStack NO es compatible con Parallel.For. Si una operación lógica está "en efecto", es decir, si ha llamado a CorrelationManager.StartLogicalOperation, antes de iniciar la operación Parallel. * Y luego inicia una nueva operación lógica en el contexto de Paralle. * (Es decir, en el delegado), entonces LogicalOperationStack estará dañado. (Debo decir que PROBABLEMENTE estará dañado. Paralelo. * Podría no crear ningún subproceso adicional, lo que significa que LogicalOperationStack sería seguro).

El problema se debe al hecho de que Parallel. * Utiliza el hilo principal (o, probablemente más correctamente, el hilo que inicia la operación en paralelo) como uno de sus hilos "worker". Esto significa que a medida que las "operaciones lógicas" se inician y se detienen en el hilo "trabajador" que es lo mismo que el hilo "principal", se está modificando el LogicalOperationStack del hilo "principal". Incluso si el código de llamada (es decir, el delegado) mantiene la pila correctamente (asegurándose de que cada StartLogicalOperation se "detiene" con una StopLogicalOperation correspondiente), la pila de hilos "principal" se modifica. En última instancia, parece (para mí, de todos modos), que la LogicalOperationStack del hilo "principal" está siendo modificada esencialmente por dos hilos "lógicos" diferentes: el hilo "principal" y el hilo "trabajador", que ambos son los MISMOS hilo.

No conozco los pormenores de exactamente por qué esto no funciona (al menos como esperaría que funcione). Mi mejor suposición es que cada vez que el delegado se ejecuta en un hilo (que no es lo mismo que el hilo principal), el hilo "hereda" el estado actual del LogicalOperationStack del hilo principal. Si el delegado se está ejecutando actualmente en el hilo principal (reutilizándose como hilo de trabajo) y ha iniciado una operación lógica, uno (o más de uno) de los otros delegados paralelizados "heredará" el LogicalOperationStack del hilo principal que ahora tiene una (o más) nuevas operaciones lógicas en efecto!

Fwiw, que implementa (principalmente para las pruebas, no estoy de utilizarlo en el momento), el siguiente "pila lógica" para imitar la LogicalOperationStack, pero lo hace de tal manera que va a trabajar con el paralelo. * Siéntase libre de probarlo y/o usarlo. Para probar, sustituir las llamadas a

Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation 

en el código de ejemplo de mi pregunta original con llamadas a

LogicalOperation.OperationStack.Push()/Pop(). 


//OperationStack.cs 
using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 

using System.Runtime.Remoting.Messaging; 

namespace LogicalOperation 
{ 
    public static class OperationStack 
    { 
    private const string OperationStackSlot = "OperationStackSlot"; 

    public static IDisposable Push(string operation) 
    { 
     OperationStackItem parent = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem; 
     OperationStackItem op = new OperationStackItem(parent, operation); 
     CallContext.LogicalSetData(OperationStackSlot, op); 
     return op; 
    } 

    public static object Pop() 
    { 
     OperationStackItem current = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem; 

     if (current != null) 
     { 
     CallContext.LogicalSetData(OperationStackSlot, current.Parent); 
     return current.Operation; 
     } 
     else 
     { 
     CallContext.FreeNamedDataSlot(OperationStackSlot); 
     } 
     return null; 
    } 

    public static object Peek() 
    { 
     OperationStackItem top = Top(); 
     return top != null ? top.Operation : null; 
    } 

    internal static OperationStackItem Top() 
    { 
     OperationStackItem top = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem; 
     return top; 
    } 

    public static IEnumerable<object> Operations() 
    { 
     OperationStackItem current = Top(); 
     while (current != null) 
     { 
     yield return current.Operation; 
     current = current.Parent; 
     } 
    } 

    public static int Count 
    { 
     get 
     { 
     OperationStackItem top = Top(); 
     return top == null ? 0 : top.Depth; 
     } 
    } 

    public static IEnumerable<string> OperationStrings() 
    { 
     foreach (object o in Operations()) 
     { 
     yield return o.ToString(); 
     } 
    } 
    } 
} 


//OperationStackItem.cs 
using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 

namespace LogicalOperation 
{ 
    public class OperationStackItem : IDisposable 
    { 
    private OperationStackItem parent = null; 
    private object operation; 
    private int depth; 
    private bool disposed = false; 

    internal OperationStackItem(OperationStackItem parentOperation, object operation) 
    { 
     parent = parentOperation; 
     this.operation = operation; 
     depth = parent == null ? 1 : parent.Depth + 1; 
    } 

    internal object Operation { get { return operation; } } 
    internal int Depth { get { return depth; } } 

    internal OperationStackItem Parent { get { return parent; } } 

    public override string ToString() 
    { 
     return operation != null ? operation.ToString() : ""; 
    } 

    #region IDisposable Members 

    public void Dispose() 
    { 
     if (disposed) return; 

     OperationStack.Pop(); 

     disposed = true; 
    } 

    #endregion 
    } 
} 

Esto se inspiró en los objetos de ámbito descritos por Brent VanderMeide aquí: http://www.dnrtv.com/default.aspx?showNum=114

Puede usar esta clase de esta manera:

public void MyFunc() 
{ 
    using (LogicalOperation.OperationStack.Push("MyFunc")) 
    { 
    MyOtherFunc(); 
    } 
} 

public void MyOtherFunc() 
{ 
    using (LogicalOperation.OperationStack.Push("MyOtherFunc")) 
    { 
    MyFinalFunc(); 
    } 
} 

public void MyFinalFunc() 
{ 
    using (LogicalOperation.OperationStack.Push("MyFinalFunc")) 
    { 
    Console.WriteLine("Hello"); 
    } 
} 
2

Fui investigado una forma de tener una pila lógica que debería funcionar fácilmente en una aplicación que usa TPL en gran medida. Decidí usar LogicalOperationStack porque hizo todo lo que necesitaba sin cambiar el código existente. Pero luego leí acerca de un error en el LogicalCallContext:

https://connect.microsoft.com/VisualStudio/feedback/details/609929/logicalcallcontext-clone-bug-when-correlationmanager-slot-is-present

así que traté de encontrar una solución de este error y creo que tengo trabajo para el TPL (Gracias ILSpy):

public static class FixLogicalOperationStackBug 
{ 
    private static bool _fixed = false; 

    public static void Fix() 
    { 
     if (!_fixed) 
     { 
      _fixed = true; 

      Type taskType = typeof(Task); 
      var s_ecCallbackField = taskType.GetFields(BindingFlags.Static | BindingFlags.NonPublic).First(f => f.Name == "s_ecCallback"); 
      ContextCallback s_ecCallback = (ContextCallback)s_ecCallbackField.GetValue(null); 

      ContextCallback injectedCallback = new ContextCallback(obj => 
      { 
       // Next line will set the private field m_IsCorrelationMgr of LogicalCallContext which isn't cloned 
       CallContext.LogicalSetData("System.Diagnostics.Trace.CorrelationManagerSlot", Trace.CorrelationManager.LogicalOperationStack); 
       s_ecCallback(obj); 
      }); 

      s_ecCallbackField.SetValue(null, injectedCallback); 
     } 
    } 
} 
+0

¿Alguna posibilidad de que pueda agregar detalles sobre dónde y cuándo llamar a este método para "arreglar" el error? – jpierson

+0

Obtengo una NullReferenceException cuando intento esta solución y al agregar una comprobación nula alrededor de s_ecCallback antes de invocarlo, mi código de prueba se congela después de llamar a Fix(). – jpierson

+0

@jpierson: si realiza la comprobación nula en la devolución de llamada, puede evitar que la tarea se ejecute realmente. Si debe haber una verificación nula, sería justo después de la asignación de s_ecCallback.Pero supongo que su llamada a Fix es demasiado pronto: incluso antes de que el marco tuviera un cambio de asignación. –