2011-09-24 10 views
5

Soy completamente nuevo en multiprocesamiento. He estado leyendo documentación sobre el módulo de multiprocesamiento. Leí sobre Pool, Threads, Colas, etc., pero estoy completamente perdido.Qué estrategia usar con multiprocesamiento en python

Lo que quiero hacer con multiprocesamiento es que, convertir mi humilde descargador http, para trabajar con varios trabajadores. Lo que estoy haciendo en este momento es descargar una página, analizar la página para obtener enlaces interesantes. Continúa hasta descargar todos los enlaces interesantes. Ahora, quiero implementar esto con multiprocesamiento. Pero no tengo idea en este momento, cómo organizar este flujo de trabajo. Tenía dos pensamientos sobre esto. En primer lugar, pensé en tener dos colas. Una cola para los enlaces que deben descargarse, otra para que los enlaces sean analizados. Un trabajador, descarga las páginas y las agrega a la cola, que es para los elementos que se deben analizar. Y otro proceso analiza una página y agrega los enlaces que encuentra interesantes a la otra cola. Los problemas que espero de este enfoque son; En primer lugar, ¿por qué descargar una página a la vez y analizar una página a la vez? Además, ¿cómo sabe un proceso que hay elementos que se agregarán a la cola más tarde, después de agotar todos los elementos de la cola?

Otro enfoque que pensé usar es eso. Tener una función, que se puede llamar con una url como argumento. Esta función descarga el documento y comienza a analizarlo para los enlaces. Cada vez que encuentra un enlace interesante, crea instantáneamente un nuevo hilo que ejecuta una función idéntica a la suya. El problema que tengo con este enfoque es, ¿cómo puedo hacer un seguimiento de todos los procesos generados por todos lados, cómo sé si todavía hay procesos para ejecutar. Y también, ¿cómo puedo limitar el número máximo de procesos?

Así que estoy completamente perdido. ¿Alguien puede sugerir una buena estrategia y tal vez mostrar algunos códigos de ejemplo sobre cómo seguir con la idea?

+0

Esto se ha discutido con cierta profundidad [anteriormente] (http://stackoverflow.com/questions/731993/multiprocessing-or-multithreading) – brc

+0

¿Puedo sugerir que consulte la biblioteca de eventos? Puede encontrar que se adapta mejor a sus propósitos que usar multiprocesamiento. –

Respuesta

3

Aquí hay un enfoque, usando multiprocesamiento. (Muchas gracias a @Voo, por sugerir muchas mejoras al código).

import multiprocessing as mp 
import logging 
import Queue 
import time 

logger=mp.log_to_stderr(logging.DEBUG) # or, 
# logger=mp.log_to_stderr(logging.WARN) # uncomment this to silence debug and info messages 

def worker(url_queue,seen): 
    while True: 
     url=url_queue.get() 
     if url not in seen: 
      logger.info('downloading {u}'.format(u=url)) 
      seen[url]=True 
      # Replace this with code to dowload url 
      # urllib2.open(...) 
      time.sleep(0.5) 
      content=url 
      logger.debug('parsing {c}'.format(c=content)) 
      # replace this with code that finds interesting links and 
      # puts them in url_queue 
      for i in range(3): 
       if content<5: 
        u=2*content+i-1 
        logger.debug('adding {u} to url_queue'.format(u=u)) 
        time.sleep(0.5) 
        url_queue.put(u) 
     else: 
      logger.debug('skipping {u}; seen before'.format(u=url)) 
     url_queue.task_done() 

if __name__=='__main__': 
    num_workers=4 
    url_queue=mp.JoinableQueue() 
    manager=mp.Manager() 
    seen=manager.dict() 

    # prime the url queue with at least one url 
    url_queue.put(1) 
    downloaders=[mp.Process(target=worker,args=(url_queue,seen)) 
       for i in range(num_workers)] 
    for p in downloaders: 
     p.daemon=True 
     p.start() 
    url_queue.join() 
  • Una piscina de (4) se crean procesos de trabajo.
  • Hay un JoinableQueue, llamado url_queue.
  • Cada trabajador recibe una url del url_queue, encuentra nuevas URL y agrega al url_queue.
  • Solo después de agregar nuevos elementos llama al url_queue.task_done().
  • El proceso principal llama al url_queue.join(). Esto bloquea el proceso principal hasta llamar al task_done para cada tarea en el url_queue.
  • Dado que los procesos de trabajador tienen el atributo daemon establecido en True, también finalizan cuando finaliza el proceso principal.

Todos los componentes utilizados en este ejemplo también se explican en Doug Hellman's excellent Python Module of the Week tutorial on multiprocessing.

+0

Personalmente, me quedaría sin la configuración del daemon y finalizaría los procesos regularmente, pero si lo hace, realmente tiene que unirse con los analizadores y no con los descargadores, porque de lo contrario es muy probable que algún archivo se descargue pero nunca se analice. Y eso obviamente lo hace un poco más complejo, así que simplemente agregaría un valor centinela a las colas para que los procesos sepan cuándo ya no hay datos. Probablemente sea mejor desde un punto de equilibrio de carga (y simplifica la lógica) para tener solo un grupo de cola/procesador. – Voo

+0

@Voo: ¿cómo sabrías cuándo agregar un centinela a la cola? – unutbu

+0

Mi solución para esto: use 'JoinableQueue'. El hilo principal crea el grupo de procesos, pone las tareas iniciales en cola, inicia el proceso y se une a la cola. El ciclo para cada proceso de trabajo: obtener trabajo, si el trabajo es un centinela, romper. De lo contrario, ejecute el trabajo, ponga nuevos trabajos en la cola, etc. Finalmente, llame a 'task_done' y repita. El hilo principal está bloqueado hasta que todos los trabajos estén terminados, cuando lo desbloquea coloca nrProcesses Sentinels en la cola y terminamos. – Voo

1

Lo que estás describiendo es esencialmente atravesar gráficos; La mayoría de los algoritmos de cruce de gráficos (que son más sofisticados que la profundidad primero), haga un seguimiento de dos conjuntos de nodos , en su caso, los nodos son url.

El primer conjunto se denomina "conjunto cerrado" y representa todos los nodos que ya se han visitado y procesado.Si, mientras procesas una página, encuentras un enlace que está en el conjunto cerrado, puedes ignorarlo, ya se ha procesado.

El segundo conjunto es, como era de esperar, llamado el "conjunto abierto", e incluye todos los bordes que se han encontrado, pero aún no se han procesado.

El mecanismo básico es comenzar poniendo el nodo raíz en el conjunto abierto (el conjunto cerrado está inicialmente vacío, no se han procesado los nodos) y comenzar a trabajar. Cada trabajador toma un único nodo del conjunto abierto, lo copia al conjunto cerrado, procesa el nodo y agrega los nodos que descubre de nuevo al conjunto abierto (siempre que no estén ya en los conjuntos abierto o cerrado) . Una vez que el conjunto abierto está vacío (y ningún trabajador todavía está procesando los nodos), el gráfico se ha recorrido completamente.

Realmente implementar esto en multiprocessing probablemente signifique que tendrá una tarea maestra que realiza un seguimiento de los conjuntos abierto y cerrado; Si un trabajador de un grupo de trabajadores indica que está listo para trabajar, el trabajador principal se encarga de mover el nodo del conjunto abierto al conjunto cerrado y de poner en marcha al trabajador. los trabajadores pueden pasar todos los de los nodos que encuentren, sin preocuparse por si ya están cerrados, volver al maestro; y el maestro ignorará los nodos que ya están cerrados.

Cuestiones relacionadas