2009-05-27 60 views
11

Estoy escribiendo un programa de servidor con un productor y múltiples consumidores, lo que me confunde es solo el primer productor de tarea puesto en la cola consumido, después de lo cual las tareas en cola ya no se consumen, permanecen en la cola para siempre.problema productor/consumidor con multiproceso de python

from multiprocessing import Process, Queue, cpu_count 
from http import httpserv 
import time 

def work(queue): 
    while True: 
     task = queue.get() 
     if task is None: 
      break 
     time.sleep(5) 
     print "task done:", task 
    queue.put(None) 

class Manager: 
    def __init__(self): 
     self.queue = Queue() 
     self.NUMBER_OF_PROCESSES = cpu_count() 

    def start(self): 
     self.workers = [Process(target=work, args=(self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
     for w in self.workers: 
      w.start() 

     httpserv(self.queue) 

    def stop(self): 
     self.queue.put(None) 
     for i in range(self.NUMBER_OF_PROCESSES): 
      self.workers[i].join() 
     queue.close() 

Manager().start() 

El productor es un servidor HTTP que puso una tarea en la cola recibe una vez una petición del usuario. Parece que los procesos de los consumidores todavía están bloqueados cuando hay nuevas tareas en la cola, lo que es extraño.

P.S. Otras dos preguntas que no están relacionadas con lo anterior, no estoy seguro de si es mejor poner el servidor HTTP en su propio proceso que no sea el proceso , si es así, ¿cómo puedo hacer que el proceso principal siga funcionando antes de que todos los procesos secundarios finalicen? Segunda pregunta, ¿cuál es la mejor manera de detener el servidor HTTP con gracia?

Editar: agregar código del productor, es sólo un simple servidor de pitón wsgi:

import fapws._evwsgi as evwsgi 
from fapws import base 

def httpserv(queue): 
    evwsgi.start("0.0.0.0", 8080) 
    evwsgi.set_base_module(base) 

    def request_1(environ, start_response): 
     start_response('200 OK', [('Content-Type','text/html')]) 
     queue.put('task_1') 
     return ["request 1!"] 

    def request_2(environ, start_response): 
     start_response('200 OK', [('Content-Type','text/html')]) 
     queue.put('task_2') 
     return ["request 2!!"] 

    evwsgi.wsgi_cb(("/request_1", request_1)) 
    evwsgi.wsgi_cb(("/request_2", request_2)) 

    evwsgi.run() 

Respuesta

7

Creo que debe haber algo mal con la parte del servidor web, ya que esto funciona a la perfección:

from multiprocessing import Process, Queue, cpu_count 
import random 
import time 


def serve(queue): 
    works = ["task_1", "task_2"] 
    while True: 
     time.sleep(0.01) 
     queue.put(random.choice(works)) 


def work(id, queue): 
    while True: 
     task = queue.get() 
     if task is None: 
      break 
     time.sleep(0.05) 
     print "%d task:" % id, task 
    queue.put(None) 


class Manager: 
    def __init__(self): 
     self.queue = Queue() 
     self.NUMBER_OF_PROCESSES = cpu_count() 

    def start(self): 
     print "starting %d workers" % self.NUMBER_OF_PROCESSES 
     self.workers = [Process(target=work, args=(i, self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
     for w in self.workers: 
      w.start() 

     serve(self.queue) 

    def stop(self): 
     self.queue.put(None) 
     for i in range(self.NUMBER_OF_PROCESS): 
      self.workers[i].join() 
     queue.close() 


Manager().start() 
salida

muestra:

starting 2 workers 
0 task: task_1 
1 task: task_2 
0 task: task_2 
1 task: task_1 
0 task: task_1 
+0

impresionante mientras que si usted podría proporcionar un ejemplo productor + multi-trabajador. Sería bueno. –

4

"Segunda pregunta, ¿cuál es la mejor manera de detener el servidor HTTP correctamente?"

Esto es difícil.

usted tiene dos opciones para comunicación entre procesos:

  • controles fuera de banda. El servidor tiene otro mecanismo de comunicación. Otra toma, una señal Unix u otra cosa. La otra cosa podría ser un archivo "stop-now" en el directorio local del servidor. Parece extraño, pero funciona bien y es más simple que la introducción de un bucle de selección para escuchar múltiples tomas o un manejador de señal para capturar una señal Unis.

    El archivo "stop-now" es fácil de implementar. El bucle evwsgi.run() simplemente comprueba este archivo después de cada solicitud. Para que el servidor se detenga, cree el archivo, ejecute una solicitud /control (que obtendrá un error 500 o algo así, en realidad no importa) y el servidor debería detenerse. Recuerde eliminar el archivo stop-now; de lo contrario, su servidor no se reiniciará.

  • Controles en banda. El servidor tiene otra URL (/stop) que lo detendrá. Superficialmente, esto parece una pesadilla de seguridad, pero depende completamente de dónde y cómo se usará este servidor. Dado que parece ser un simple contenedor alrededor de una cola de solicitud interna, esta URL extra funciona bien.

    Para que esto funcione, debe escribir su propia versión de evwsgi.run() que puede finalizar configurando alguna variable de forma que salga del ciclo.

Editar

Es probable que no desea terminar su servidor, ya que no se conoce el estado de su subprocesos de trabajo. Necesita señalar el servidor y luego solo tiene que esperar hasta que termine las cosas normalmente.

Si quiere matar a la fuerza el servidor, entonces os.kill() (o multiprocessing.terminate) funcionará. Excepto, por supuesto, no sabes qué estaban haciendo los hilos del niño.

+0

¿Qué le parece poner el servidor en su propio proceso y usar el método multiprocessing.Process.terminate para finalizar el proceso? Esto parece más fácil. – btw0

Cuestiones relacionadas