2011-12-11 13 views

Respuesta

180
  • A Pipe() solo pueden tener dos puntos extremos.

  • A Queue() puede tener múltiples productores y consumidores.

cuándo usarlos

Si necesita más de dos puntos para comunicarse, utilizar un Queue().

Si necesita rendimiento absoluto, un Pipe() es mucho más rápido porque Queue() se construye en la parte superior de Pipe().

Estudio comparativo del rendimiento

Supongamos que desea generar dos procesos y enviar mensajes entre ellos lo más rápido posible. Estos son los resultados de sincronización de una carrera de arrastre entre pruebas similares usando Pipe() y Queue() ... Esto está en un ThinkpadT61 con Ubuntu 11.10 y Python 2.7.2.

FYI, arrojé los resultados para JoinableQueue() como una bonificación; JoinableQueue() se encarga de las tareas cuando se llama al queue.task_done() (ni siquiera conoce la tarea específica, solo cuenta tareas sin terminar en la cola), de modo que queue.join() sabe que el trabajo ha finalizado.

El código para cada al final de esta respuesta ...

[email protected]:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds 
Sending 100000 numbers to Pipe() took 0.328398942947 seconds 
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds 
[email protected]:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds 
Sending 100000 numbers to Queue() took 0.980564117432 seconds 
Sending 1000000 numbers to Queue() took 10.1611330509 seconds 
[email protected]:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds 
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds 
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds 
[email protected]:~$ 

En resumen Pipe() es aproximadamente tres veces más rápido que un Queue(). Ni siquiera piense en el JoinableQueue() a menos que realmente deba tener los beneficios.

Material Adicional 2

multiprocesamiento introduce cambios sutiles en el flujo de información que hacen que la depuración duro a menos que sepa algunos atajos. Por ejemplo, es posible que tenga una secuencia de comandos que funcione bien al indexar a través de un diccionario en muchas condiciones, pero con poca frecuencia falla con ciertas entradas.

Normalmente obtenemos pistas sobre el fallo cuando todo el proceso de Python falla; sin embargo, no se registran los registros de fallos no solicitados en la consola si la función de multiprocesamiento falla. El seguimiento de bloqueos de multiprocesamiento desconocidos es difícil sin una pista de lo que colapsó el proceso.

La forma más sencilla que he encontrado para localizar a multiprocesamiento informaiton accidente es envolver toda la función de multiprocesamiento en un try/except y utilizar traceback.print_exc():

import traceback 
def reader(args): 
    try: 
     # Insert stuff to be multiprocessed here 
     return args[0]['that'] 
    except: 
     print "FATAL: reader({0}) exited while multiprocessing".format(args) 
     traceback.print_exc() 

Ahora, cuando se encuentra un accidente que ver algo así como : Código

FATAL: reader([{'crash', 'this'}]) exited while multiprocessing 
Traceback (most recent call last): 
    File "foo.py", line 19, in __init__ 
    self.run(task_q, result_q) 
    File "foo.py", line 46, in run 
    raise ValueError 
ValueError 

Fuente:


""" 
multi_pipe.py 
""" 
from multiprocessing import Process, Pipe 
import time 

def reader(pipe): 
    output_p, input_p = pipe 
    input_p.close() # We are only reading 
    while True: 
     try: 
      msg = output_p.recv() # Read from the output pipe and do nothing 
     except EOFError: 
      break 

def writer(count, input_p): 
    for ii in xrange(0, count): 
     input_p.send(ii)    # Write 'count' numbers into the input pipe 

if __name__=='__main__': 
    for count in [10**4, 10**5, 10**6]: 
     output_p, input_p = Pipe() 
     reader_p = Process(target=reader, args=((output_p, input_p),)) 
     reader_p.start()  # Launch the reader process 

     output_p.close()  # We no longer need this part of the Pipe() 
     _start = time.time() 
     writer(count, input_p) # Send a lot of stuff to reader() 
     input_p.close()  # Ask the reader to stop when it reads EOF 
     reader_p.join() 
     print "Sending %s numbers to Pipe() took %s seconds" % (count, 
      (time.time() - _start)) 

""" 
multi_queue.py 
""" 
from multiprocessing import Process, Queue 
import time 

def reader(queue): 
    while True: 
     msg = queue.get()   # Read from the queue and do nothing 
     if (msg == 'DONE'): 
      break 

def writer(count, queue): 
    for ii in xrange(0, count): 
     queue.put(ii)    # Write 'count' numbers into the queue 
    queue.put('DONE') 

if __name__=='__main__': 
    for count in [10**4, 10**5, 10**6]: 
     queue = Queue() # reader() reads from queue 
          # writer() writes to queue 
     reader_p = Process(target=reader, args=((queue),)) 
     reader_p.daemon = True 
     reader_p.start()  # Launch the reader process 

     _start = time.time() 
     writer(count, queue) # Send a lot of stuff to reader() 
     reader_p.join()   # Wait for the reader to finish 
     print "Sending %s numbers to Queue() took %s seconds" % (count, 
      (time.time() - _start)) 

""" 
multi_joinablequeue.py 
""" 
from multiprocessing import Process, JoinableQueue 
import time 

def reader(queue): 
    while True: 
     msg = queue.get()   # Read from the queue and do nothing 
     queue.task_done() 

def writer(count, queue): 
    for ii in xrange(0, count): 
     queue.put(ii)    # Write 'count' numbers into the queue 

if __name__=='__main__': 
    for count in [10**4, 10**5, 10**6]: 
     queue = JoinableQueue() # reader() reads from queue 
            # writer() writes to queue 
     reader_p = Process(target=reader, args=((queue),)) 
     reader_p.daemon = True 
     reader_p.start()  # Launch the reader process 

     _start = time.time() 
     writer(count, queue) # Send a lot of stuff to reader() 
     queue.join()   # Wait for the reader to finish 
     print "Sending %s numbers to JoinableQueue() took %s seconds" % (count, 
      (time.time() - _start)) 
+2

@ Jonathan "en la tubería de resumen() es aproximadamente tres veces más rápido que una Cola()" –

+0

Pero Pipe() no pueden ser utilizados sin problemas con múltiples productores/consumidores. –

+11

¡Excelente! ¡Buena respuesta y agradable que haya proporcionado puntos de referencia! Solo tengo dos pequeñas objeciones: (1) "órdenes de magnitud más rápidas" es un poco exagerado. La diferencia es x3, que es aproximadamente un tercio de un orden de magnitud. Solo digo. ;-); y (2) una comparación más justa estaría ejecutando N trabajadores, cada uno de los cuales se comunica con el hilo principal a través de un canal punto a punto en comparación con el rendimiento de los trabajadores en ejecución que extraen de una única cola punto a multipunto. – JJC

Cuestiones relacionadas