2012-07-26 15 views
9

Estoy usando boost::asio::io_service como un grupo de subprocesos básicos. Algunos subprocesos se agregan a io_service, el subproceso principal comienza a publicar controladores, los subprocesos de trabajo comienzan a ejecutar los controladores y todo termina. Hasta aquí todo bien; Obtengo una buena aceleración sobre el código de un solo subproceso.¿Establecer límite en el tamaño de la cola de publicaciones con Boost Asio?

Sin embargo, el hilo principal tiene millones de elementos para publicar. Y simplemente los sigue publicando, mucho más rápido de lo que los hilos de trabajo pueden manejarlos. No alcanzo los límites de RAM, pero sigue siendo una tontería poner en cola tantas cosas. Lo que me gustaría hacer es tener un tamaño fijo para la cola del controlador y tener un bloque post() si la cola está llena.

No veo ninguna opción para esto en los documentos de Boost ASIO. es posible?

Respuesta

0

podrías usar el objeto strand para poner los eventos y poner un retraso en tu main? ¿Su programa se cierra después de que se publique todo el trabajo? Si es así, puede usar el objeto de trabajo que le dará más control sobre cuándo se detiene su io_service.

siempre se puede controlar el estado de los hilos y esperar hasta que uno se vuelva libre o algo así.

// enlaces

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service__strand.html

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service.html

//example from the second link 
boost::asio::io_service io_service; 
boost::asio::io_service::work work(io_service); 

esperanza esto ayuda.

+0

El problema no es que el 'io_service 'se detenga antes de completar el trabajo --- sabemos sobre la eliminación del objeto' work' para hacer que el 'io_service' se detenga con gracia. El problema es que el 'io_service' permite que se acumulen demasiadas tareas. Nos gustaría limitar el número de tareas sin asignar de una manera que no implique un sondeo por parte del hilo que crea las tareas, de ahí nuestra pregunta sobre si 'poll()' puede hacerse para bloquear. – uckelman

2

Estoy usando el semáforo para arreglar el tamaño de la cola de los controladores. El código siguiente ilustra esta solución:

void Schedule(boost::function<void()> function) 
{ 
    semaphore.wait(); 
    io_service.post(boost::bind(&TaskWrapper, function)); 
} 

void TaskWrapper(boost::function<void()> &function) 
{ 
    function(); 
    semaphore.post(); 
} 
1

Usted puede envolver su lambda en otro lambda que cuidar de contar las tareas "en curso", y luego esperar antes de publicar si hay demasiadas tareas en curso .

Ejemplo:

#include <atomic> 
#include <chrono> 
#include <future> 
#include <iostream> 
#include <mutex> 
#include <thread> 
#include <vector> 
#include <boost/asio.hpp> 

class ThreadPool { 
    using asio_worker = std::unique_ptr<boost::asio::io_service::work>; 
    boost::asio::io_service service; 
    asio_worker service_worker; 
    std::vector<std::thread> grp; 
    std::atomic<int> inProgress = 0; 
    std::mutex mtx; 
    std::condition_variable busy; 
public: 
    ThreadPool(int threads) : service(), service_worker(new asio_worker::element_type(service)) { 
    for (int i = 0; i < threads; ++i) { 
     grp.emplace_back([this] { service.run(); }); 
    } 
    } 

    template<typename F> 
    void enqueue(F && f) { 
    std::unique_lock<std::mutex> lock(mtx); 
    // limit queue depth = number of threads 
    while (inProgress >= grp.size()) { 
     busy.wait(lock); 
    } 
    inProgress++; 
    service.post([this, f = std::forward<F>(f)]{ 
     try { 
     f(); 
     } 
     catch (...) { 
     inProgress--; 
     busy.notify_one(); 
     throw; 
     } 
     inProgress--; 
     busy.notify_one(); 
    }); 
    } 

    ~ThreadPool() { 
    service_worker.reset(); 
    for (auto& t : grp) 
     if (t.joinable()) 
     t.join(); 
    service.stop(); 
    } 
}; 

int main() { 
    std::unique_ptr<ThreadPool> pool(new ThreadPool(4)); 
    for (int i = 1; i <= 20; ++i) { 
    pool->enqueue([i] { 
     std::string s("Hello from task "); 
     s += std::to_string(i) + "\n"; 
     std::cout << s; 
     std::this_thread::sleep_for(std::chrono::seconds(1)); 
    }); 
    } 
    std::cout << "All tasks queued.\n"; 
    pool.reset(); // wait for all tasks to complete 
    std::cout << "Done.\n"; 
} 

Salida:

Hello from task 3 
Hello from task 4 
Hello from task 2 
Hello from task 1 
Hello from task 5 
Hello from task 7 
Hello from task 6 
Hello from task 8 
Hello from task 9 
Hello from task 10 
Hello from task 11 
Hello from task 12 
Hello from task 13 
Hello from task 14 
Hello from task 15 
Hello from task 16 
Hello from task 17 
Hello from task 18 
All tasks queued. 
Hello from task 19 
Hello from task 20 
Done. 
0

Tal vez intente reducir la prioridad del hilo principal de manera que una vez que los subprocesos de trabajo se llene de gente que se muere de hambre el hilo principal y los límites auto sistema.

Cuestiones relacionadas