2009-09-19 12 views
7

Necesito una cola para pasar mensajes de un hilo (A) a otro (B), sin embargo, no he podido encontrar uno que realmente haga lo que quiero, ya que generalmente permiten agregar un elemento al error, un caso que en mi situación es bastante fatal ya que el mensaje debe procesarse, y el hilo realmente no puede detenerse y esperar espacio libre.Cohete múltiple de un solo lector cola única de fifo

  • único hilo Un agrega elementos, y sólo el hilo B ellos lee
  • Tema A nunca debe bloquear, sin embargo hilo B no es crítica de rendimiento, por lo que puede
  • artículos Adición siempre debe tener éxito, por lo que la cola no se puede tener un límite de tamaño superior (corto de funcionamiento de la memoria en el sistema)
  • Si la cola está vacía, hilo B debe esperar hasta que haya un artículo para procesar
+0

¿Qué biblioteca de hilos está utilizando? pthreads? –

+0

boost :: thread y algunos bits del código específico de la plataforma aquí y allá –

+0

Su objetivo podría resultar en quedarse sin memoria ya que no permite que el hilo del escritor bloquee o suelte elementos. Por lo tanto, si alcanza un límite de tamaño crítico de la cola, debe decidir si eliminar elementos o bloquear el hilo del escritor. De lo contrario, se sueltan elementos indirectamente porque el programa falla :-) – mmmmmmmm

Respuesta

7

Aquí se explica cómo escribir un lock- cola libre en C++:

http://www.ddj.com/hpc-high-performance-computing/210604448

Pero cuando se dice "hilo A no debe bloquear", ¿está seguro de que es el requisito? Windows no es un sistema operativo en tiempo real (y tampoco lo es Linux, en uso normal). Si desea que el Subproceso A pueda usar toda la memoria disponible del sistema, entonces debe asignar memoria (o esperar mientras que otra persona lo hace). El sistema operativo en sí no puede proporcionar garantías de tiempo mejores que las que tendría si el lector y el escritor tomaran un bloqueo en proceso (es decir, un mutex no compartido) para manipular la lista. Y el peor caso de agregar un mensaje va a tener que ir al sistema operativo para obtener memoria.

En resumen, hay un motivo por el que las colas que no te gustan tienen una capacidad fija, es para que no tengan que asignar memoria en el hilo supuestamente de baja latencia.

Por lo tanto, el código de bloqueo generalmente será menos bloqueado, pero debido a la asignación de memoria no está garantizado, y el rendimiento con un mutex no debería ser tan lamentable a menos que tenga una enorme secuencia de eventos para procesar (por ejemplo, está escribiendo un controlador de red y los mensajes son paquetes de Ethernet entrantes).

Así, en pseudo-código, lo primero que iba a tratar sería:

Writer: 
    allocate message and fill it in 
    acquire lock 
     append node to intrusive list 
     signal condition variable 
    release lock 

Reader: 
    for(;;) 
     acquire lock 
      for(;;) 
       if there's a node 
        remove it 
        break 
       else 
        wait on condition variable 
       endif 
      endfor 
     release lock 
     process message 
     free message 
    endfor 

Sólo si se demuestra que introducir retrasos inaceptables en el hilo escritor iba a ir a bloquear código libre, (a menos que tenga una cola adecuada que ya esté por ahí).

+0

En un nivel inferior, se podría emplear una lista de enlaces únicos con el proceso de escritura agregado y el proceso de lectura consumiendo. Esto puede estar libre de bloqueo con el proceso de escritura cambiando el puntero NULL a no NULL y el proceso de lectura cambiando de NULL a NULL. Un pequeño montón privado proporcionaría un buen rendimiento amortizado para los artículos de la lista. El escritor Mallocs y el lector liberan. Si el lector se queda dormido, se puede proporcionar un tercer proceso C que especulativamente amplía el montón privado, ocultando la naturaleza de bloqueo de la asignación del proceso A. –

+0

su ejemplo se estancará.Mientras el lector está esperando el cond, mantiene el bloqueo, lo que impide que el escritor adquiera el bloqueo y la señalización. Debe liberar el bloqueo antes de esperar en la variable de condición y volver a adquirir inmediatamente después. –

+0

@Bobby: estás equivocado. La espera en una variable de condición libera el bloqueo asociado durante la espera, y luego vuelve a adquirirlo antes de regresar de la espera. Esto es parte de lo que significa "variable de condición": si la API que está utilizando no lo hace por usted, entonces no es una variable de condición, sino más bien como un semáforo. Y es importante que la API lo haga, ya que su código puede depender del hecho de que liberar el bloqueo y comenzar a esperar la condición ocurre atómicamente, es decir, ningún otro hilo puede hacer nada bajo el bloqueo antes de que su hilo sea un mesero. –

0

Es posible que desee considerar sus requisitos: ¿es cierto que A no puede descartar ningún elemento de la cola en absoluto? ¿O es que no quieres que B retire dos elementos consecutivos de la cola que no fueron elementos consecutivos porque eso de alguna manera tergiversaría una secuencia de eventos? Por ejemplo, si se trata de algún tipo de sistema de registro de datos, (comprensiblemente) no querría huecos en el registro, pero sin una memoria ilimitada, la realidad es que en algún caso de esquina, en algún lugar, probablemente podría sobrepasarlo. su capacidad de cola ...

En cuyo caso, una solución es tener algún tipo de elemento especial que pueda colocarse en la cola, lo que representa el caso de A que descubrió que tenía que soltar elementos. Básicamente mantienes un elemento extra alrededor, que es nulo la mayor parte del tiempo. Cada vez que A va a agregar elementos a la cola, si este elemento extra no es nulo, eso entra. Si A descubre que no hay espacio en la cola, configura este elemento adicional para decir 'hey, la cola estaba llena' .

De esta manera, A nunca bloquea, puede soltar elementos cuando el sistema está muy ocupado, pero no pierde de vista el hecho de que los elementos se descartaron, porque tan pronto como el espacio de cola está disponible, esta marca entra para indicar dónde se produjo la caída de datos. El Proceso B hace lo que necesita hacer cuando descubre que ha sacado este elemento de marca de saturación de la cola.

1

Visual Studio 2010 agrega 2 nuevas bibliotecas que soportan este escenario muy bien, Asynchronous Agents Library y Parallel Pattern Library.

La biblioteca agentes tiene el apoyo o un mensaje asíncrono que pasa y contiene bloques de mensajes para enviar mensajes a los 'objetivos' y para recibir mensajes de 'fuentes'

Un unbounded_buffer es una clase de plantilla que ofrece lo que creo que busca para:

#include <agents.h> 
#include <ppl.h> 
#include <iostream> 

using namespace ::Concurrency; 
using namespace ::std; 

int main() 
{ 
    //to hold our messages, the buffer is unbounded... 
    unbounded_buffer<int> buf1; 
    task_group tasks; 

    //thread 1 sends messages to the unbounded_buffer 
    //without blocking 
    tasks.run([&buf1](){ 
     for(int i = 0 ; i < 10000; ++i) 
     send(&buf1,i) 
    //signal exit 
    send(&buf1,-1); 
    }); 

    //thread 2 receives messages and blocks if there are none 

    tasks.run([&buf1](){ 
     int result; 
     while(result = receive(&buf1)!=-1) 
     { 
      cout << "I got a " << result << endl; 
     } 
    }); 

    //wait for the threads to end 
    tasks.wait(); 
} 
+2

¿Eso realmente funciona bajo la categoría de Linux? –

+0

FWIW, en su bucle de recepción, siempre mostrará "Tengo un 1" porque el! = Se evalúa antes de = –

1
  • por qué no usar STL <list> o <deque> con un mutex ar ¿Agregar/eliminar? ¿Es el thread-safety of STL insuficiente?

  • ¿Por qué no crear su propia clase de nodo de lista vinculada (individualmente/doblemente) que contiene un puntero y hacer heredar los elementos que se agregarán o eliminarán? Por lo tanto, hacer una asignación adicional innecesaria. Acabas de introducir algunos punteros en threadA::add() y threadB::remove() y listo. (Mientras que querría hacer eso bajo un mutex, el efecto de bloqueo en threadA sería insignificante a menos que usted hizo algo realmente malo ...)

  • Si está utilizando pthreads, echa un vistazo a sem_post() y sem_wait(). La idea es que threadB puede bloquearse indefinidamente a través de sem_wait() hasta que threadA ponga algo en la cola. Entonces threadA invoca sem_post(). Que despierta threadB para hacer su trabajo. Después de lo cual threadB puede volver a dormir. Es una forma eficiente de manejar la señalización asincrónica, compatible con cosas como threadA::add() múltiples antes de completar threadB::remove().

Cuestiones relacionadas