6

Recientemente comencé a aprender 0MQ. Hoy temprano, me encontré con un blog, Python Multiprocessing with ZeroMQ. Hablaba de the ventilator pattern en la Guía de 0MQ que leí, así que decidí probarlo.¿Por qué este script Python 0MQ para computación distribuida se cuelga en un tamaño de entrada fijo?

En lugar de simplemente calcular los productos de los números de los trabajadores como lo hace el código original, decidí tratar de hacer que el ventilador envíe grandes matrices a los trabajadores a través de mensajes de 0mq. El siguiente es el código que he estado usando para mis "experimentos".

Como se señala en un comentario a continuación, cada vez que intenté aumentar la variable string_length a un número superior a 3MB, el código se bloquea.

síntoma típico: digamos que nos fijamos el string_length a 4 MB (es decir, 4194304), entonces tal vez el gerente resultado obtiene el resultado de un trabajador, y luego el código sólo se detiene. htop muestra los 2 núcleos que no hacen mucho. El monitor de tráfico de red Etherape tampoco muestra tráfico en la interfaz lo.

Hasta el momento, después de horas mirando a su alrededor, no he podido averiguar qué está causando esto, y agradecería una pista o dos de por qué y cualquier resolución sobre este tema. ¡Gracias!

Estoy ejecutando Ubuntu 11.04 64bit en un portátil Dell con CPU Intel Core debida, 8 GB de RAM, 80 GB Intel X25MG2 SSD, Python 2.7.1+, libzmq1 2.1.10-1chl1 ~ natty1, python-pyzmq 2.1.10- 1chl1 ~ natty1

import time 
import zmq 
from multiprocessing import Process, cpu_count 

np = cpu_count() 
pool_size = np 
number_of_elements = 128 
# Odd, why once the slen is bumped to 3MB or above, the code hangs? 
string_length = 1024 * 1024 * 3 

def create_inputs(nelem, slen, pb=True): 
    ''' 
    Generates an array that contains nelem fix-sized (of slen bytes) 
    random strings and an accompanying array of hexdigests of the 
    former's elements. Both are returned in a tuple. 

    :type nelem: int 
    :param nelem: The desired number of elements in the to be generated 
        array. 
    :type slen: int 
    :param slen: The desired number of bytes of each array element. 
    :type pb: bool 
    :param pb: If True, displays a text progress bar during input array 
       generation. 
    ''' 
    from os import urandom 
    import sys 
    import hashlib 

    if pb: 
     if nelem <= 64: 
      toolbar_width = nelem 
      chunk_size = 1 
     else: 
      toolbar_width = 64 
      chunk_size = nelem // toolbar_width 
     description = '%d random strings of %d bytes. ' % (nelem, slen) 
     s = ''.join(('Generating an array of ', description, '...\n')) 
     sys.stdout.write(s) 
     # create an ASCII progress bar 
     sys.stdout.write("[%s]" % (" " * toolbar_width)) 
     sys.stdout.flush() 
     sys.stdout.write("\b" * (toolbar_width+1)) 
    array = list() 
    hash4a = list() 
    try: 
     for i in range(nelem): 
      e = urandom(int(slen)) 
      array.append(e) 
      h = hashlib.md5() 
      h.update(e) 
      he = h.hexdigest() 
      hash4a.append(he) 
      i += 1 
      if pb and i and i % chunk_size == 0: 
       sys.stdout.write("-") 
       sys.stdout.flush() 
     if pb: 
      sys.stdout.write("\n") 
    except MemoryError: 
     print('Memory Error: discarding existing arrays') 
     array = list() 
     hash4a = list() 
    finally: 
     return array, hash4a 

# The "ventilator" function generates an array of nelem fix-sized (of slen 
# bytes long) random strings, and sends the array down a zeromq "PUSH" 
# connection to be processed by listening workers, in a round robin load 
# balanced fashion. 

def ventilator(): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to send work 
    ventilator_send = context.socket(zmq.PUSH) 
    ventilator_send.bind("tcp://127.0.0.1:5557") 

    # Give everything a second to spin up and connect 
    time.sleep(1) 

    # Create the input array 
    nelem = number_of_elements 
    slen = string_length 
    payloads = create_inputs(nelem, slen) 

    # Send an array to each worker 
    for num in range(np): 
     work_message = { 'num' : payloads } 
     ventilator_send.send_pyobj(work_message) 

    time.sleep(1) 

# The "worker" functions listen on a zeromq PULL connection for "work" 
# (array to be processed) from the ventilator, get the length of the array 
# and send the results down another zeromq PUSH connection to the results 
# manager. 

def worker(wrk_num): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to receive work from the ventilator 
    work_receiver = context.socket(zmq.PULL) 
    work_receiver.connect("tcp://127.0.0.1:5557") 

    # Set up a channel to send result of work to the results reporter 
    results_sender = context.socket(zmq.PUSH) 
    results_sender.connect("tcp://127.0.0.1:5558") 

    # Set up a channel to receive control messages over 
    control_receiver = context.socket(zmq.SUB) 
    control_receiver.connect("tcp://127.0.0.1:5559") 
    control_receiver.setsockopt(zmq.SUBSCRIBE, "") 

    # Set up a poller to multiplex the work receiver and control receiver channels 
    poller = zmq.Poller() 
    poller.register(work_receiver, zmq.POLLIN) 
    poller.register(control_receiver, zmq.POLLIN) 

    # Loop and accept messages from both channels, acting accordingly 
    while True: 
     socks = dict(poller.poll()) 

     # If the message came from work_receiver channel, get the length 
     # of the array and send the answer to the results reporter 
     if socks.get(work_receiver) == zmq.POLLIN: 
      #work_message = work_receiver.recv_json() 
      work_message = work_receiver.recv_pyobj() 
      length = len(work_message['num'][0]) 
      answer_message = { 'worker' : wrk_num, 'result' : length } 
      results_sender.send_json(answer_message) 

     # If the message came over the control channel, shut down the worker. 
     if socks.get(control_receiver) == zmq.POLLIN: 
      control_message = control_receiver.recv() 
      if control_message == "FINISHED": 
       print("Worker %i received FINSHED, quitting!" % wrk_num) 
       break 

# The "results_manager" function receives each result from multiple workers, 
# and prints those results. When all results have been received, it signals 
# the worker processes to shut down. 

def result_manager(): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to receive results 
    results_receiver = context.socket(zmq.PULL) 
    results_receiver.bind("tcp://127.0.0.1:5558") 

    # Set up a channel to send control commands 
    control_sender = context.socket(zmq.PUB) 
    control_sender.bind("tcp://127.0.0.1:5559") 

    for task_nbr in range(np): 
     result_message = results_receiver.recv_json() 
     print "Worker %i answered: %i" % (result_message['worker'], result_message['result']) 

    # Signal to all workers that we are finsihed 
    control_sender.send("FINISHED") 
    time.sleep(5) 

if __name__ == "__main__": 

    # Create a pool of workers to distribute work to 
    for wrk_num in range(pool_size): 
     Process(target=worker, args=(wrk_num,)).start() 

    # Fire up our result manager... 
    result_manager = Process(target=result_manager, args=()) 
    result_manager.start() 

    # Start the ventilator! 
    ventilator = Process(target=ventilator, args=()) 
    ventilator.start() 
+0

hice más experimentos: bajó los number_of_elements a 64 y aumentó la string_length a 6. El código todavía funcionó muy bien. Por encima de eso, apareció el mismo síntoma. Esto me llevó a creer que podría haber un límite general de tamaño de mensaje en algún lugar de la vinculación de pyzmq. La API 0MQ C tiene esta función [link] (http://api.zeromq.org/2-1:zmq-msit-size) zmq_msg_init_size (3) que no puedo encontrar en la documentación de pyzmq. Podria ser esta la causa? – user183394

+0

¿Se puede obtener un rastreo donde se cuelga? Puede darte una pista. –

+0

Probé tu código en mi computadora portátil Mac con string_length = 1024 * 1024 * 4 y funcionó bien, así que supongo que debe tener algo que ver con algún tipo de conflicto de memoria. –

Respuesta

6

El problema es que el ventilador (PUSH) socket se cierra antes de que se hace el envío. Usted tiene un descanso de 1s al final de la función del ventilador, que no es suficiente para enviar mensajes de 384MB. Es por eso que tienes el umbral que tienes, si el sueño fue más corto, el umbral sería más bajo.

Dicho esto, LINGER es supuesto para evitar este tipo de cosas, por lo que me gustaría traer esto a colación con zeromq: PUSH no parece respetar a LINGER.

Una solución para su ejemplo en particular (sin agregar un sueño largo indeterminado) sería utilizar la misma señal de FINALIZACIÓN para terminar su ventilador como sus trabajadores. De esta forma, usted garantiza que su ventilador sobreviva el tiempo que lo necesite.

ventilador Revisado:

def ventilator(): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to send work 
    ventilator_send = context.socket(zmq.PUSH) 
    ventilator_send.bind("tcp://127.0.0.1:5557") 

    # Set up a channel to receive control messages 
    control_receiver = context.socket(zmq.SUB) 
    control_receiver.connect("tcp://127.0.0.1:5559") 
    control_receiver.setsockopt(zmq.SUBSCRIBE, "") 

    # Give everything a second to spin up and connect 
    time.sleep(1) 

    # Create the input array 
    nelem = number_of_elements 
    slen = string_length 
    payloads = create_inputs(nelem, slen) 

    # Send an array to each worker 
    for num in range(np): 
     work_message = { 'num' : payloads } 
     ventilator_send.send_pyobj(work_message) 

    # Poll for FINISH message, so we don't shutdown too early 
    poller = zmq.Poller() 
    poller.register(control_receiver, zmq.POLLIN) 

    while True: 
     socks = dict(poller.poll()) 

     if socks.get(control_receiver) == zmq.POLLIN: 
      control_message = control_receiver.recv() 
      if control_message == "FINISHED": 
       print("Ventilator received FINSHED, quitting!") 
       break 
      # else: unhandled message 
+0

minrk, muchas gracias por la respuesta perspicaz. ¡Muy útil! No sospeché que el valor de ZMQ_LINGER establecido por zmq_setsockopt (3), ya que como dijiste, el valor predeterminado es -1 (infinito). Gran atrapada! Definitivamente plantearé el problema primero a la gente de pyzmq y lo mencionaré en la lista de correo de zeromq también. Probé la corrección hasta la longitud de cadena establecida en 1024 * 1024 * 10, llegué al máximo de la RAM física de mi portátil y obtuve el resultado esperado. ¡Gracias de nuevo! – user183394

+3

Quizás no valga la pena mencionarlo con 'pyzmq amigos', ya que básicamente soy yo ahora mismo. Apunté libzmq al respecto y escribí un caso de prueba más simple en C: https://gist.github.com/1643223 – minrk

Cuestiones relacionadas