2012-04-05 16 views
5

Necesito pasar mensajes a los procesos CLI PHP a través de stdin desde Java. Me gustaría mantener alrededor de 20 procesos PHP ejecutándose en un grupo, de modo que cuando paso un mensaje al grupo, envíe cada mensaje a un hilo separado, manteniendo una cola de mensajes entregados. Me gustaría que estos procesos PHP permanezcan vivos el mayor tiempo posible, mencionando uno nuevo si uno muere. Consideré hacer esto con un grupo de subprocesos estáticos, pero parece más diseñado para tareas que se ejecutan y simplemente mueren. ¿Cómo podría hacer esto, con una interfaz simple para pasar un mensaje al grupo? ¿Tendré que implementar mi propio "grupo de hilos" personalizado?ThreadPool de procesos CLI

+0

Muy similar a esta pregunta: http://stackoverflow.com/questions/2592093/php-thread-pool –

+1

que hay alguna salida del PHP tal manera que se saber cuándo se procesa? – Clint

+0

Nunca se procesará. Si uno muere, necesito generar uno nuevo para reemplazarlo. Voy a pasar datos a ellos en forma de round robin a través de stdin. – Will

Respuesta

4

Proporciono un código con esto ya que creo que hará las cosas más claras. Básicamente, debe mantener un conjunto de objetos de proceso. Tenga en cuenta que cada uno de estos procesos tiene un flujo de entrada, salida y error que necesita gestionar de alguna manera. En mi ejemplo, simplemente redirijo el error y lo envío a la consola de procesos principal. Puede configurar callbacks y manejadores para obtener el resultado del programa PHP si es necesario. Si solo está procesando tareas y no le importa lo que diga PHP, déjelo como está o redirija a un archivo.

Estoy utilizando la biblioteca Apache Commons Pool para ObjectPool. No hay necesidad de reinventar uno.

Tendrás un conjunto de 20 procesos que ejecutarán tu programa PHP. Esto solo no te dará lo que necesitas. Es posible que desee procesar tareas en todos los 20 procesos "al mismo tiempo". Entonces también necesitarás un ThreadPool que extraerá un Proceso de tu ObjectPool.

También tendrá que entender que si matas, o CTRL-C su proceso de Java el proceso init se hará cargo de los procesos de php y que sólo se sentará allí. Es probable que desee mantener un registro de todos los pid de los procesos de PHP que genere, y luego limpiarlos si vuelve a ejecutar su programa Java.

public class StackOverflow_10037379 { 

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName()); 

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> { 

     private String mProcessToRun; 

     public CLIPoolableObjectFactory(String processToRun) { 
      mProcessToRun = processToRun; 
     } 

     @Override 
     public Process makeObject() throws Exception { 
      ProcessBuilder builder = new ProcessBuilder(); 
      builder.redirectError(Redirect.INHERIT); 
      // I am being lazy, but really the InputStream is where 
      // you can get any output of the PHP Process. This setting 
      // will make it output to the current processes console. 
      builder.redirectOutput(Redirect.INHERIT); 
      builder.redirectInput(Redirect.PIPE); 
      builder.command(mProcessToRun); 
      return builder.start(); 
     } 

     @Override 
     public boolean validateObject(Process process) { 
      try { 
       process.exitValue(); 
       return false; 
      } catch (IllegalThreadStateException ex) { 
       return true; 
      } 
     } 

     @Override 
     public void destroyObject(Process process) throws Exception { 
      // If PHP has a way to stop it, do that instead of destroy 
      process.destroy(); 
     } 

     @Override 
     public void passivateObject(Process process) throws Exception { 
      // Should really try to read from the InputStream of the Process 
      // to prevent lock-ups if Rediret.INHERIT is not used. 
     } 
    } 

    public static class CLIWorkItem implements Runnable { 

     private ObjectPool<Process> mPool; 
     private String mWork; 

     public CLIWorkItem(ObjectPool<Process> pool, String work) { 
      mPool = pool; 
      mWork = work; 
     } 

     @Override 
     public void run() { 
      Process workProcess = null; 
      try { 
       workProcess = mPool.borrowObject(); 
       OutputStream os = workProcess.getOutputStream(); 
       os.write(mWork.getBytes(Charset.forName("UTF-8"))); 
       os.flush(); 
       // Because of the INHERIT rule with the output stream 
       // the console stream overwrites itself. REMOVE THIS in production. 
       Thread.sleep(100); 
      } catch (Exception ex) { 
       sLogger.log(Level.SEVERE, null, ex); 
      } finally { 
       if (workProcess != null) { 
        try { 
         // Seriously.. so many exceptions. 
         mPool.returnObject(workProcess); 
        } catch (Exception ex) { 
         sLogger.log(Level.SEVERE, null, ex); 
        } 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     // Change the 5 to 20 in your case. 
     // Also change mock_php.exe to /usr/bin/php or wherever. 
     ObjectPool<Process> pool = 
       new GenericObjectPool<>(
       new CLIPoolableObjectFactory("mock_php.exe"), 5);   

     // This will only allow you to queue 100 work items at a time. I would suspect 
     // that if you only want 20 PHP processes running at a time and this queue 
     // filled up you'll need to implement some other strategy as you are doing 
     // more work than PHP can keep up with. You'll need to block at some point 
     // or throw work away. 
     BlockingQueue<Runnable> queue = 
      new ArrayBlockingQueue<>(100, true); 

     ThreadPoolExecutor executor = 
      new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue); 

     // print some stuff out. 
     executor.execute(new CLIWorkItem(pool, "Message 1\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 2\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 3\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 4\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 5\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 6\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 7\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 8\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 9\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 10\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 11\r\n")); 

     executor.shutdown(); 
     executor.awaitTermination(4000, TimeUnit.HOURS); 

     pool.close();   
    } 
} 

salida de Ejecutar programa:

12172 - Message 2 
10568 - Message 1 
4804 - Message 3 
11916 - Message 4 
11116 - Message 5 
12172 - Message 6 
4804 - Message 7 
10568 - Message 8 
11916 - Message 9 
11116 - Message 10 
12172 - Message 11 

Código de programa en C++ que acaba de salida de lo que era de entrada:

#include <windows.h> 
#include <iostream> 
#include <string> 

int main(int argc, char* argv[]) 
{ 
    DWORD pid = GetCurrentProcessId(); 
    std::string line; 
    while (true) {  
     std::getline (std::cin, line); 
     std::cout << pid << " - " << line << std::endl; 
    } 

    return 0; 
} 

actualización

Lo siento por el retraso. Aquí hay una versión JDK 6 para cualquier persona interesada. Tendrá que ejecutar un hilo por separado para leer toda la entrada del InputStream del proceso. Configuré este código para generar un nuevo hilo al lado de cada nuevo proceso. Ese hilo siempre lee del proceso mientras está vivo. En lugar de enviar directamente a un archivo, lo configuré de manera que utiliza el marco de trabajo de registro. De esta forma, puede configurar una configuración de registro para ir a un archivo, transferir, ir a la consola, etc., sin que esté codificado para ir a un archivo.

Notarás que solo empiezo un único Gobbler para cada proceso aunque un Proceso tenga stdout y stderr. Redirijo stderr a stdout solo para facilitar las cosas. Aparentemente jdk6 solo es compatible con este tipo de redireccionamientos.

public class StackOverflow_10037379_jdk6 { 

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName()); 

    // Shamelessy taken from Google and modified. 
    // I don't know who the original Author is. 
    public static class StreamGobbler extends Thread { 

     InputStream is; 
     Logger logger; 
     Level level; 

     StreamGobbler(String logName, Level level, InputStream is) { 
      this.is = is; 
      this.logger = Logger.getLogger(logName); 
      this.level = level; 
     } 

     public void run() { 
      try { 
       InputStreamReader isr = new InputStreamReader(is); 
       BufferedReader br = new BufferedReader(isr); 
       String line = null; 
       while ((line = br.readLine()) != null) { 
        logger.log(level, line); 
       } 
      } catch (IOException ex) { 
       logger.log(Level.SEVERE, "Failed to read from Process.", ex); 
      } 
      logger.log(
        Level.INFO, 
        String.format("Exiting Gobbler for %s.", logger.getName())); 
     } 
    } 

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> { 

     private String mProcessToRun; 

     public CLIPoolableObjectFactory(String processToRun) { 
      mProcessToRun = processToRun; 
     } 

     @Override 
     public Process makeObject() throws Exception { 
      ProcessBuilder builder = new ProcessBuilder(); 
      builder.redirectErrorStream(true); 
      builder.command(mProcessToRun); 
      Process process = builder.start(); 
      StreamGobbler loggingGobbler = 
        new StreamGobbler(
        String.format("process.%s", process.hashCode()), 
        Level.INFO, 
        process.getInputStream()); 
      loggingGobbler.start(); 
      return process; 
     } 

     @Override 
     public boolean validateObject(Process process) { 
      try { 
       process.exitValue(); 
       return false; 
      } catch (IllegalThreadStateException ex) { 
       return true; 
      } 
     } 

     @Override 
     public void destroyObject(Process process) throws Exception { 
      // If PHP has a way to stop it, do that instead of destroy 
      process.destroy(); 
     } 

     @Override 
     public void passivateObject(Process process) throws Exception { 
      // Should really try to read from the InputStream of the Process 
      // to prevent lock-ups if Rediret.INHERIT is not used. 
     } 
    } 

    public static class CLIWorkItem implements Runnable { 

     private ObjectPool<Process> mPool; 
     private String mWork; 

     public CLIWorkItem(ObjectPool<Process> pool, String work) { 
      mPool = pool; 
      mWork = work; 
     } 

     @Override 
     public void run() { 
      Process workProcess = null; 
      try { 
       workProcess = mPool.borrowObject(); 
       OutputStream os = workProcess.getOutputStream(); 
       os.write(mWork.getBytes(Charset.forName("UTF-8"))); 
       os.flush(); 
       // Because of the INHERIT rule with the output stream 
       // the console stream overwrites itself. REMOVE THIS in production. 
       Thread.sleep(100); 
      } catch (Exception ex) { 
       sLogger.log(Level.SEVERE, null, ex); 
      } finally { 
       if (workProcess != null) { 
        try { 
         // Seriously.. so many exceptions. 
         mPool.returnObject(workProcess); 
        } catch (Exception ex) { 
         sLogger.log(Level.SEVERE, null, ex); 
        } 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     // Change the 5 to 20 in your case. 
     ObjectPool<Process> pool = 
       new GenericObjectPool<Process>(
       new CLIPoolableObjectFactory("mock_php.exe"), 5); 

     BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true); 

     ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue); 

     // print some stuff out. 
     executor.execute(new CLIWorkItem(pool, "Message 1\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 2\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 3\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 4\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 5\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 6\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 7\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 8\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 9\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 10\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 11\r\n")); 

     executor.shutdown(); 
     executor.awaitTermination(4000, TimeUnit.HOURS); 

     pool.close(); 
    } 
} 

salida

Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 9440 - Message 3 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8776 - Message 2 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 6100 - Message 1 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 10096 - Message 4 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8868 - Message 5 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8868 - Message 8 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 6100 - Message 10 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8776 - Message 9 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 10096 - Message 6 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 9440 - Message 7 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 6100 - Message 11 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.295131993. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.756434719. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.332711452. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.1981440623. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.1043636732. 
+0

¡Guau, gracias por la respuesta completa! Hacer una implementación de prueba basada en esto ahora. Realmente lo aprecio. – Will

+0

Así que estoy en Java6 y no tengo Redireccionamiento. ¿Cómo puedo evitar que el stdout/stderr del proceso se bloquee? En mi caso de uso normal, deseo escribir en un proceso y redireccionar stdout/stderr a archivos de registro separados (sin bloqueo). – Will

+1

@Will actualizado con una versión jdk6. –

1

Su mejor opción es utilizar las funciones pcntl para bifurcar un proceso, pero la comunicación entre procesos es difícil. Recomendaría crear una cola de la que tus procesos puedan leer, en lugar de intentar pasar mensajes a la línea de comando.

Beanstalk tiene varios clientes de PHP que puede utilizar para manejar los mensajes entre procesos.

+0

Lo siento, tal vez mi pregunta no estaba clara: se editará. Esta es una pregunta de Java. Quiero un conjunto de temas java de procesos cli de larga ejecución (/ usr/bin/php en este caso). Necesito poder enviar algo al grupo, que luego se escribirá en stdin en uno de los procesos de CLI. – Will