2010-07-27 7 views
12

Este es el código para crear un thread_group y ejecutar todas las discusiones en paralelo:Cómo hacer impulso :: thread_group ejecutar un número fijo de hilos paralelos

boost::thread_group group; 
for (int i = 0; i < 15; ++i) 
    group.create_thread(aFunctionToExecute); 
group.join_all(); 

Este código se ejecutará en todos los temas a la vez. Lo que quiero hacer es ejecutarlos todos menos 4 como máximo en paralelo. Cuando se termina, se ejecuta otro hasta que no haya más que ejecutar.

Respuesta

3

Otra solución más eficiente sería tener cada devolución de llamada de subproceso al subproceso primario cuando hayan terminado, y el controlador en el subproceso primario podría iniciar un nuevo subproceso cada vez. Esto evita las llamadas repetitivas a timed_join, ya que el hilo primario no hará nada hasta que se active la devolución de llamada.

+1

Finalmente tengo algo como esto: Tengo un threadpool en el que registro todos los trabajos. Luego, creo los n hilos y paso como argumento a cada hilo el threadpool. Cada hilo verifica si quedan trabajos. Si es así, solo consigue un trabajo para ejecutar. De lo contrario, el hilo termina. De esta forma, solo creamos n subprocesos y no un subproceso por trabajo (un trabajo finaliza, se crea un nuevo subproceso). –

0

que tienen algo como esto:

boost::mutex mutex_; 
    boost::condition_variable condition_; 
    const size_t throttle_; 
    size_t size_; 
    bool wait_; 
    template <typename Env, class F> 
    void eval_(const Env &env, const F &f) { 
     { 
      boost::unique_lock<boost::mutex> lock(mutex_); 
      size_ = std::min(size_+1, throttle_); 
      while (throttle_ <= size_) condition_.wait(lock); 
     } 
     f.eval(env); 
     { 
      boost::lock_guard<boost::mutex> lock(mutex_); 
      --size_; 
     } 
     condition_.notify_one(); 
    } 
0

Creo que busca una aplicación thread_pool, que está disponible here.

Además, he notado que si crea un vector de std :: future y almacena futuros de muchas std :: async_tasks en él y no tiene ningún código de bloqueo en la función que se pasa a la cadena, VS2013 (al menos de lo que puedo confirmar) lanzará exactamente el número apropiado de hilos que su máquina puede manejar. Reutiliza los hilos una vez creados.

0

he creado mi propia interfaz simplificada de boost::thread_group para hacer este trabajo:

class ThreadGroup : public boost::noncopyable 
{ 
    private: 
     boost::thread_group  group; 
     std::size_t    maxSize; 
     float      sleepStart; 
     float      sleepCoef; 
     float      sleepMax; 
     std::set<boost::thread*> running; 

    public: 
     ThreadGroup(std::size_t max_size = 0, 
        float max_sleeping_time = 1.0f, 
        float sleeping_time_coef = 1.5f, 
        float sleeping_time_start = 0.001f) : 
      boost::noncopyable(), 
      group(), 
      maxSize(max_size), 
      sleepStart(sleeping_time_start), 
      sleepCoef(sleeping_time_coef), 
      sleepMax(max_sleeping_time), 
      running() 
     { 
      if(max_size == 0) 
       this->maxSize = (std::size_t)std::max(boost::thread::hardware_concurrency(), 1u); 
      assert(max_sleeping_time >= sleeping_time_start); 
      assert(sleeping_time_start > 0.0f); 
      assert(sleeping_time_coef > 1.0f); 
     } 

     ~ThreadGroup() 
     { 
      this->joinAll(); 
     } 

     template<typename F> boost::thread* createThread(F f) 
     { 
      float sleeping_time = this->sleepStart; 
      while(this->running.size() >= this->maxSize) 
      { 
       for(std::set<boost::thread*>::iterator it = running.begin(); it != running.end();) 
       { 
        const std::set<boost::thread*>::iterator jt = it++; 
        if((*jt)->timed_join(boost::posix_time::milliseconds((long int)(1000.0f * sleeping_time)))) 
         running.erase(jt); 
       } 
       if(sleeping_time < this->sleepMax) 
       { 
        sleeping_time *= this->sleepCoef; 
        if(sleeping_time > this->sleepMax) 
         sleeping_time = this->sleepMax; 
       } 
      } 
      return *this->running.insert(this->group.create_thread(f)).first; 
     } 

     void joinAll() 
     { 
      this->group.join_all(); 
     } 

     void interruptAll() 
     { 
#ifdef BOOST_THREAD_PROVIDES_INTERRUPTIONS 
      this->group.interrupt_all(); 
#endif 
     } 

     std::size_t size() const 
     { 
      return this->group.size(); 
     } 
    }; 

Aquí se muestra un ejemplo de uso, muy similar a boost::thread_group con la principal diferencia de que la creación de la rosca es un punto de espera:

{ 
    ThreadGroup group(4); 
    for(int i = 0; i < 15; ++i) 
    group.createThread(aFunctionToExecute); 
} // join all at destruction 
Cuestiones relacionadas