2012-05-15 25 views
10

Estoy tratando de encontrar el peso máximo de aproximadamente 6,1 mil millones de elementos (personalizados) y me gustaría hacer esto con el procesamiento paralelo. Para mi aplicación particular hay mejores algoritmos que no requieren que itere más de 6.1 mil millones de elementos, pero el libro de texto que los explica está por encima de mi cabeza y mi jefe quiere que esto se haga en 4 días. Pensé que tenía una mejor oportunidad con el elegante servidor de mi empresa y el procesamiento en paralelo. Sin embargo, todo lo que sé sobre el procesamiento paralelo proviene de leer el Pythondocumentation. Es decir, estoy bastante perdido ...Evitar las condiciones de carrera en las colas de multiprocesamiento de Python 3

Mi teoría actual es configurar un proceso de alimentación, una cola de entrada, un grupo completo (digamos, 30) de procesos de trabajo y una cola de salida (encontrar el el elemento máximo en la cola de salida será trivial). Lo que no entiendo es cómo el proceso del alimentador puede decirle a los procesos del trabajador cuándo dejar de esperar que los artículos ingresen a través de la cola de entrada.

Pensé en usar multiprocessing.Pool.map_async en mi iterable de 6.1E9 elementos, pero se tarda casi 10 minutos simplemente para recorrer los elementos sin hacerles nada. A menos que esté malinterpretando algo ..., que tiene map_async iterar a través de ellos para asignarlos a los procesos podría hacerse mientras los procesos comienzan su trabajo. (Pool proporciona también imap pero el documentation dice que es similar a map, lo que no parece funcionar de forma asíncrona Quiero asíncrono, justo.?)

preguntas relacionadas: ¿Quiero utilizar concurrent.futures en lugar de multiprocessing ? No podría ser la primera persona en implementar un sistema de dos colas (así es exactamente como funcionan las líneas de todas las tiendas de delicatessen en Estados Unidos ...). ¿Hay alguna forma más pitonica/integrada para hacer esto?

Aquí hay un esqueleto de lo que estoy tratando de hacer. Ver el bloque de comentarios en el medio.

import multiprocessing as mp 
import queue 

def faucet(items, bathtub): 
    """Fill bathtub, a process-safe queue, with 6.1e9 items""" 
    for item in items: 
     bathtub.put(item) 
    bathtub.close() 

def drain_filter(bathtub, drain): 
    """Put maximal item from bathtub into drain. 
    Bathtub and drain are process-safe queues. 
    """ 
    max_weight = 0 
    max_item = None 
    while True: 
     try: 
      current_item = bathtub.get() 
     # The following line three lines are the ones that I can't 
     # quite figure out how to trigger without a race condition. 
     # What I would love is to trigger them AFTER faucet calls 
     # bathtub.close and the bathtub queue is empty. 
     except queue.Empty: 
      drain.put((max_weight, max_item)) 
      return 
     else: 
      bathtub.task_done() 
     if not item.is_relevant(): 
      continue 
     current_weight = item.weight 
     if current_weight > max_weight: 
      max_weight = current_weight 
      max_item = current_item 

def parallel_max(items, nprocs=30): 
    """The elements of items should have a method `is_relevant` 
    and an attribute `weight`. `items` itself is an immutable 
    iterator object. 
    """ 
    bathtub_q = mp.JoinableQueue() 
    drain_q = mp.Queue() 

    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q)) 
    worker_procs = mp.Pool(processes=nprocs) 

    faucet_proc.start() 
    worker_procs.apply_async(drain_filter, bathtub_q, drain_q) 

    finalists = [] 
    for i in range(nprocs): 
     finalists.append(drain_q.get()) 

    return max(finalists) 


está aquí LA RESPUESTA

he encontrado una respuesta muy completa a mi pregunta, y una suave introducción a la multitarea del director de comunicación de la Fundación Python Doug Hellman. Lo que quería era el patrón de "píldora de veneno". Verlo aquí: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

Apoyos a @MRAB para publicar el kernel de ese concepto.

+0

¿Por qué 'importar cola' si está utilizando 'multiprocesamiento.Queue'? –

+0

Lo uso para atrapar una excepción 'queue.Empty' cuando el trabajador mira su cola de entrada. Mi ilusión era que esa excepción se podía lanzar si y solo si la cola se había cerrado y también estaba vacía. Tenga en cuenta que bajo el método de @ MRAB en su respuesta, la importación de 'queue' sería innecesaria. – wkschwartz

Respuesta

3

Puede poner un elemento especial de terminación, como Ninguno, en la cola. Cuando un trabajador lo ve, puede volver a colocarlo para que lo vean los otros trabajadores y luego terminarlo. Alternativamente, puede poner un artículo de terminación especial por trabajador en la cola.

+1

Esa es una muy buena respuesta. Voy a dejar esta pregunta abierta un poco más en caso de que alguien pueda responder las preguntas relacionadas. – wkschwartz

+0

Así que puse 'bathtub.put (None)' al final de 'faucet()' antes de cerrar la cola, pero ¿qué hago con las llamadas 'task_done()'?¿O simplemente cambio de usar 'JoinableQueue' a una' Queue' normal y deshacerme de todas las llamadas 'task_done()'? – wkschwartz

Cuestiones relacionadas