2009-05-29 14 views
8

Me gustaría crear una clase cuyos métodos se puedan invocar desde varios subprocesos. pero en lugar de ejecutar el método en el hilo del que se llamó, debe realizarlos todos en su propio hilo. No es necesario devolver ningún resultado y no debe bloquear el hilo de llamada.Event/Task Queue Multithreading C++

Un primer intento Implementación que he incluido a continuación. Los métodos públicos insertan un puntero de función y datos en una Cola de trabajos, que luego recoge el hilo del trabajador. Sin embargo, no es particularmente bonito el código y agregar nuevos métodos es engorroso.

Idealmente, me gustaría utilizar esto como una clase base que puedo agregar fácilmente métodos (con un número variable de argumentos) con la duplicación mínima de hastle y código.

¿Cuál es la mejor manera de hacerlo? ¿Hay algún código disponible que haga algo similar? Gracias

#include <queue> 

using namespace std; 

class GThreadObject 
{ 
    class event 
    { 
     public: 
     void (GThreadObject::*funcPtr)(void *); 
     void * data; 
    }; 

public: 
    void functionOne(char * argOne, int argTwo); 

private: 
    void workerThread(); 
    queue<GThreadObject::event*> jobQueue; 
    void functionOneProxy(void * buffer); 
    void functionOneInternal(char * argOne, int argTwo); 

}; 



#include <iostream> 
#include "GThreadObject.h" 

using namespace std; 

/* On a continuous loop, reading tasks from queue 
* When a new event is received it executes the attached function pointer 
* It should block on a condition, but Thread code removed to decrease clutter 
*/ 
void GThreadObject::workerThread() 
{ 
    //New Event added, process it 
    GThreadObject::event * receivedEvent = jobQueue.front(); 

    //Execute the function pointer with the attached data 
    (*this.*receivedEvent->funcPtr)(receivedEvent->data); 
} 

/* 
* This is the public interface, Can be called from child threads 
* Instead of executing the event directly it adds it to a job queue 
* Then the workerThread picks it up and executes all tasks on the same thread 
*/ 
void GThreadObject::functionOne(char * argOne, int argTwo) 
{ 

    //Malloc an object the size of the function arguments 
    int argumentSize = sizeof(char*)+sizeof(int); 
    void * myData = malloc(argumentSize); 
    //Copy the data passed to this function into the buffer 
    memcpy(myData, &argOne, argumentSize); 

    //Create the event and push it on to the queue 
    GThreadObject::event * myEvent = new event; 
    myEvent->data = myData; 
    myEvent->funcPtr = &GThreadObject::functionOneProxy; 
    jobQueue.push(myEvent); 

    //This would be send a thread condition signal, replaced with a simple call here 
    this->workerThread(); 
} 

/* 
* This handles the actual event 
*/ 
void GThreadObject::functionOneInternal(char * argOne, int argTwo) 
{ 
    cout << "We've made it to functionTwo char*:" << argOne << " int:" << argTwo << endl; 

    //Now do the work 
} 

/* 
* This is the function I would like to remove if possible 
* Split the void * buffer into arguments for the internal Function 
*/ 
void GThreadObject::functionOneProxy(void * buffer) 
{ 
    char * cBuff = (char*)buffer; 
    functionOneInternal((char*)*((unsigned int*)cBuff), (int)*(cBuff+sizeof(char*))); 
}; 

int main() 
{ 
    GThreadObject myObj; 

    myObj.functionOne("My Message", 23); 

    return 0; 
} 

Respuesta

6

Hay una biblioteca Futures haciendo su camino en Boost y la biblioteca estándar de C++. También hay algo del mismo tipo en ACE, pero odiaría recomendarlo a cualquiera (como @lothar ya señaló, es Objeto Activo).

+0

Estaba buscando boost :: future, pero como no forma parte de una versión de boost lanzada, tuve que recurrir a mi ACE de confianza :-) – lothar

+0

La biblioteca de futuros formará parte de Boost 1.41. También está disponible como parte de mi implementación de la biblioteca de hilos C++ 0x en http://www.stdthread.co.uk –

+0

Gracias, Anthony. Encantado de saber de ti :) –

2

La biblioteca POCO tiene algo en la misma línea llamada ActiveMethod (junto con alguna funcionalidad relacionada, por ejemplo, ActiveResult) en la sección de subprocesamiento. El código fuente está disponible y es fácil de entender.

1

Para la extensibilidad y mantenibilidad (y otras capacidades) se puede definir una clase abstracta (o interfaz) para el "trabajo" que el subproceso va a realizar. Luego, los usuarios de su grupo de subprocesos implementarían esta interfaz y darían referencia al objeto al grupo de subprocesos. Esto es muy similar al diseño de Objeto activo de Symbian: todas las subclases de AO de CActive y tienen que implementar métodos como Ejecutar() y Cancelar().

Para simplificar la interfaz (clase abstracta) podría ser tan simple como:

class IJob 
{ 
    virtual Run()=0; 
}; 

A continuación, el grupo de subprocesos, o las solicitudes de rosca aceptar solo tendría algo como:

class CThread 
{ 
    <...> 
public: 
    void AddJob(IJob* iTask); 
    <...> 
}; 

Naturalmente que sería tener múltiples tareas que pueden tener todo tipo de setters/getters/attributes extra y lo que sea que necesites en cualquier aspecto de la vida. Sin embargo, el único requisito es poner en práctica el método Run(), que lleve a cabo los cálculos largos:

class CDumbLoop : public IJob 
{ 
public: 
    CDumbJob(int iCount) : m_Count(iCount) {}; 
    ~CDumbJob() {}; 
    void Run() 
    { 
     // Do anything you want here 
    } 
private: 
    int m_Count; 
}; 
+0

Este es el método exacto que utilizamos para desarrollar juegos en sistemas de próxima generación en mi compañía (para un bono de rendimiento adicional, busca agregar elementos a la cola de trabajo a través de métodos sin bloqueo) –

+0

Cualquier consejo en la implementación de la cola sin bloqueo? –

+0

Hay bastantes buenos ejemplos de colas sin bloqueos en la red (es una de las estructuras más fáciles de implementar sin bloqueo). Este es el que me cortó los dientes cuando subí al tren sin cerrojo: http://www.boyet.com/Articles/LockfreeQueue.html –

2

Puede resolver esto mediante el uso -library hilo de Boost. Algo como esto (media pseudo):


class GThreadObject 
{ 
     ... 

     public: 
       GThreadObject() 
       : _done(false) 
       , _newJob(false) 
       , _thread(boost::bind(&GThreadObject::workerThread, this)) 
       { 
       } 

       ~GThreadObject() 
       { 
         _done = true; 

         _thread.join(); 
       } 

       void functionOne(char *argOne, int argTwo) 
       { 
         ... 

         _jobQueue.push(myEvent); 

         { 
           boost::lock_guard l(_mutex); 

           _newJob = true; 
         } 

         _cond.notify_one(); 
       } 

     private: 
       void workerThread() 
       { 
         while (!_done) { 
           boost::unique_lock l(_mutex); 

           while (!_newJob) { 
             cond.wait(l); 
           } 

           Event *receivedEvent = _jobQueue.front(); 

           ... 
         } 
       } 

     private: 
       volatile bool    _done; 
       volatile bool    _newJob; 
       boost::thread    _thread; 
       boost::mutex    _mutex; 
       boost::condition_variable _cond; 
       std::queue<Event*>  _jobQueue; 
}; 

Además, tenga en cuenta cómo RAII nos permiten obtener el código más pequeño y mejor de manejar.

+1

¿Está std :: queue :: push threadsafe? Parece que el functionOne lock_guard debe ir antes de la llamada _jobQueue.push. –

1

Aquí hay una clase que escribí para un propósito similar (la utilizo para el manejo de eventos pero, por supuesto, puedes cambiarle el nombre a ActionQueue y renombrar sus métodos).

que lo utilice como esto:

Con la función que desea llamar: void foo (const int x, const int y) { /*...*/ }

Y: EventQueue q;

q.AddEvent (boost :: bind (foo, 10, 20));

En el subproceso de trabajo

q.PlayOutEvents();

Nota: debería ser bastante fácil agregar código para bloquear en condición para evitar el uso de ciclos de CPU.

El código (Visual Studio 2003 con impulso 1.34.1):

#pragma once 

#include <boost/thread/recursive_mutex.hpp> 
#include <boost/function.hpp> 
#include <boost/signals.hpp> 
#include <boost/bind.hpp> 
#include <boost/foreach.hpp> 
#include <string> 
using std::string; 


// Records & plays out actions (closures) in a safe-thread manner. 

class EventQueue 
{ 
    typedef boost::function <void()> Event; 

public: 

    const bool PlayOutEvents() 
    { 
     // The copy is there to ensure there are no deadlocks. 
     const std::vector<Event> eventsCopy = PopEvents(); 

     BOOST_FOREACH (const Event& e, eventsCopy) 
     { 
      e(); 
      Sleep (0); 
     } 

     return eventsCopy.size() > 0; 
    } 

    void AddEvent (const Event& event) 
    { 
     Mutex::scoped_lock lock (myMutex); 

     myEvents.push_back (event); 
    } 

protected: 

    const std::vector<Event> PopEvents() 
    { 
     Mutex::scoped_lock lock (myMutex); 

     const std::vector<Event> eventsCopy = myEvents; 
     myEvents.clear(); 

     return eventsCopy; 
    } 

private: 

    typedef boost::recursive_mutex Mutex; 
    Mutex myMutex; 

    std::vector <Event> myEvents; 

}; 

espero que esto ayude. :)

Martin Bilski

+0

Recomiendo usar mutexes (o cualquier otra forma de "primitiva de sincronización") ya que no se escalan en absoluto con múltiples procesadores (después de aproximadamente 4-8, realmente disminuyen el rendimiento). Mire en la codificación sin bloqueo para implementaciones verdaderamente escalables. Además, si necesita usar una primitiva de sincronización, use una sección crítica ya que son más rápidos que un mutex (mutex es seguro para el proceso, la sección crítica es segura para subprocesos, es decir, utiliza un mutex al sincronizar procesos, CS cuando sincroniza subprocesos en el mismo proceso) –

0

Usted debe echar un vistazo a la biblioteca Boost ASIO. Está diseñado para despachar eventos de forma asíncrona. Se puede emparejar con la biblioteca Boost Thread para construir el sistema que describió.

Necesitará crear una instancia de un solo objeto boost::asio::io_service y programar una serie de eventos asincrónicos (boost::asio::io_service::post o boost::asio::io_service::dispatch). A continuación, llama a la función de miembro run de n hilos. El objeto io_service es seguro para subprocesos y garantiza que sus controladores asincrónicos solo se distribuirán en un subproceso desde el que llamó al io_service::run.

El objeto boost::asio::strand también es útil para la sincronización simple de hilos.

Por lo que vale, creo que la biblioteca ASIO es una solución muy elegante a este problema.

1

A continuación se muestra una implementación que no requiere un método "functionProxy". Aunque es más fácil agregar nuevos métodos, sigue siendo complicado.

Boost :: Bind y "Futures" parecen que ordenarían mucho de esto. Creo que echaré un vistazo al código de impulso y veré cómo funciona. Gracias a todos por sus sugerencias.

GThreadObject.h

#include <queue> 

using namespace std; 

class GThreadObject 
{ 

    template <int size> 
    class VariableSizeContainter 
    { 
     char data[size]; 
    }; 

    class event 
    { 
     public: 
     void (GThreadObject::*funcPtr)(void *); 
     int dataSize; 
     char * data; 
    }; 

public: 
    void functionOne(char * argOne, int argTwo); 
    void functionTwo(int argTwo, int arg2); 


private: 
    void newEvent(void (GThreadObject::*)(void*), unsigned int argStart, int argSize); 
    void workerThread(); 
    queue<GThreadObject::event*> jobQueue; 
    void functionTwoInternal(int argTwo, int arg2); 
    void functionOneInternal(char * argOne, int argTwo); 

}; 

GThreadObject.cpp

#include <iostream> 
#include "GThreadObject.h" 

using namespace std; 

/* On a continuous loop, reading tasks from queue 
* When a new event is received it executes the attached function pointer 
* Thread code removed to decrease clutter 
*/ 
void GThreadObject::workerThread() 
{ 
    //New Event added, process it 
    GThreadObject::event * receivedEvent = jobQueue.front(); 

    /* Create an object the size of the stack the function is expecting, then cast the function to accept this object as an argument. 
    * This is the bit i would like to remove 
    * Only supports 8 byte argument size e.g 2 int's OR pointer + int OR myObject8bytesSize 
    * Subsequent data sizes would need to be added with an else if 
    * */ 
    if (receivedEvent->dataSize == 8) 
    { 
     const int size = 8; 

     void (GThreadObject::*newFuncPtr)(VariableSizeContainter<size>); 
     newFuncPtr = (void (GThreadObject::*)(VariableSizeContainter<size>))receivedEvent->funcPtr; 

     //Execute the function 
     (*this.*newFuncPtr)(*((VariableSizeContainter<size>*)receivedEvent->data)); 
    } 

    //Clean up 
    free(receivedEvent->data); 
    delete receivedEvent; 

} 

void GThreadObject::newEvent(void (GThreadObject::*funcPtr)(void*), unsigned int argStart, int argSize) 
{ 

    //Malloc an object the size of the function arguments 
    void * myData = malloc(argSize); 
    //Copy the data passed to this function into the buffer 
    memcpy(myData, (char*)argStart, argSize); 

    //Create the event and push it on to the queue 
    GThreadObject::event * myEvent = new event; 
    myEvent->data = (char*)myData; 
    myEvent->dataSize = argSize; 
    myEvent->funcPtr = funcPtr; 
    jobQueue.push(myEvent); 

    //This would be send a thread condition signal, replaced with a simple call here 
    this->workerThread(); 

} 

/* 
* This is the public interface, Can be called from child threads 
* Instead of executing the event directly it adds it to a job queue 
* Then the workerThread picks it up and executes all tasks on the same thread 
*/ 
void GThreadObject::functionOne(char * argOne, int argTwo) 
{ 
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionOneInternal, (unsigned int)&argOne, sizeof(char*)+sizeof(int)); 
} 

/* 
* This handles the actual event 
*/ 
void GThreadObject::functionOneInternal(char * argOne, int argTwo) 
{ 
    cout << "We've made it to functionOne Internal char*:" << argOne << " int:" << argTwo << endl; 

    //Now do the work 
} 

void GThreadObject::functionTwo(int argOne, int argTwo) 
{ 
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionTwoInternal, (unsigned int)&argOne, sizeof(int)+sizeof(int)); 
} 

/* 
* This handles the actual event 
*/ 
void GThreadObject::functionTwoInternal(int argOne, int argTwo) 
{ 
    cout << "We've made it to functionTwo Internal arg1:" << argOne << " int:" << argTwo << endl; 
} 

main.cpp

#include <iostream> 
#include "GThreadObject.h" 

int main() 
{ 

    GThreadObject myObj; 

    myObj.functionOne("My Message", 23); 
    myObj.functionTwo(456, 23); 


    return 0; 
} 

Editar: simplemente para la corrección Hice una aplicación con boost :: bind.Las diferencias clave:

queue<boost::function<void()> > jobQueue; 

void GThreadObjectBoost::functionOne(char * argOne, int argTwo) 
{ 
    jobQueue.push(boost::bind(&GThreadObjectBoost::functionOneInternal, this, argOne, argTwo)); 

    workerThread(); 
} 

void GThreadObjectBoost::workerThread() 
{ 
    boost::function<void()> func = jobQueue.front(); 
    func(); 
} 

Uso de la aplicación impulso de 10.000.000 iteraciones de functionOne() se tomaron ~ 19sec. Sin embargo, la implementación sin impulso solo tomó ~ 6.5 segundos. Entonces Aproximadamente 3 veces más lento. Supongo que encontrar una buena cola sin bloqueo será el mayor cuello de botella de rendimiento aquí. Pero sigue siendo una gran diferencia.