2010-07-25 27 views
23

Estaba seguro de que había algo como esto en la biblioteca estándar, pero parece que estaba equivocado.Python: algo así como `map` que funciona en los hilos

Tengo un montón de direcciones URL que quiero urlopen en paralelo. Quiero algo así como la función integrada map, excepto que el trabajo se realiza en paralelo por un montón de hilos.

¿Hay un buen módulo que hace esto?

+1

Qué quiere decir que desea asignar a iniciar las discusiones (como la respuesta de pillmuncher, mapa (urlopen, urls)), o ¿iniciará manualmente los hilos de urlopening, y querrá que algo como map actúe sobre los resultados de la ejecución de cada hilo, a medida que estén disponibles? – rbp

Respuesta

10

Alguien recomienda que utilizo el paquete futures para esto. Lo intenté y parece estar funcionando.

http://pypi.python.org/pypi/futures

He aquí un ejemplo:

"Download many URLs in parallel." 

import functools 
import urllib.request 
import futures 

URLS = ['http://www.foxnews.com/', 
     'http://www.cnn.com/', 
     'http://europe.wsj.com/', 
     'http://www.bbc.co.uk/', 
     'http://some-made-up-domain.com/'] 

def load_url(url, timeout): 
    return urllib.request.urlopen(url, timeout=timeout).read() 

with futures.ThreadPoolExecutor(50) as executor: 
    future_list = executor.run_to_futures(
      [functools.partial(load_url, url, 30) for url in URLS]) 
0

Me envuelvo en una función (no probado):

import itertools 
import threading 
import urllib2 
import Queue 

def openurl(url, queue): 
    def starter(): 
     try: 
      result = urllib2.urlopen(url) 
     except Ecxeption, exc: 
      def raiser(): 
       raise exc 
      queue.put((url, raiser)) 
     else: 
      queue.put((url, lambda:result)) 
    threadind.Thread(target=starter).start() 

myurls = ... # the list of urls 
myqueue = Queue.Queue() 

map(openurl, myurls, itertools.repeat(myqueue)) 

for each in myurls: 
    url, getresult = queue.get() 
    try: 
     result = getresult() 
    except Exception, exc: 
     print 'exception raised:' + str(exc) 
    else: 
     # do stuff with result 
34

hay un método map en multiprocessing.Pool. Eso hace múltiples procesos.

Y si los procesos múltiples no son su plato, puede usar multiprocessing.dummy que utiliza hilos.

import urllib 
import multiprocessing.dummy 

p = multiprocessing.dummy.Pool(5) 
def f(post): 
    return urllib.urlopen('http://stackoverflow.com/questions/%u' % post) 

print p.map(f, range(3329361, 3329361 + 5)) 
+0

Esto es genial, pero no funcionará en python2.6 si se ejecuta desde un hilo debido a este error: http://bugs.python.org/issue14881 – gregsabo

+0

Funciona muy bien en python 2.79 - actualmente es la última versión de 2x, y bastante bien en eso también! – FredTheWebGuy

1

Aquí es mi aplicación de la hoja de rosca:

from threading import Thread 
from queue import Queue 

def thread_map(f, iterable, pool=None): 
    """ 
    Just like [f(x) for x in iterable] but each f(x) in a separate thread. 
    :param f: f 
    :param iterable: iterable 
    :param pool: thread pool, infinite by default 
    :return: list if results 
    """ 
    res = {} 
    if pool is None: 
     def target(arg, num): 
      try: 
       res[num] = f(arg) 
      except: 
       res[num] = sys.exc_info() 

     threads = [Thread(target=target, args=[arg, i]) for i, arg in enumerate(iterable)] 
    else: 
     class WorkerThread(Thread): 
      def run(self): 
       while True: 
        try: 
         num, arg = queue.get(block=False) 
         try: 
          res[num] = f(arg) 
         except: 
          res[num] = sys.exc_info() 
        except Empty: 
         break 

     queue = Queue() 
     for i, arg in enumerate(iterable): 
      queue.put((i, arg)) 

     threads = [WorkerThread() for _ in range(pool)] 

    [t.start() for t in threads] 
    [t.join() for t in threads] 
    return [res[i] for i in range(len(res))] 
+0

Debe importar 'Vacío' en la segunda línea. – speedplane

Cuestiones relacionadas