2010-09-28 6 views
6

Estoy a punto de implementar un hilo de trabajo con cola de elementos de trabajo, y mientras pensaba en el problema, quería saber si estoy haciendo lo mejor.Primitiva de sincronización más liviana para la fila de hilos del trabajador

El hilo en cuestión tendrá que tener algunos datos locales de subprocesos (preinicializados en la construcción) y hará un bucle en los elementos de trabajo hasta que se cumpla alguna condición.

pseudocódigo:

volatile bool run = true; 

int WorkerThread(param) 
{ 
    localclassinstance c1 = new c1(); 
    [other initialization] 

    while(true) { 
     [LOCK] 
     [unqueue work item] 
     [UNLOCK] 
     if([hasWorkItem]) { 
      [process data] 
      [PostMessage with pointer to data] 
     } 
     [Sleep] 

     if(!run) 
      break; 
    } 

    [uninitialize] 
    return 0; 
} 

supongo que haré el bloqueo a través de la sección crítica, como se std :: vector de la cola o std :: cola, pero tal vez hay una mejor manera.

La parte con suspensión no parece demasiado grande, ya que habrá una gran cantidad de sueño adicional con grandes valores de reposo, o mucho bloqueo extra cuando el valor de reposo es pequeño, y eso es definitivamente innecesario.

Pero no puedo pensar en una primitiva amistosa WaitForSingleObject que podría utilizar en lugar de una sección crítica, ya que podría haber dos hilos en cola de elementos de trabajo al mismo tiempo. Así que Event, que parece ser el mejor candidato, puede perder el segundo elemento de trabajo si el Evento ya se estableció, y no garantiza una exclusión mutua.

Quizás haya incluso un mejor enfoque con el tipo de funciones InterlockedExchange que conduzca a una serialización aún menor.

P.S .: Puede que necesite preprocesar toda la cola y soltar los elementos de trabajo obsoletos durante la etapa de desencadenamiento.

+0

Si el bloqueo y el desbloqueo no es caro, se está llevando a cabo el sueño es un problema? – DumbCoder

+0

La serialización de 3 o 4 hilos, la grabación de la CPU y, posiblemente, la ejecución de cerraduras de convoyes, probablemente no sea algo muy bueno. Y si entiendo correctamente esto es lo que sucederá sin Sleep o WaitForSingleObject. Por otra parte, tal vez estoy equivocado. – Coder

+0

¿Cómo se entregan los elementos de trabajo a la cola? –

Respuesta

5

Hay una multitud de formas de hacerlo.

Una opción es usar un semáforo para la espera. El semáforo se señala cada vez que se inserta un valor en la cola, por lo que el subproceso de trabajo solo se bloqueará si no hay elementos en la cola. Esto aún requerirá una sincronización por separado en la cola misma.

Una segunda opción es utilizar un evento de restablecimiento manual que se establece cuando hay elementos en la cola y se borran cuando la cola está vacía. De nuevo, deberá hacer una sincronización por separado en la cola.

Una tercera opción es crear una ventana invisible de solo mensaje en el hilo y usar un mensaje especial WM_USER o WM_APP para publicar elementos en la cola, adjuntando el elemento al mensaje mediante un puntero.

Otra opción es usar condition variables. Las variables de condición nativas de Windows solo funcionan si está dirigido a Windows Vista o Windows 7, pero las variables de condición también están disponibles para Windows XP con Boost o una implementación de la biblioteca de subprocesos C++ 0x. Una cola de ejemplo que usa variables de condición de impulso está disponible en mi blog: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

+0

La solución de eventos parece ser la mejor, desafortunadamente, parece que tengo que estudiar y refactorizar más debido al COM. CoInitialize parece ser incompatible con WaitForSingleObject. las variables condicionales se ven bien, pero no tengo la dependencia Boost en este proyecto y tengo que apoyar WinXp – Coder

+0

IIRC, COM requiere un suministro de mensajes, así que use 'MsgWaitForMultipleObjects' lugar, y los mensajes de las ventanas de la bomba si tiene alguno. –

+0

si COM es un requisito, entonces también podría simplemente enviar mensajes de subprocesos, o usar una ventana de tipo MESSAGE_HWND ya que el destino como el mecanismo de bomba de mensajes de Windows ya se ha instanciado en el subproceso. –

1

Hay varias maneras de hacer esto

Por un lado se podría crear un evento en lugar denominado 'correr' y luego utilizar eso para detectar cuando el hilo debe terminar, el hilo principal, entonces las señales. En lugar de dormir, entonces usaría WaitForSingleObject con un tiempo de espera, de esa manera se cerrará directamente en lugar de esperar a que se agote el ms.

Otra forma es aceptar los mensajes en su bucle y luego inventar un mensaje definido por el usuario que publique al hilo

EDIT: dependiendo de la situación, también puede ser conveniente tener otro hilo que supervisa este hilo para compruebe si está muerto o no, esto se puede hacer mediante la cola de mensajes mencionada anteriormente, por lo que responder a un determinado mensaje dentro de x ms significaría que el hilo no se ha bloqueado.

0

Use un semáforo en lugar de un evento.

+0

en este caso creo que un evento sería suficiente porque es booleano en su naturaleza. –

1

Me reestructurar una bits:

WorkItem GetWorkItem() 
{ 
    while(true) 
    { 
     WaitForSingleObject(queue.Ready); 
     { 
      ScopeLock lock(queue.Lock); 
      if(!queue.IsEmpty()) 
      { 
       return queue.GetItem(); 
      } 
     } 
    } 
} 

int WorkerThread(param) 
{ 
    bool done = false; 
    do 
    { 
     WorkItem work = GetWorkItem(); 
     if(work.IsQuitMessage()) 
     { 
      done = true; 
     } 
     else 
     { 
      work.Process(); 
     } 
    } while(!done); 

    return 0; 
} 

Puntos de interés:

  1. ScopeLock es una clase RAII para hacer uso de la sección crítica más seguro.
  2. Bloque en el evento hasta que workitem esté (posiblemente) listo - luego cancele mientras intente para dequearlo.
  3. no utilice una bandera global "IsDone", ponga en cola mensaje de abandono especial WorkItem s.
+0

> WaitForSingleObject (queue.Ready); ¿Está a la espera del evento? ¿Qué pasa si el hilo 1 programa un elemento, señala el evento, al mismo tiempo subprocesos y elementos y también señala el evento. El hilo de trabajo se reanuda, procesa un elemento, restablece el evento y vuelve a WaitForSingleObject, dejando un elemento colgando. El RAII y el mensaje de renuncia es una buena idea. – Coder

+1

Si la memoria me sirve correctamente, lo siguiente debería funcionar: use el restablecimiento manual de eventos y reinicie en dequeue si workitems cae a cero. – snemarch

0

Mantenga la señalización y la sincronización por separado. Algo en esta línea ...

// in main thread 

HANDLE events[2]; 
events[0] = CreateEvent(...); // for shutdown 
events[1] = CreateEvent(...); // for work to do 

// start thread and pass the events 

// in worker thread 

DWORD ret; 
while (true) 
{ 
    ret = WaitForMultipleObjects(2, events, FALSE, <timeout val or INFINITE>); 

    if shutdown 
     return 
    else if do-work 
     enter crit sec 
     unqueue work 
     leave crit sec 
     etc. 
    else if timeout 
     do something else that has to be done 
} 
2

La primitiva de bloqueo más rápido suele ser un bloqueo de giro o bloqueo de bloqueo de giro. CRITICAL_SECTION es simplemente un spin-sleep-lock (espacio de usuario). (Bueno, aparte de no usar primitivas de bloqueo, por supuesto, pero eso significa usar estructuras de datos sin enclavamiento, y esas son realmente difíciles de manejar).

En cuanto a evitar el sueño: echar un vistazo en condiciones-variables Están diseñados para usarse junto con un "mutex", y creo que son mucho más fáciles de usar correctamente que los EVENT de Windows.

Boost.Thread tiene una buena aplicación portátil de ambos, de espacio de usuario rápida spin-sueño-locks y variables de condición:

http://www.boost.org/doc/libs/1_44_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref

Una obra-cola utilizando Boost.Thread podría ser algo como esto :

template <class T> 
class Queue : private boost::noncopyable 
{ 
public: 
    void Enqueue(T const& t) 
    { 
     unique_lock lock(m_mutex); 

     // wait until the queue is not full 
     while (m_backingStore.size() >= m_maxSize) 
      m_queueNotFullCondition.wait(lock); // releases the lock temporarily 

     m_backingStore.push_back(t); 
     m_queueNotEmptyCondition.notify_all(); // notify waiters that the queue is not empty 
    } 

    T DequeueOrBlock() 
    { 
     unique_lock lock(m_mutex); 

     // wait until the queue is not empty 
     while (m_backingStore.empty()) 
      m_queueNotEmptyCondition.wait(lock); // releases the lock temporarily 

     T t = m_backingStore.front(); 
     m_backingStore.pop_front(); 

     m_queueNotFullCondition.notify_all(); // notify waiters that the queue is not full 

     return t; 
    } 

private: 
    typedef boost::recursive_mutex mutex; 
    typedef boost::unique_lock<boost::recursive_mutex> unique_lock; 

    size_t const m_maxSize; 

    mutex mutable m_mutex; 
    boost::condition_variable_any m_queueNotEmptyCondition; 
    boost::condition_variable_any m_queueNotFullCondition; 

    std::deque<T> m_backingStore; 
}; 
+0

¿Conoces un tutorial que explique las variables de condición? Todavía tengo que ver un ejemplo de variable de condición que demuestre cómo son mejores que un semáforo de Windows o un objeto de evento. Seguro que hay un objeto de sección crítica asociado, pero la forma de utilizar esto para lograr algo es completamente obvio para los no iniciados. –

0

Teniendo en cuenta que esta pregunta se etiqueta ventanas, Ill respuesta así:

No cree 1 subproceso de trabajo. ¿Sus trabajos de hilo de trabajo son presumiblemente independientes, por lo que puede procesar varios trabajos a la vez? En caso afirmativo:

  • En su hilo principal, llame a CreateIOCompletionPort para crear un objeto de puerto io completion.
  • Cree un grupo de subprocesos de trabajo. El número que necesita crear depende de cuántos trabajos desee realizar en paralelo. Algunos múltiplos de la cantidad de núcleos de CPU es un buen comienzo.
  • Cada vez que un trabajo entra en una llamada PostQueuedCompletionStatus() pasa un puntero a la estructura del trabajo como la estructura lpOverlapped.
  • Cada subproceso de trabajo llama a GetQueuedCompletionItem(): recupera el elemento de trabajo desde el puntero lpOverlapped y realiza el trabajo antes de volver a GetQueuedCompletionStatus.

Esto parece pesado, pero io puertos de finalización se implementan en modo kernel y representan una cola que se puede deserializar en cualquiera de los subprocesos de trabajo asociados a la cola (es decir, a la espera de una llamada a GetQueuedCompletionStatus). El puerto de finalización io sabe cuántos de los subprocesos que procesan un elemento están realmente usando una CPU vs bloqueado en una llamada de E/S - y lanzará más subprocesos de trabajo del grupo para asegurarse de que se cumple el conteo de simultaneidad.

Por lo tanto, no es liviano, pero es muy muy eficiente ... el puerto de terminación se puede asociar con mangos de tubería y zócalo, por ejemplo, y puede extraer los resultados de las operaciones asincrónicas en esos controladores. Los diseños de puertos de terminación pueden escalar para manejar 10 de miles de conectores de socket en un único servidor, pero en el lado de escritorio del mundo constituyen una forma muy conveniente de escalar el procesamiento de trabajos en los 2 o 4 núcleos ahora comunes en las PC de escritorio.

+1

El mayor problema de los diseños basados ​​en puertos de finalización escritos en el C++ moderno es que todos los subprocesos en un proceso comparten acceso a un solo montón, y el STL tiende a demoler mucho el montón. Como resultado, puede encontrar el rendimiento limitado por cada uno de los subprocesos de trabajo que compiten por bloqueos de montón más que cualquier otra cosa. –

+0

Lamentablemente, tengo que usar componentes COM, y ThreadPools no son muy amigables con ellos. – Coder

+0

sin saber cómo está utilizando los objetos COM hace que sea difícil de juzgar. Si los objetos COM forman parte del trabajo, sí, hay un problema. Si los objetos com se usan para hacer el trabajo, cada subproceso de trabajo es su propio apartamento, solo necesita un almacén de TLS para obtener los objetos de trabajador relacionados con el subproceso actual. –

3

Es posible compartir un recurso entre subprocesos sin usar bloqueos de bloqueo en absoluto, si su escenario cumple con ciertos requisitos.

Necesita una primitiva de intercambio de puntero atómico, como InterlockedExchange de Win32. La mayoría de las arquitecturas de procesador ofrecen algún tipo de intercambio atómico, y generalmente es mucho menos costoso que adquirir un bloqueo formal.

Puede almacenar los elementos de la cola de trabajo en una variable de puntero a la que tienen acceso todos los hilos que estarán interesados ​​en ella. (Var global, o campo de un objeto al que todos los subprocesos tienen acceso)

En este escenario se supone que los subprocesos implicados siempre tienen algo que ver, y solo ocasionalmente "vistazo" al recurso compartido. Si desea un diseño donde los hilos bloquean la entrada en espera, utilice un objeto de evento de bloqueo tradicional.

Antes de que algo comience, cree su cola o objeto de la lista de elemento de trabajo y asígnelo a la variable del puntero compartido.

Ahora, cuando los productores desean insertar algo en la cola, "adquieren" acceso exclusivo al objeto de cola intercambiando un valor nulo en la variable de puntero compartida mediante InterlockedExchange. Si el resultado del intercambio devuelve un valor nulo, entonces alguien más está modificando el objeto de la cola. Duerme (0) para liberar el resto de la división de tiempo de tu subproceso, luego realiza un ciclo para volver a intentar el intercambio hasta que no sea nulo. Incluso si termina haciendo bucles varias veces, esto es mucho. muchas veces más rápido que hacer una llamada al kernel para adquirir un objeto mutex. Las llamadas al kernel requieren cientos de ciclos de reloj para pasar al modo kernel.

Cuando obtenga correctamente el puntero, realice las modificaciones en la cola y luego vuelva a colocar el puntero de la cola en el puntero compartido.

Al consumir elementos de la cola, usted hace lo mismo: intercambie un nulo en el puntero compartido y bucleo hasta que obtenga un resultado no nulo, opere en el objeto en la var local y luego vuelva a colocarlo en el var. puntero compartido

Esta técnica es una combinación de intercambio atómico y bucles de giro breves. Funciona bien en escenarios donde los hilos involucrados no están bloqueados y las colisiones son raras. La mayoría de las veces, el intercambio le dará acceso exclusivo al objeto compartido en el primer intento, y siempre que el tiempo que el objeto queue sea mantenido exclusivamente por un hilo sea muy corto, ningún hilo debería tener que pasar más de un ciclo. algunas veces antes de que el objeto de la cola vuelva a estar disponible.

Si espera mucha disputa entre hilos en su escenario, o si desea un diseño donde los hilos pasan la mayor parte del tiempo bloqueados esperando a que llegue el trabajo, puede ser mejor atendido por un objeto de sincronización mutex formal.

Cuestiones relacionadas