2010-01-16 12 views
10

Tengo std::list<Info> infoList en mi aplicación que se comparte entre dos subprocesos. Estos 2 hilos están accediendo a esta lista como sigue:cuál es la mejor manera de sincronizar el acceso al contenedor entre varios subprocesos en la aplicación en tiempo real

Thread 1: utiliza push_back(), pop_front() o clear() en la lista (Dependiendo de la situación) Thread 2: utiliza un iterator para iterar a través de los elementos de la enumera y haz algunas acciones.

Tema 2 está iterando la lista como la siguiente:

for(std::list<Info>::iterator i = infoList.begin(); i != infoList.end(); ++i) 
{ 
    DoAction(i); 
} 

El código se compila usando GCC 4.4.2.

A veces ++ i causa una segfault y bloquea la aplicación. El error se produce en la línea 143 std_list.h en la siguiente línea:

_M_node = _M_node->_M_next; 

creo que esto es una condición de carrera. La lista podría haber cambiado o incluso borrado por el subproceso 1 mientras el subproceso 2 la iteraba.

Utilicé Mutex para sincronizar el acceso a esta lista y todo fue bien durante mi prueba inicial. Pero el sistema simplemente se congela bajo la prueba de estrés, lo que hace que esta solución sea totalmente inaceptable. Esta aplicación es una aplicación en tiempo real y necesito encontrar una solución para que ambos subprocesos puedan ejecutarse lo más rápido posible sin dañar el rendimiento total de las aplicaciones.

Mi pregunta es esta: El hilo 1 y el hilo 2 deben ejecutarse lo más rápido posible ya que esta es una aplicación en tiempo real. ¿Qué puedo hacer para evitar este problema y aún mantener el rendimiento de la aplicación? ¿Hay algún algoritmo libre de bloqueos disponible para tal problema?

Está bien si omito algunos objetos Info recientemente agregados en la iteración de la cadena 2, pero ¿qué puedo hacer para evitar que el iterador se convierta en un puntero colgante?

Gracias

+1

El artículo de Sutters sobre cómo escribir una cola sin bloqueo podría ayudar: http://www.ddj.com/cpp/210604448 –

+0

http://stackoverflow.com/questions/1724630/how-do-i-build- a-lockless-queue –

+0

¿Por qué no puedes usar una cola en lugar de una lista? ¿Cómo se notifica a su cadena de consumidores los nuevos objetos Info? –

Respuesta

4

En general, no es seguro usar contenedores STL de esta manera. Deberá implementar un método específico para hacer que su código sea seguro. La solución que elija depende de sus necesidades. Probablemente resolvería esto manteniendo dos listas, una en cada hilo. Y comunicando los cambios a través de un lock free queue (mencionado en los comentarios a esta pregunta). También puede limitar la vida útil de sus objetos de información envolviéndolos en boost :: shared_ptr, p. Ej.

typedef boost::shared_ptr<Info> InfoReference; 
typedef std::list<InfoReference> InfoList; 

enum CommandValue 
{ 
    Insert, 
    Delete 
} 

struct Command 
{ 
    CommandValue operation; 
    InfoReference reference; 
} 

typedef LockFreeQueue<Command> CommandQueue; 

class Thread1 
{ 
    Thread1(CommandQueue queue) : m_commands(queue) {} 
    void run() 
    { 
     while (!finished) 
     { 
      //Process Items and use 
      // deleteInfo() or addInfo() 
     }; 

    } 

    void deleteInfo(InfoReference reference) 
    { 
     Command command; 
     command.operation = Delete; 
     command.reference = reference; 
     m_commands.produce(command); 
    } 

    void addInfo(InfoReference reference) 
    { 
     Command command; 
     command.operation = Insert; 
     command.reference = reference; 
     m_commands.produce(command); 
    } 
} 

private: 
    CommandQueue& m_commands; 
    InfoList m_infoList; 
} 

class Thread2 
{ 
    Thread2(CommandQueue queue) : m_commands(queue) {} 

    void run() 
    { 
     while(!finished) 
     { 
      processQueue(); 
      processList(); 
     } 
    } 

    void processQueue() 
    { 
     Command command; 
     while (m_commands.consume(command)) 
     { 
      switch(command.operation) 
      { 
       case Insert: 
        m_infoList.push_back(command.reference); 
        break; 
       case Delete: 
        m_infoList.remove(command.reference); 
        break; 
      } 
     } 
    } 

    void processList() 
    { 
     // Iterate over m_infoList 
    } 

private: 
    CommandQueue& m_commands; 
    InfoList m_infoList; 
} 


void main() 
{ 
CommandQueue commands; 

Thread1 thread1(commands); 
Thread2 thread2(commands); 

thread1.start(); 
thread2.start(); 

waitforTermination(); 

} 

Esto no ha sido compilado. Aún debe asegurarse de que el acceso a sus objetos Info sea seguro para subprocesos.

1

para evitar que su iterador de ser invalidado que tienen para bloquear todo el bucle for. Ahora supongo que el primer hilo puede tener dificultades para actualizar la lista. Intentaría darle la oportunidad de hacer su trabajo en cada iteración (o cada enésima iteración).

En pseudo-código que se vería así:

mutex_lock(); 
for(...){ 
    doAction(); 
    mutex_unlock(); 
    thread_yield(); // give first thread a chance 
    mutex_lock(); 
    if(iterator_invalidated_flag) // set by first thread 
    reset_iterator(); 
} 
mutex_unlock(); 
+0

¿Debo reiniciar el iterador al principio de la lista cada vez en reset_iterator(); ? –

+0

@O. Askari: depende de la operación: 'clear' - por supuesto que sí,' push_back' - no es necesario reiniciar. En cuanto a 'pop_front', puede que tenga que reiniciar (como cuando se encuentra al principio de la lista), en cuyo caso debe compartir el iterador entre hilos si desea evitar reinicios innecesarios. – catwalk

-1

No creo que pueda salir de allí sin ninguna sincronización en absoluto en este caso como cierta operación se invalidar los iteradores que está utilizando. Con una lista, esto es bastante limitado (básicamente, si ambos hilos intentan manipular iteradores al mismo elemento al mismo tiempo), pero aún existe el peligro de que elimines un elemento al mismo tiempo que intentas para agregar uno a eso.

¿Está usted por casualidad sosteniendo el candado en DoAction(i)? Obviamente, solo desea mantener el bloqueo durante el tiempo mínimo que puede obtener para maximizar el rendimiento. Del código anterior, creo que querrás descomponer el bucle un tanto para acelerar ambos lados de la operación.

Algo a lo largo de las líneas de:

while (processItems) { 
    Info item; 
    lock(mutex); 
    if (!infoList.empty()) { 
    item = infoList.front(); 
    infoList.pop_front(); 
    } 
    unlock(mutex); 
    DoAction(item); 
    delayALittle(); 
} 

Y la función de inserción todavía tendría que tener este aspecto:

lock(mutex); 
infoList.push_back(item); 
unlock(mutex); 

A menos que es probable que sea masiva la cola, estaría tentado usar algo como std::vector<Info> o incluso std::vector<boost::shared_ptr<Info> > para minimizar la copia de los objetos de información (asumiendo que estos son algo más caros de copiar en comparación con un boost :: shared_ptr. Generalmente, las operaciones en un vector tienden a ser un poco más rápidas que en una lista, especialmente si los objetos almacenados en el vector son pequeños y baratos de copiar.

+0

Gracias, esta solución parece prometedora. La lista no contiene muchos elementos, 20 máx. Y cada elemento de información ocupa unos 128 bytes. Sin embargo, ¿qué ocurre si necesito mantener los elementos procesados ​​en InfoList y no eliminarlos? –

1

Tienes que decidir qué hilo es más importante. Si es el hilo de actualización, debe indicar al hilo del iterador que se detenga, espere y vuelva a comenzar. Si es el hilo del iterador, simplemente puede bloquear la lista hasta que finalice la iteración.

5

Su bucle for() puede mantener un bloqueo durante un tiempo relativamente largo, dependiendo de cuántos elementos itera. Usted puede tener problemas reales si "sondea" la cola, constantemente verificando si un nuevo elemento estuvo disponible. Eso hace que el hilo sea el propietario del mutex durante un tiempo excesivamente largo, dando pocas oportunidades al hilo productor para entrar y agregar un elemento. Y quemando muchos ciclos de CPU innecesarios en el proceso.

Necesita una "cola de bloqueo limitada". No lo escriba usted mismo, el diseño de la cerradura no es trivial. Difíciles de encontrar buenos ejemplos, la mayoría es un código .NET. This article parece prometedor.

0

Debe estar utilizando alguna biblioteca de subprocesos. Si está utilizando Intel TBB, puede usar concurrent_vector o concurrent_queue. Ver this.

3

Me gustaría saber cuál es el propósito de esta lista, entonces sería más fácil responder la pregunta.

Como dijo Hoare, que es generalmente una mala idea para tratar de compartir los datos para comunicarse entre dos hilos, sino que debe comunicar a compartir los datos: es decir, la mensajería.

Si esta lista es el modelado de una cola, por ejemplo, es posible que sólo tiene que utilizar una de las diversas formas de comunicarse (tales como enchufes) entre los dos hilos. El consumidor/productor es un problema estándar y bien conocido.

Si sus artículos son caros, entonces sólo pasan los punteros alrededor durante la comunicación, evitará la copia de los artículos ellos mismos.

En general, es exquisitamente difícil de compartir datos, aunque es por desgracia el único modo de programación se habla de en la escuela.Normalmente, solo la implementación de "canales" de comunicación de bajo nivel debería preocuparse por la sincronización y debería aprender a usar los canales para comunicarse en lugar de tratar de emularlos.

+0

+1 - solo para la cita de Hoare si nada más. La programación multiproceso no es la única respuesta a todos los problemas. –

0

Si desea continuar usando std::list en un entorno de subprocesos múltiples, le recomiendo envolverlo en una clase con un mutex que proporcione acceso bloqueado a él. Dependiendo del uso exacto, podría tener sentido cambiar a un modelo de cola impulsado por eventos donde los mensajes se pasan en una cola que varios hilos de trabajo están consumiendo (sugerencia: productor-consumidor).

Me tomaría en serio Matthieu's thought en consideración. Muchos problemas que se resuelven con la programación de subprocesos múltiples se resuelven mejor mediante el paso de mensajes entre subprocesos o procesos. Si necesita un alto rendimiento y no requiere absolutamente que el procesamiento tenga el mismo espacio de memoria, considere usar algo como el Message-Passing Interface (MPI) en lugar de implementar su propia solución de subprocesos múltiples. Hay un montón de implementaciones de C++ disponible - OpenMPI, Boost.MPI, Microsoft MPI, etc, etc

1

Como se ha mencionado que no le importa si su consumo iteración pierde algunas entradas agregados recientemente, se podría utilizar un contra copias on-write lista debajo. Esto permite que el consumidor iterativo opere en una instantánea consistente de la lista desde el momento en que comenzó, mientras que, en otros hilos, las actualizaciones de la lista producen copias nuevas pero consistentes, sin perturbar ninguna de las instantáneas existentes.

La operación aquí es que cada actualización de la lista requiere el bloqueo de acceso exclusivo el tiempo suficiente para copiar toda la lista. Esta técnica está predispuesta a tener muchos lectores concurrentes y actualizaciones menos frecuentes.

Al intentar agregar un bloqueo intrínseco al contenedor, primero debe pensar qué operaciones deben comportarse en grupos atómicos. Por ejemplo, comprobar si la lista está vacía antes de intentar extraer el primer elemento requiere una operación atómica pop-if-not-empty; de lo contrario, la respuesta a la lista que está vacía puede cambiar en el medio cuando la persona que llama recibe la respuesta e intenta actuar en consecuencia.

No está claro en su ejemplo anterior lo que garantiza que la iteración debe obedecer. ¿Debe cada elemento de la lista eventualmente ser visitado por el hilo iterativo? ¿Hace pases múltiples? ¿Qué significa que un hilo elimine un elemento de la lista mientras otro hilo ejecuta DoAction() en su contra? Sospecho que trabajar en estas preguntas conducirá a cambios significativos en el diseño.


Estás trabajando en C++ y mencionaste que necesitan una cola con un emergente si-no-vacío operación. Escribí un two-lock queue hace muchos años usando las primitivas de concurrencia de ACE Library, ya que el Boost thread library aún no estaba listo para su uso en producción, y la posibilidad de que la Biblioteca Estándar de C++ incluyera tales instalaciones era un sueño lejano. Transmitirlo a algo más moderno sería fácil.

Esta cola mía, llamada concurrent::two_lock_queue, permite el acceso al jefe de la cola solo a través de RAII. Esto asegura que la adquisición de la cerradura para leer la cabeza siempre se combinará con una liberación de la cerradura. Un consumidor construye un const_front (acceso directo al elemento principal), un front (elemento no const al elemento principal) o un renewable_front (acceso no const al elemento principal y sucesor) para representar el derecho exclusivo de acceder al elemento principal de la cola. Dichos objetos "frontales" no se pueden copiar.

Clase two_lock_queue también ofrece una función pop_front() que espera hasta que al menos un elemento está disponible para ser retirado, pero, de acuerdo con std::queue 's y std::stack' estilo de no mezclar la mutación de contenedores y la copia de valor, pop_front() void Devuelve s.

En un archivo acompañante, hay un tipo llamado concurrent::unconditional_pop, que permite asegurar a través de RAII que el elemento principal de la cola saltará al salir del alcance actual.

El archivo complementario error.hh define las excepciones que surgen del uso de la función two_lock_queue::interrupt(), utilizada para desbloquear hilos que esperan acceso al encabezado de la cola.

Eche un vistazo al código y avíseme si necesita más explicaciones sobre cómo usarlo.

+0

DoAction() debe procesar todos los elementos de la lista, pero otro hilo puede eliminar un elemento durante su ejecución. Esto no va a hacer ningún daño, ya que puede verificar la validez del objeto Info. La lista no se puede vaciar después del ciclo ya que se va a usar en otro lugar. He usado una instantánea para el iterador, pero sigo teniendo problemas sobre cómo hacer un pop-if-not-empty atómico. Gracias –

1

La mejor manera de hacerlo es utilizar un contenedor que esté sincronizado internamente. TBB y la concurrent_queue de Microsoft hacen esto. Anthony Williams también tiene una buena aplicación en su blog here

1

Otros ya han sugerido alternativas sin bloqueo, así que voy a responder como si estuviera atrapado usando cerraduras ...

Cuando se modifica una lista , los iteradores existentes pueden quedar invalidados porque ya no apuntan a la memoria válida (la lista reasigna automáticamente más memoria cuando necesita crecer). Para evitar los iteradores invalidados, podría hacer que el productor bloquee un mutex mientras su consumidor cruza la lista, pero eso sería innecesariamente esperando para el productor.

La vida sería más fácil si utilizara una cola en lugar de una lista, y haga que su consumidor use una llamada sincronizada queue<Info>::pop_front() en lugar de iteradores que pueden invalidarse a sus espaldas. Si su consumidor realmente necesita engullir trozos de Info a la vez, use un condition variable que hará que su consumidor bloquee hasta el queue.size() >= minimum.

La biblioteca Boost tiene una buena implementación portátil de las variables de condición (que incluso funciona con versiones anteriores de Windows), así como el usual threading library stuff.

Para una cola productor-consumidor que utiliza el bloqueo (anticuado), consulte la clase de plantilla BlockingQueue de la biblioteca ZThreads. No he usado ZThreads yo mismo, estoy preocupado por la falta de actualizaciones recientes, y porque no parece ser ampliamente utilizado. Sin embargo, lo he usado como inspiración para lanzar mi propia cola productor-consumidor segura para subprocesos (antes de conocer las colas lock-free y TBB).

Parece que hay una biblioteca de colas/cola sin bloqueo en la cola de revisión de Boost. ¡Esperemos que veamos un nuevo Boost.Lockfree en el futuro cercano! :)

Si hay interés, puedo escribir un ejemplo de una cola de bloqueo que usa std :: queue y Boost thread locking.

EDITAR:

El blog al que hace referencia la respuesta de Rick ya tiene una cola de bloqueo ejemplo que utiliza std :: cola y Boost condvars.Si el consumidor necesita para engullir trozos, se puede extender el ejemplo de la siguiente manera:

void wait_for_data(size_t how_many) 
    { 
     boost::mutex::scoped_lock lock(the_mutex); 
     while(the_queue.size() < how_many) 
     { 
      the_condition_variable.wait(lock); 
     } 
    } 

Usted también puede querer modificarlo para permitir tiempos de descanso y cancelaciones.

Mencionó que la velocidad era una preocupación. Si su Info s son pesados, debe considerar pasarlos por shared_ptr. También puede intentar hacer que su tamaño fijo sea Info y usar un memory pool (que puede ser mucho más rápido que el montón).

1

Si estás usando C++ 0x lista se podría sincronizar internamente iteración de esta manera:

Suponiendo que la clase tiene una lista de plantilla llamado objects_, y un impulso :: mutex llamado mutex_

el método es un método toall miembro de la envoltura de la lista

void toAll(std::function<void (T*)> lambda) 
{ 
boost::mutex::scoped_lock(this->mutex_); 
for(auto it = this->objects_.begin(); it != this->objects_.end(); it++) 
{ 
     T* object = it->second; 
     if(object != nullptr) 
     { 
       lambda(object); 
      } 
     } 
} 

Calling:

synchronizedList1->toAll(
     [&](T* object)->void // Or the class that your list holds 
     { 
      for(auto it = this->knownEntities->begin(); it != this->knownEntities->end(); it++) 
      { 
       // Do something 
      } 
     } 
); 
Cuestiones relacionadas