2009-04-15 11 views
14

Problema: deseo implementar varios procesos de php-worker que están escuchando en una cola de servidor MQ para trabajos asíncronos. El problema ahora es que simplemente ejecutar estos procesos como daemons en un servidor realmente no me da ningún nivel de control sobre las instancias (carga, estado, bloqueado) ... excepto tal vez para descargar ps -aux. Por eso estoy buscando un entorno de ejecución de algún tipo que me permita monitorear y controlar las instancias, ya sea en el nivel del sistema o en una capa superior (algún tipo de servidor de aplicaciones Java)PHP Daemon/entorno de trabajo

Cualquier punteros?

+0

Véase también: http://symfony.com/doc/master/components/process.html –

Respuesta

12

Aquí hay algunos códigos que pueden ser útiles.

<? 
define('WANT_PROCESSORS', 5); 
define('PROCESSOR_EXECUTABLE', '/path/to/your/processor'); 
set_time_limit(0); 
$cycles = 0; 
$run = true; 
$reload = false; 
declare(ticks = 30); 

function signal_handler($signal) { 
    switch($signal) { 
    case SIGTERM : 
     global $run; 
     $run = false; 
     break; 
    case SIGHUP : 
     global $reload; 
     $reload = true; 
     break; 
    } 
} 

pcntl_signal(SIGTERM, 'signal_handler'); 
pcntl_signal(SIGHUP, 'signal_handler'); 

function spawn_processor() { 
    $pid = pcntl_fork(); 
    if($pid) { 
     global $processors; 
     $processors[] = $pid; 
    } else { 
     if(posix_setsid() == -1) 
      die("Forked process could not detach from terminal\n"); 
     fclose(stdin); 
     fclose(stdout); 
     fclose(stderr); 
     pcntl_exec(PROCESSOR_EXECUTABLE); 
     die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n"); 
    } 
} 

function spawn_processors() { 
    global $processors; 
    if($processors) 
     kill_processors(); 
    $processors = array(); 
    for($ix = 0; $ix < WANT_PROCESSORS; $ix++) 
     spawn_processor(); 
} 

function kill_processors() { 
    global $processors; 
    foreach($processors as $processor) 
     posix_kill($processor, SIGTERM); 
    foreach($processors as $processor) 
     pcntl_waitpid($processor); 
    unset($processors); 
} 

function check_processors() { 
    global $processors; 
    $valid = array(); 
    foreach($processors as $processor) { 
     pcntl_waitpid($processor, $status, WNOHANG); 
     if(posix_getsid($processor)) 
      $valid[] = $processor; 
    } 
    $processors = $valid; 
    if(count($processors) > WANT_PROCESSORS) { 
     for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) 
      posix_kill($processors[$ix], SIGTERM); 
     for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) 
      pcntl_waitpid($processors[$ix]); 
    } elseif(count($processors) < WANT_PROCESSORS) { 
     for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++) 
      spawn_processor(); 
    } 
} 

spawn_processors(); 

while($run) { 
    $cycles++; 
    if($reload) { 
     $reload = false; 
     kill_processors(); 
     spawn_processors(); 
    } else { 
     check_processors(); 
    } 
    usleep(150000); 
} 
kill_processors(); 
pcntl_wait(); 
?> 
+0

¿Dónde consiguió esto? ¿Proyecto de fuente abierta o tu propio código? ¿Alguna documentación o explicación de qué está pasando exactamente aquí? – leek

+0

Mi propio código. No estoy dispuesto a explicarlo, no. – chaos

+0

Además, https://github.com/kvz/system_daemon. – HyderA

1

¿Realmente necesita que se ejecute continuamente?

Si solo desea generar un nuevo proceso a pedido, puede registrarlo como un servicio en xinetd.

+0

El aspecto de desove no es un gran problema porque el número de trabajadores depende del rendimiento del sistema, que generalmente es constante. Más importante sería el aspecto de monitoreo del estado del trabajador individual (bloqueado, lo que sea). Una herramienta que acabo de descubrir para esto podría ser DJBs deamontools – Sebastian

+0

Esa es una opción. Para monitorear, también puede usar archivos flock() PID ed. Tras el bloqueo, se liberan todos los bloqueos. – vartec

4

Parece que ya tiene un MQ en funcionamiento en un sistema * nix y solo quiere una forma de administrar a los trabajadores.

Una forma muy simple de hacerlo es usar la pantalla GNU. Para empezar 10 trabajadores que puede utilizar:

#!/bin/sh 
for x in `seq 1 10` ; do 
screen -dmS worker_$x php /path/to/script.php worker$x 
end 

Esto iniciará 10 trabajadores en el fondo utilizando pantallas nombrados worker_1,2,3 y así sucesivamente.

Puede volver a conectar a las pantallas ejecutando la pantalla -r worker_ y enumerar los trabajadores en ejecución utilizando la lista de pantallas.

Para obtener más información sobre esta guía puede ser de ayuda: http://www.kuro5hin.org/story/2004/3/9/16838/14935

Proveedores:

  • pantalla --help
  • pantalla hombre
  • o google.

Para servidores de producción, normalmente recomendaría utilizar las secuencias de comandos de inicio normales del sistema, pero he estado ejecutando comandos de pantalla desde las secuencias de comandos de inicio durante años sin problemas.

0

Bellow es nuestra implementación funcional de @chaos respuesta. Se eliminó el código para manejar las señales, ya que este script vive habitualmente a solo unos pocos milisegundos.

Además, en el código agregamos 2 funciones para guardar pids entre llamadas: restore_processors_state() y save_processors_state(). Hemos utilizado redis allí, pero puede decidir utilizar la implementación en los archivos.

Ejecutamos este script cada minuto usando cron. Cron comprueba si todos los procesos están vivos. Si no, los vuelve a ejecutar y luego muere. Si queremos eliminar los procesos existentes, simplemente ejecutamos este script con el argumento kill: php script.php kill.

Manera muy útil de ejecutar los trabajadores sin inyectar scripts en init.d.

<?php 

include_once dirname(__FILE__) . '/path/to/bootstrap.php'; 

define('WANT_PROCESSORS', 5); 
define('PROCESSOR_EXECUTABLE', '' . dirname(__FILE__) . '/path/to/worker.php'); 
set_time_limit(0); 

$run = true; 
$reload = false; 
declare(ticks = 30); 

function restore_processors_state() 
{ 
    global $processors; 

    $redis = Zend_Registry::get('redis'); 
    $pids = $redis->hget('worker_procs', 'pids'); 

    if(!$pids) 
    { 
     $processors = array(); 
    } 
    else 
    { 
     $processors = json_decode($pids, true); 
    } 
} 

function save_processors_state() 
{ 
    global $processors; 

    $redis = Zend_Registry::get('redis'); 
    $redis->hset('worker_procs', 'pids', json_encode($processors)); 
} 

function spawn_processor() { 
    $pid = pcntl_fork(); 
    if($pid) { 
     global $processors; 
     $processors[] = $pid; 
    } else { 
     if(posix_setsid() == -1) 
      die("Forked process could not detach from terminal\n"); 
     fclose(STDIN); 
     fclose(STDOUT); 
     fclose(STDERR); 
     pcntl_exec('/usr/bin/php', array(PROCESSOR_EXECUTABLE)); 
     die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n"); 
    } 
} 

function spawn_processors() { 
    restore_processors_state(); 

    check_processors(); 

    save_processors_state(); 
} 

function kill_processors() { 
    global $processors; 
    foreach($processors as $processor) 
     posix_kill($processor, SIGTERM); 
    foreach($processors as $processor) 
     pcntl_waitpid($processor, $trash); 
    unset($processors); 
} 

function check_processors() { 
    global $processors; 
    $valid = array(); 
    foreach($processors as $processor) { 
     pcntl_waitpid($processor, $status, WNOHANG); 
     if(posix_getsid($processor)) 
      $valid[] = $processor; 
    } 
    $processors = $valid; 
    if(count($processors) > WANT_PROCESSORS) { 
     for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) 
      posix_kill($processors[$ix], SIGTERM); 
     for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) 
      pcntl_waitpid($processors[$ix], $trash); 
    } 
    elseif(count($processors) < WANT_PROCESSORS) { 
     for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++) 
      spawn_processor(); 
    } 
} 

if(isset($argv) && count($argv) > 1) { 
    if($argv[1] == 'kill') { 
     restore_processors_state(); 
     kill_processors(); 
     save_processors_state(); 

     exit(0); 
    } 
} 

spawn_processors(); 
Cuestiones relacionadas