2010-12-29 6 views
14

Estoy tratando de averiguar cuál será la mejor forma de trabajar con una cola. Tengo un proceso que devuelve una DataTable. Cada DataTable, a su vez, se fusiona con la DataTable anterior. Hay un problema, demasiados registros para mantener hasta la final BulkCopy (OutOfMemory).Cómo trabajar con threading con ConcurrentQueue <T>

Por lo tanto, he determinado que debo procesar cada DataTable entrante de inmediato. Pensando en el ConcurrentQueue<T> ... pero no veo cómo el método WriteQueuedData() sabría dequeuear una tabla y escribirla en la base de datos.

Por ejemplo:

public class TableTransporter 
{ 
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>(); 

    public TableTransporter() 
    { 
     tableQueue.OnItemQueued += new EventHandler(WriteQueuedData); // no events available 
    } 

    public void ExtractData() 
    { 
     DataTable table; 

     // perform data extraction 
     tableQueue.Enqueue(table); 
    } 

    private void WriteQueuedData(object sender, EventArgs e) 
    { 
     BulkCopy(e.Table); 
    } 
} 

Mi primera pregunta es, aparte del hecho de que en realidad no tienen ningún evento para la suscripción, si llamo ExtractData() asíncrona será este todo lo que necesito? En segundo lugar, ¿hay algo que me falta sobre la forma en que funciona ConcurrentQueue<T> y que necesita alguna forma de desencadenador para trabajar de forma asíncrona con los objetos en cola?

actualización Me acaban de deriva una clase de ConcurrentQueue<T> que tiene un controlador de eventos OnItemQueued. Entonces:

new public void Enqueue (DataTable Table) 
{ 
    base.Enqueue(Table); 
    OnTableQueued(new TableQueuedEventArgs(Table)); 
} 

public void OnTableQueued(TableQueuedEventArgs table) 
{ 
    EventHandler<TableQueuedEventArgs> handler = TableQueued; 

    if (handler != null) 
    { 
     handler(this, table); 
    } 
} 

¿Tiene alguna preocupación acerca de esta implementación?

Respuesta

18

Según entiendo el problema, te faltan algunas cosas.

La cola simultánea es una estructura de datos diseñada para aceptar múltiples hilos de lectura y escritura en la cola sin necesidad de bloquear explícitamente la estructura de datos. (Todo ese jazz se resuelve entre bastidores, o la colección se implementa de tal forma que no necesita un candado).

Teniendo eso en cuenta, parece que el patrón que está intentando para usar es el "Producto/Consumidor". En primer lugar, tiene algunas tareas que producen trabajo (y agrega elementos a la cola). Y en segundo lugar, tiene una segunda tarea. Consumir cosas de la cola (y eliminar elementos).

Así que realmente quieres dos hilos: uno que agrega elementos y otro que elimina los elementos. Como está utilizando una colección simultánea, puede tener varios subprocesos que agreguen elementos y varios subprocesos que eliminen elementos. Pero, obviamente, cuanto más contención tengas en la cola concurrente, más rápido se convertirá en el cuello de botella.

+0

Pensé que tenía 2 hilos. El hilo principal básicamente esperaría a que se desencadene el evento. El segundo subproceso comienza como una llamada asincrónica a 'ExtractData()'. En la devolución de llamada asíncrona, simplemente continuaré el proceso de extracción. – IAbstract

+0

En realidad, creo que lo tengo al revés; el hilo principal debería ser la puesta en cola de las tablas de datos; luego comience el método de escritura asíncrona a través del desencadenador de evento de elemento en cola. – IAbstract

3

Ésta es la solución completa para lo que ocurrió:

public class TableTransporter 
{ 
    private static int _indexer; 

    private CustomQueue tableQueue = new CustomQueue(); 
    private Func<DataTable, String> RunPostProcess; 
    private string filename; 

    public TableTransporter() 
    { 
     RunPostProcess = new Func<DataTable, String>(SerializeTable); 
     tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued); 
    } 

    void tableQueue_TableQueued(object sender, TableQueuedEventArgs e) 
    { 
     // do something with table 
     // I can't figure out is how to pass custom object in 3rd parameter 
     RunPostProcess.BeginInvoke(e.Table,new AsyncCallback(PostComplete), filename); 
    } 

    public void ExtractData() 
    { 
     // perform data extraction 
     tableQueue.Enqueue(MakeTable()); 
     Console.WriteLine("Table count [{0}]", tableQueue.Count); 
    } 

    private DataTable MakeTable() 
    { return new DataTable(String.Format("Table{0}", _indexer++)); } 

    private string SerializeTable(DataTable Table) 
    { 
     string file = Table.TableName + ".xml"; 

     DataSet dataSet = new DataSet(Table.TableName); 

     dataSet.Tables.Add(Table); 

     Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file); 
     string xmlstream = String.Empty; 

     using (MemoryStream memstream = new MemoryStream()) 
     { 
      XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet)); 
      XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8); 

      xmlSerializer.Serialize(xmlWriter, dataSet); 
      xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray()); 

      using (var fileStream = new FileStream(file, FileMode.Create)) 
       fileStream.Write(StringToUTF8ByteArray(xmlstream), 0, xmlstream.Length + 2); 
     } 
     filename = file; 

     return file; 
    } 

    private void PostComplete(IAsyncResult iasResult) 
    { 
     string file = (string)iasResult.AsyncState; 
     Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file); 

     RunPostProcess.EndInvoke(iasResult); 
    } 

    public static String UTF8ByteArrayToString(Byte[] ArrBytes) 
    { return new UTF8Encoding().GetString(ArrBytes); } 

    public static Byte[] StringToUTF8ByteArray(String XmlString) 
    { return new UTF8Encoding().GetBytes(XmlString); } 
} 

public sealed class CustomQueue : ConcurrentQueue<DataTable> 
{ 
    public event EventHandler<TableQueuedEventArgs> TableQueued; 

    public CustomQueue() 
    { } 
    public CustomQueue(IEnumerable<DataTable> TableCollection) 
     : base(TableCollection) 
    { } 

    new public void Enqueue (DataTable Table) 
    { 
     base.Enqueue(Table); 
     OnTableQueued(new TableQueuedEventArgs(Table)); 
    } 

    public void OnTableQueued(TableQueuedEventArgs table) 
    { 
     EventHandler<TableQueuedEventArgs> handler = TableQueued; 

     if (handler != null) 
     { 
      handler(this, table); 
     } 
    } 
} 

public class TableQueuedEventArgs : EventArgs 
{ 
    #region Fields 
    #endregion 

    #region Init 
    public TableQueuedEventArgs(DataTable Table) 
    {this.Table = Table;} 
    #endregion 

    #region Functions 
    #endregion 

    #region Properties 
    public DataTable Table 
    {get;set;} 
    #endregion 
} 

Como prueba de concepto, parece que funciona bastante bien. A lo sumo, vi 4 hilos de trabajo.

+0

TODO: actualizar con un método asincrónico más nuevo. – IAbstract

+0

Mirando a través de esto, es una buena implementación, sin embargo, después de ejecutar una prueba rápida, ¿cuándo se quita la cola de un artículo? –

+0

@RichardPriddy: dado que esto fue hace poco más de 5 años (* y hace mucho tiempo que pasé a mi tercera compañía *), solo puedo suponer que este no fue un ejemplo completo. Tenga en cuenta la * prueba de concepto * comentario al final. ;) Dicho esto, dependiendo de los requisitos, podría exponer el evento 'enqueued' y permitir que otra persona maneje el proceso de dequeueing. De lo contrario, podría ser lógico dequear en algún lugar en la función de proceso posterior 'AsyncCallback'. Sería realmente difícil precisar algo más específico en esta fecha tardía. – IAbstract

8

Creo que ConcurrentQueue solo es útil en muy pocos casos. Su principal ventaja es que está libre de bloqueo. Sin embargo, por lo general, el/los hilo (s) productor/es deben (n) informar al (a los) hilo (s) de consumo de alguna manera que hay datos disponibles para procesar. Esta señalización entre hilos necesita bloqueos y anula el beneficio de usar ConcurrentQueue. La forma más rápida de sincronizar subprocesos es usando Monitor.Pulse(), que funciona solo dentro de un bloqueo. Todas las demás herramientas de sincronización son incluso más lentas.

Por supuesto, el consumidor puede comprobar continuamente si hay algo en la cola, que funciona sin bloqueos, pero es una gran pérdida de recursos del procesador. Un poco mejor es si el consumidor espera entre un control y otro.

Elevar un hilo cuando se escribe en la cola es una muy mala idea. El uso de ConcurrentQueue para guardar mabe 1 microsegundo se desperdiciará por completo al ejecutar el controlador de eventos, que puede demorar 1000 veces más.

Si todo el procesamiento se realiza en un controlador de eventos o una llamada asincrónica, la pregunta es ¿por qué todavía se necesita una cola? Mejor pasar los datos directamente al controlador y no utilizar una cola en absoluto.

Tenga en cuenta que la implementación de ConcurrentQueue es bastante complicada para permitir la concurrencia. En la mayoría de los casos, mejor utilice una cola normal <> y bloquee cada acceso a la cola. Como el acceso a la cola solo necesita microsegundos, es muy poco probable que 2 hilos accedan a la cola en el mismo microsegundo y casi nunca habrá demoras debido al bloqueo. El uso de una cola normal <> con bloqueo a menudo dará como resultado una ejecución de código más rápida que ConcurrentQueue.

+0

Es una pena recibir el voto negativo. Creo que es una opinión válida y pragmática. – user3085342