2012-05-24 21 views
6

Intento implementar un ConcurrentDictionary envolviéndolo en un BlockingCollection pero no parece tener éxito.Cómo envolver ConcurrentDictionary en BlockingCollection?

entiendo que una declaración de variables trabajan con BlockingCollection como ConcurrentBag<T>, ConcurrentQueue<T>, etc.

Por lo tanto, para crear un ConcurrentBag envuelto en una BlockingCollection yo anunciare y cree una instancia de esta manera:

BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>()); 

pero, ¿cómo hacerlo para ConcurrentDictionary? Necesito la funcionalidad de bloqueo de BlockingCollection tanto para el productor como para el consumidor.

+0

El diccionario (y ConcurrentDictionary también) no conserva el orden de los elementos. ¿Puedes describir tu escenario productor-consumidor? – Dennis

+0

@Dennis, soy consciente de eso. Un productor almacena KeyValuePairs en el concurrentDictionary, y una tarea de consumidor incrementa un int y elimina el KeyValuePair si el int coincide con la clave respectiva. Hago esto porque las tareas de los trabajadores llenan el Dictuario concurrente con valores, pero en orden arbitrario, la tarea del consumidor garantiza que los valores recibidos se pasen o se trabajen en el orden correcto. ¿Puede un ConcurrentDictionary ser envuelto en un BlockingCollection? –

+0

¿Qué solución se te ocurrió? Estoy tratando de encontrar una buena solución a un problema similar donde el productor no produce artículos en el orden que necesita el consumidor. (Publicación anterior lo sé, pero vale la pena intentarlo) – Kim

Respuesta

1

Tendrá que escribir su propia clase de adaptador - algo así como:

public class ConcurrentDictionaryWrapper<TKey,TValue> : IProducerConsumerCollection<KeyValuePair<TKey,TValue>> 
{ 
    private ConcurrentDictionary<TKey, TValue> dictionary; 

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() 
    { 
     return dictionary.GetEnumerator(); 
    } 

    IEnumerator IEnumerable.GetEnumerator() 
    { 
     return GetEnumerator(); 
    } 

    public void CopyTo(Array array, int index) 
    { 
     throw new NotImplementedException(); 
    } 

    public int Count 
    { 
     get { return dictionary.Count; } 
    } 

    public object SyncRoot 
    { 
     get { return this; } 
    } 

    public bool IsSynchronized 
    { 
     get { return true; } 
    } 

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index) 
    { 
     throw new NotImplementedException(); 
    } 

    public bool TryAdd(KeyValuePair<TKey, TValue> item) 
    { 
     return dictionary.TryAdd(item.Key, item.Value); 
    } 

    public bool TryTake(out KeyValuePair<TKey, TValue> item) 
    { 
     item = dictionary.FirstOrDefault(); 
     TValue value; 
     return dictionary.TryRemove(item.Key, out value); 
    } 

    public KeyValuePair<TKey, TValue>[] ToArray() 
    { 
     throw new NotImplementedException(); 
    } 
} 
+1

Gracias por la sugerencia de código. Pero mi principal objetivo al utilizar un BlockingCollection fue la capacidad de marcar la colección como Completando Completado y verificar el estado del mismo, así como si su adición es completa y está vacía, similar a lo que ofrece BlockingCollection. Soy consciente de que puedo agregar fácilmente dicha funcionalidad, pero estoy buscando una sugerencia de cómo hacerlo directamente a través de BlockingCollection. Hasta ahora, no veo una razón por la cual no pueda funcionar directamente a través de la colección de bloqueo. Tal vez solo se necesita IProducerConsumerCollection ? –

4

Tal vez usted necesita un diccionario concurrente de blockingCollection

 ConcurrentDictionary<int, BlockingCollection<string>> mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>(); 
     int maxBoxes = 5; 

     CancellationTokenSource cancelationTokenSource = new CancellationTokenSource(); 
     CancellationToken cancelationToken = cancelationTokenSource.Token; 

     Random rnd = new Random(); 
     // Producer 
     Task.Factory.StartNew(() => 
     { 
      while (true) 
      { 
       int index = rnd.Next(0, maxBoxes); 
       // put the letter in the mailbox 'index' 
       var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); 
       box.Add("some message " + index, cancelationToken); 
       Console.WriteLine("Produced a letter to put in box " + index); 

       // Wait simulating a heavy production item. 
       Thread.Sleep(1000); 
      } 
     }); 

     // Consumer 1 
     Task.Factory.StartNew(() => 
     { 
      while (true) 
      { 
       int index = rnd.Next(0, maxBoxes); 
       // get the letter in the mailbox 'index' 
       var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); 
       var message = box.Take(cancelationToken); 
       Console.WriteLine("Consumed 1: " + message); 

       // consume a item cost less than produce it: 
       Thread.Sleep(50); 
      } 
     }); 

     // Consumer 2 
     Task.Factory.StartNew(() => 
     { 
      while (true) 
      { 
       int index = rnd.Next(0, maxBoxes); 
       // get the letter in the mailbox 'index' 
       var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); 
       var message = box.Take(cancelationToken); 
       Console.WriteLine("Consumed 2: " + message); 

       // consume a item cost less than produce it: 
       Thread.Sleep(50); 
      } 
     }); 

     Console.ReadLine(); 
     cancelationTokenSource.Cancel(); 

De esta manera, un consumidor que está esperando algo en el buzón 5, esperará hasta que el productor ponga una letra en el buzón 5.

Cuestiones relacionadas