2012-02-12 8 views
5

unos días me ha hecho una pregunta sobre SO trata de ayudar a diseñar un paradigma para la estructuración de múltiples peticiones HTTPdiferencias se obtiene al usar corrutinas vs roscado hace

Este es el escenario. Me gustaría tener un sistema multi-productor, multi-consumidor. Mis productores rastrean y rastrean algunos sitios y agregan los enlaces que encuentra en una cola. Como voy a rastrear varios sitios, me gustaría tener varios productores/rastreadores.

Los consumidores/trabajadores se alimentan de esta cola, realizan solicitudes TCP/UDP a estos enlaces y guardan los resultados en mi Django DB. También me gustaría tener trabajadores múltiples ya que cada elemento de la cola es totalmente independiente el uno del otro.

Las personas sugirieron que se use una biblioteca de coroutine para esto, es decir, Gevent o Eventlet. Como nunca trabajé con corutinas, leí que aunque el paradigma de programación es similar a los paradigmas de subprocesos, solo un subproceso se está ejecutando activamente, pero cuando se producen llamadas, como las llamadas de E/S, las pilas se cambian en memoria y el otro verde el hilo toma el control hasta que encuentra algún tipo de llamada de E/S de bloqueo. Espero tener esto bien? Aquí está el código de uno de mis mensajes SO:

import gevent 
from gevent.queue import * 
import time 
import random 

q = JoinableQueue() 
workers = [] 
producers = [] 


def do_work(wid, value): 
    gevent.sleep(random.randint(0,2)) 
    print 'Task', value, 'done', wid 


def worker(wid): 
    while True: 
     item = q.get() 
     try: 
      print "Got item %s" % item 
      do_work(wid, item) 
     finally: 
      print "No more items" 
      q.task_done() 


def producer(): 
    while True: 
     item = random.randint(1, 11) 
     if item == 10: 
      print "Signal Received" 
      return 
     else: 
      print "Added item %s" % item 
      q.put(item) 


for i in range(4): 
    workers.append(gevent.spawn(worker, random.randint(1, 100000))) 

# This doesn't work. 
for j in range(2): 
    producers.append(gevent.spawn(producer)) 

# Uncommenting this makes this script work. 
# producer() 

q.join() 

Esto funciona bien porque los sleep llamadas son llamadas de bloqueo y cuando se produce un evento sleep, otro hilo verde se hace cargo. Esto es mucho más rápido que la ejecución secuencial. Como puede ver, no tengo ningún código en mi programa que deliberadamente ceda la ejecución de un hilo a otro. No veo cómo encaja esto en el escenario anterior, ya que me gustaría tener todos los subprocesos ejecutándose simultáneamente.

Todo funciona bien, pero creo que el rendimiento que he logrado con Gevent/Eventlets es mayor que con el programa secuencial original, pero drásticamente más bajo que con el real-threading.

Si tuviera que volver a implementar mi programa mediante el uso de mecanismos de enhebrado, cada uno de mis productores y consumidores podría estar trabajando simultáneamente sin la necesidad de intercambiar las pilas como corutinas.

¿Debería volver a implementarse utilizando el enhebrado? ¿Está mal mi diseño? No he podido ver los beneficios reales del uso de corutinas.

Tal vez mis conceptos son un poco embarrados, pero esto es lo que he asimilado. Cualquier ayuda o aclaración de mi paradigma y conceptos sería genial.

Gracias

+0

¿Por qué no utilizar procesos múltiples? –

+0

No conozco los pros y contras de multi-threading versus multi-processing, así que no sé si está bien o no. –

+1

no existe el "enhebrado real" (solo se ejecuta un subproceso del sistema operativo real en un momento dado) en programas Python sin recurrir a extensiones C (o procesos de SO pesados) debido al bloqueo de Intérprete global. –

Respuesta

5

Como puedes ver, no tengo ningún código en mi programa que deliberadamente ceda la ejecución de un hilo a otro hilo. No veo cómo encaja esto en el escenario anterior ya que me gustaría tener todos los hilos ejecutándose simultáneamente.

Hay un único subproceso del sistema operativo pero varios puntos de inserción. En su caso, gevent.sleep() permite que los trabajadores ejecuten al mismo tiempo. El bloqueo de llamadas IO como urllib2.urlopen(url).read() hace lo mismo si usa urllib2 parcheado para trabajar con gevent (llamando al gevent.monkey.patch_*()).

Consulte también A Curious Course on Coroutines and Concurrency para comprender cómo un código puede funcionar al mismo tiempo en un único entorno de subprocesos.

para comparar las diferencias de rendimiento entre GEvent, roscado, multiprocesamiento se podría escribir el código que es compatible con todos los aproaches:

#!/usr/bin/env python 
concurrency_impl = 'gevent' # single process, single thread 
##concurrency_impl = 'threading' # single process, multiple threads 
##concurrency_impl = 'multiprocessing' # multiple processes 

if concurrency_impl == 'gevent': 
    import gevent.monkey; gevent.monkey.patch_all() 

import logging 
import time 
import random 
from itertools import count, islice 

info = logging.info 

if concurrency_impl in ['gevent', 'threading']: 
    from Queue import Queue as JoinableQueue 
    from threading import Thread 
if concurrency_impl == 'multiprocessing': 
    from multiprocessing import Process as Thread, JoinableQueue 

El resto del guión es el mismo para todas las implementaciones de concurrencia:

def do_work(wid, value): 
    time.sleep(random.randint(0,2)) 
    info("%d Task %s done" % (wid, value)) 

def worker(wid, q): 
    while True: 
     item = q.get() 
     try: 
      info("%d Got item %s" % (wid, item)) 
      do_work(wid, item) 
     finally: 
      q.task_done() 
      info("%d Done item %s" % (wid, item)) 

def producer(pid, q): 
    for item in iter(lambda: random.randint(1, 11), 10): 
     time.sleep(.1) # simulate a green blocking call that yields control 
     info("%d Added item %s" % (pid, item)) 
     q.put(item) 
    info("%d Signal Received" % (pid,)) 

no ejecutar código en un nivel módulo puso en main():

def main(): 
    logging.basicConfig(level=logging.INFO, 
         format="%(asctime)s %(process)d %(message)s") 

    q = JoinableQueue() 
    it = count(1) 
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)] 
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)] 
    for t in producers+workers: 
     t.daemon = True 
     t.start() 

    for t in producers: t.join() # put items in the queue 
    q.join() # wait while it is empty 
    # exit main thread (daemon workers die at this point) 

if __name__=="__main__":  
    main() 
+0

Hola, Sebastian, he investigado mi código y he visto que mis productores y consumidores están trabajando simultáneamente. Cuando ocurre una operación de bloqueo en uno de mis greenlets, cede el control a los otros greenlets. He agregado la llamada 'monkey_patch' faltante para que el módulo de socket no sea bloqueable también, pero no puedo obtener suficiente procesador crunch. Una PC normal tiene suficiente energía para tener más conexiones simultáneas y más áreas verdes, pero no tengo la velocidad suficiente. Estoy muy perdido y confundido porque no usa más procesador y trabaja más rápido. ¿Podrías ayudarme a entender por favor? Estoy muy perdido. Gracias. –

+0

@Mridang Agarwalla: He comentado el código que publicaste en tu pregunta. 'productores' * no * funcionan concurrentemente en él. – jfs

+1

@Mridang Agarwalla: si su problema está vinculado a IO (disco, red), entonces no importa qué tan rápido sea su CPU; por ejemplo, si puede escribir en disco solo a 50MB/s, entonces no importa que su CPU pueda procesa 1GB/s. Además, su programa puede consumir otros recursos finitos, como la cantidad de archivos abiertos. Si usa 'gevent', asegúrese de que todas las llamadas de bloqueo sean" verdes ", es decir, no bloquean, por ejemplo, su controlador de base de datos podría no ser compatible con' gevent'. – jfs

1

GEvent es grande cuando usted tiene muchos hilos (verde). Lo probé con miles y funcionó muy bien. tienes que asegurarte de que todas las bibliotecas que usas tanto para raspar como para guardar en db se vuelvan verdes. afaik si usan el socket de python, la inyección gevent debería funcionar. las extensiones escritas en C (por ejemplo, mysqldb) bloquearían sin embargo y necesitarías usar equivalentes verdes en su lugar.

si usas gevent puedes eliminar las colas, engendrar un hilo nuevo (verde) para cada tarea, el código para el hilo es tan simple como db.save(web.get(address)). gevent se encargará de prioridad cuando alguna biblioteca en db o bloques web. funcionará siempre que tus tareas quepan en la memoria.

0

En este caso, su problema no es la velocidad del programa (es decir, la elección de gevent o threading), sino el rendimiento de la red IO. Ese es (debería ser) el cuello de botella que determina qué tan rápido se ejecuta el programa.

Gevent es una buena forma de asegurarse de que sea el cuello de botella, y no la arquitectura de su programa.

Este es el tipo de proceso que querría:

import gevent 
from gevent.queue import Queue, JoinableQueue 
from gevent.monkey import patch_all 


patch_all() # Patch urllib2, etc 


def worker(work_queue, output_queue): 
    for work_unit in work_queue: 
     finished = do_work(work_unit) 
     output_queue.put(finished) 
     work_queue.task_done() 


def producer(input_queue, work_queue): 
    for url in input_queue: 
     url_list = crawl(url) 
     for work in url_list: 
      work_queue.put(work) 
     input_queue.task_done() 


def do_work(work): 
    gevent.sleep(0) # Actually proces link here 
    return work 


def crawl(url): 
    gevent.sleep(0) 
    return list(url) # Actually process url here 

input = JoinableQueue() 
work = JoinableQueue() 
output = Queue() 

workers = [gevent.spawn(worker, work, output) for i in range(0, 10)] 
producers = [gevent.spawn(producer, input, work) for i in range(0, 10)] 


list_of_urls = ['foo', 'bar'] 

for url in list_of_urls: 
    input.put(url) 

# Wait for input to finish processing 
input.join() 
print 'finished producing' 
# Wait for workers to finish processing work 
work.join() 
print 'finished working' 

# We now have output! 
print 'output:' 
for message in output: 
    print message 
# Or if you'd like, you could use the output as it comes! 

No es necesario esperar a que las colas de entrada y de trabajo para terminar, he aquí que acaban de demostrar.

Cuestiones relacionadas