2012-06-20 12 views
5

Voy a implementar el servidor boost :: asio con un grupo de subprocesos con el único io_service (HTTP Server 3 example). io_service se vinculará al socket de dominio de Unix y pasará solicitudes desde conexiones en este socket a diferentes hilos. Para reducir el consumo de recursos, quiero hacer que el grupo de subprocesos sea dinámico.Ejemplo de grupo de subprocesos dinámicos en boost :: asio

Aquí hay un concepto. En primer lugar, se crea un solo hilo. Cuando llega una solicitud y el servidor ve que no hay un subproceso inactivo en un grupo, crea un nuevo subproceso y le pasa la solicitud. El servidor puede crear hasta un número máximo de hilos. Idealmente, debería tener la funcionalidad de suspender hilos que están inactivos por algún tiempo.

¿Alguien hizo algo similar? O tal vez alguien tiene un ejemplo relevante?

En cuanto a mí, creo que de alguna manera debería anular io_service.dispatch para lograrlo.

Respuesta

5

Puede haber algunos desafíos con el planteamiento inicial:

  • boost::asio::io_service no pretende ser derivados de o reimplantado. Tenga en cuenta la falta de funciones virtuales.
  • Si su biblioteca de hilos no proporciona la capacidad de consultar el estado de un hilo, entonces la información de estado debe gestionarse por separado.

Una solución alternativa consiste en publicar un trabajo en el io_service, a continuación, comprobar el tiempo que se sentó en el io_service. Si el delta de tiempo entre cuando estaba listo para ejecutarse y cuándo se ejecutó realmente está por encima de un cierto umbral, esto indica que hay más trabajos en la cola que los hilos que dan servicio a la cola. Un beneficio importante de esto es que la lógica de crecimiento del grupo de subprocesos dinámico se desacopla de otra lógica.

Aquí hay un ejemplo que logra esto usando el deadline_timer.

  • Establecer deadline_timer expirar 3 segundos desde ahora.
  • Espera asincrónicamente en el deadline_timer. El controlador estará listo para ejecutarse 3 segundos desde que se configuró el deadline_timer.
  • En el controlador asíncrono, verifique la hora actual relativa a cuándo se programó que el temporizador expire. Si es mayor que 2 segundos, la cola io_service realiza una copia de seguridad, por lo tanto, agregue un hilo al grupo de subprocesos.

Ejemplo:

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 
#include <iostream> 

class thread_pool_checker 
    : private boost::noncopyable 
{ 
public: 

    thread_pool_checker(boost::asio::io_service& io_service, 
         boost::thread_group& threads, 
         unsigned int max_threads, 
         long threshold_seconds, 
         long periodic_seconds) 
    : io_service_(io_service), 
     timer_(io_service), 
     threads_(threads), 
     max_threads_(max_threads), 
     threshold_seconds_(threshold_seconds), 
     periodic_seconds_(periodic_seconds) 
    { 
     schedule_check(); 
    } 

private: 

    void schedule_check(); 
    void on_check(const boost::system::error_code& error); 

private: 

    boost::asio::io_service& io_service_; 
    boost::asio::deadline_timer timer_; 
    boost::thread_group&  threads_; 
    unsigned int    max_threads_; 
    long      threshold_seconds_; 
    long      periodic_seconds_; 
}; 

void thread_pool_checker::schedule_check() 
{ 
    // Thread pool is already at max size. 
    if (max_threads_ <= threads_.size()) 
    { 
    std::cout << "Thread pool has reached its max. Example will shutdown." 
       << std::endl; 
    io_service_.stop(); 
    return; 
    } 

    // Schedule check to see if pool needs to increase. 
    std::cout << "Will check if pool needs to increase in " 
      << periodic_seconds_ << " seconds." << std::endl; 
    timer_.expires_from_now(boost::posix_time::seconds(periodic_seconds_)); 
    timer_.async_wait( 
    boost::bind(&thread_pool_checker::on_check, this, 
       boost::asio::placeholders::error)); 
} 

void thread_pool_checker::on_check(const boost::system::error_code& error) 
{ 
    // On error, return early. 
    if (error) return; 

    // Check how long this job was waiting in the service queue. This 
    // returns the expiration time relative to now. Thus, if it expired 
    // 7 seconds ago, then the delta time is -7 seconds. 
    boost::posix_time::time_duration delta = timer_.expires_from_now(); 
    long wait_in_seconds = -delta.seconds(); 

    // If the time delta is greater than the threshold, then the job 
    // remained in the service queue for too long, so increase the 
    // thread pool. 
    std::cout << "Job job sat in queue for " 
      << wait_in_seconds << " seconds." << std::endl; 
    if (threshold_seconds_ < wait_in_seconds) 
    { 
    std::cout << "Increasing thread pool." << std::endl; 
    threads_.create_thread(
     boost::bind(&boost::asio::io_service::run, 
        &io_service_)); 
    } 

    // Otherwise, schedule another pool check. 
    run(); 
} 

// Busy work functions. 
void busy_work(boost::asio::io_service&, 
       unsigned int); 

void add_busy_work(boost::asio::io_service& io_service, 
        unsigned int count) 
{ 
    io_service.post(
    boost::bind(busy_work, 
       boost::ref(io_service), 
       count)); 
} 

void busy_work(boost::asio::io_service& io_service, 
       unsigned int count) 
{ 
    boost::this_thread::sleep(boost::posix_time::seconds(5)); 

    count += 1; 

    // When the count is 3, spawn additional busy work. 
    if (3 == count) 
    { 
    add_busy_work(io_service, 0); 
    } 
    add_busy_work(io_service, count); 
} 

int main() 
{ 
    using boost::asio::ip::tcp; 

    // Create io service. 
    boost::asio::io_service io_service; 

    // Add some busy work to the service. 
    add_busy_work(io_service, 0); 

    // Create thread group and thread_pool_checker. 
    boost::thread_group threads; 
    thread_pool_checker checker(io_service, threads, 
           3, // Max pool size. 
           2, // Create thread if job waits for 2 sec. 
           3); // Check if pool needs to grow every 3 sec. 

    // Start running the io service. 
    io_service.run(); 

    threads.join_all(); 

    return 0; 
} 

Salida:

Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 7 seconds. 
Increasing thread pool. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 4 seconds. 
Increasing thread pool. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 3 seconds. 
Increasing thread pool. 
Thread pool has reached its max. Example will shutdown.
+1

Si he entendido bien, las tareas busy_work puede esperar en una cola para el segundo, así como corrector de la piscina, aunque el máximo número de hilos no era alcanzado todavía porque los nuevos hilos no se crean antes de tiempo. Eso hace que este principio apenas se pueda usar porque la característica de ser dinámico no debería degradar tanto el rendimiento. Debería hacer que la ejecución de la tarea sea más larga solo por el tiempo necesario para una nueva creación de subprocesos en comparación con el tiempo necesario con el grupo estático. Gracias de todos modos. – boqapt

+0

@ user484936: su comprensión es correcta.El crecimiento de la piscina ocurre después de que se ha detectado la degradación; es uno de los enfoques más simples para agrupar y no debe 'degradar el rendimiento'. Si desea asignar subprocesos _cuando sabe_ que son necesarios, entonces el estado del subproceso debe ser gestionado, introduciendo una sobrecarga para todos los subprocesos, y puede requerir que la lógica de estado se disemine por todo el código. Si desea asignar hilos _como predice_ que serán necesarios, entonces tenga un hilo dedicado que publique un trabajo en el servicio, luego haga una espera cronometrada para recibir una respuesta. –

+0

Me pregunto qué sucede en un escenario en el que solo se ejecuta una tarea de larga ejecución y agregamos innecesariamente un hilo al grupo cuando nuestro temporizador se dispara. Si en realidad no hay más eventos que procesar en ese momento, este enfoque parece ser ineficiente para mí. Por favor, corríjame si estoy equivocado. – russoue