2012-05-29 10 views
18

En Python he visto muchos ejemplos donde se llama multiproceso, pero el objetivo solo imprime algo. Tengo un escenario donde el objetivo devuelve 2 variables, que necesito usar más adelante. Por ejemplo:¿Es posible multiprocesar una función que devuelve algo en Python?

def foo(some args): 
    a = someObject 
    b = someObject 
    return a,b 

p1=multiprocess(target=foo,args(some args)) 
p2=multiprocess(target=foo,args(some args)) 
p3=multiprocess(target=foo,args(some args)) 

¿Ahora qué? Puedo iniciar y unirme, pero ¿cómo puedo recuperar los resultados individuales? Necesito ver la respuesta a, b para todos los trabajos que ejecuto y luego trabajar en ello.

Respuesta

15

Sí, claro, puede usar varios métodos. Uno de los más fáciles es un Queue compartido. Vea un ejemplo aquí: http://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/

+0

¿Hay una restricción en el valor de retorno? ¿Qué pasa si se trata de un archivo binario como el pdf? Mi objetivo es obtener n no: de pdf_S y luego concatenar. El orden no es significativo para nosotros. – Nishant

+1

@Nishant: puede ser cualquier información realmente. Para transferir archivos reales, haría un análisis prudente teniendo en cuenta el tamaño del archivo. Puede ser más conveniente simplemente escribir los archivos en el disco y pasarles los punteros (es decir, los nombres), pero se debe tener cuidado en términos de sincronización y atomicidad. –

7

Estoy copiando este ejemplo directamente de los documentos porque no puedo darle un enlace directo a él. Tenga en cuenta que imprime los resultados de done_queue, pero puede hacer lo que quiera con él.

# 
# Simple example which uses a pool of workers to carry out some tasks. 
# 
# Notice that the results will probably not come out of the output 
# queue in the same in the same order as the corresponding tasks were 
# put on the input queue. If it is important to get the results back 
# in the original order then consider using `Pool.map()` or 
# `Pool.imap()` (which will save on the amount of code needed anyway). 
# 
# Copyright (c) 2006-2008, R Oudkerk 
# All rights reserved. 
# 

import time 
import random 

from multiprocessing import Process, Queue, current_process, freeze_support 

# 
# Function run by worker processes 
# 

def worker(input, output): 
    for func, args in iter(input.get, 'STOP'): 
     result = calculate(func, args) 
     output.put(result) 

# 
# Function used to calculate result 
# 

def calculate(func, args): 
    result = func(*args) 
    return '%s says that %s%s = %s' % \ 
     (current_process().name, func.__name__, args, result) 

# 
# Functions referenced by tasks 
# 

def mul(a, b): 
    time.sleep(0.5*random.random()) 
    return a * b 

def plus(a, b): 
    time.sleep(0.5*random.random()) 
    return a + b 

# 
# 
# 

def test(): 
    NUMBER_OF_PROCESSES = 4 
    TASKS1 = [(mul, (i, 7)) for i in range(20)] 
    TASKS2 = [(plus, (i, 8)) for i in range(10)] 

    # Create queues 
    task_queue = Queue() 
    done_queue = Queue() 

    # Submit tasks 
    for task in TASKS1: 
     task_queue.put(task) 

    # Start worker processes 
    for i in range(NUMBER_OF_PROCESSES): 
     Process(target=worker, args=(task_queue, done_queue)).start() 

    # Get and print results 
    print 'Unordered results:' 
    for i in range(len(TASKS1)): 
     print '\t', done_queue.get() 

    # Add more tasks using `put()` 
    for task in TASKS2: 
     task_queue.put(task) 

    # Get and print some more results 
    for i in range(len(TASKS2)): 
     print '\t', done_queue.get() 

    # Tell child processes to stop 
    for i in range(NUMBER_OF_PROCESSES): 
     task_queue.put('STOP') 


if __name__ == '__main__': 
    freeze_support() 
    test() 

Originalmente desde multiprocessing module docs.

2

No funcionará en Windows pero aquí está es mi decorador de multiprocesamiento para las funciones, devuelve una cola que puede sondear y recoger datos devueltos por

import os 
from Queue import Queue 
from multiprocessing import Process 

def returning_wrapper(func, *args, **kwargs): 
    queue = kwargs.get("multiprocess_returnable") 
    del kwargs["multiprocess_returnable"] 
    queue.put(func(*args, **kwargs)) 

class Multiprocess(object): 
    """Cute decorator to run a function in multiple processes.""" 
    def __init__(self, func): 
     self.func = func 
     self.processes = [] 

    def __call__(self, *args, **kwargs): 
     num_processes = kwargs.get("multiprocess_num_processes", 2) # default to two processes. 
     return_obj = kwargs.get("multiprocess_returnable", Queue()) # default to stdlib Queue 
     kwargs["multiprocess_returnable"] = return_obj 
     for i in xrange(num_processes): 
      pro = Process(target=returning_wrapper, args=tuple([self.func] + list(args)), kwargs=kwargs) 
      self.processes.append(pro) 
      pro.start() 
     return return_obj 


@Multiprocess 
def info(): 
    print 'module name:', __name__ 
    print 'parent process:', os.getppid() 
    print 'process id:', os.getpid() 
    return 4 * 22 

data = info() 
print data.get(False) 
+0

La complejidad está presente. – Nishant

+0

No es tan malo una vez que lo haya leído –

22

Usted está buscando hacer un trabajo embarazosamente paralelo utilizando múltiples procesos ... ¿por qué no utilizar un Pool? A Pool se encargará de iniciar los procesos, recuperar los resultados y devolverle los resultados. Aquí uso pathos, que tiene un tenedor de multiprocessing, porque tiene una serialización mucho mejor que la versión que proporciona la biblioteca estándar.

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> 
>>> def foo(obj1, obj2): 
... a = obj1.x**2 
... b = obj2.x**2 
... return a,b 
... 
>>> class Bar(object): 
... def __init__(self, x): 
...  self.x = x 
... 
>>> res = Pool().map(foo, [Bar(1),Bar(2),Bar(3)], [Bar(4),Bar(5),Bar(6)]) 
>>> res 
[(1, 16), (4, 25), (9, 36)] 

y ves que foo toma dos argumentos, y devuelve una tupla de dos objetos. El método map de Pool envía foo a los procesos subyacentes y devuelve el resultado como res.

Puede obtener pathos aquí: https://github.com/uqfoundation

+5

podría valer la pena revelar que Mike McKerns es el autor de pathos. – Nimrod

+0

Absolutamente. Para su información, esta publicación fue anterior a mi constatación de que la divulgación de la autoría es una práctica estándar. –

1

Por qué nadie usa de devolución de llamada de multiprocessing.Pool?

Ejemplo:

from multiprocessing import Pool 
from contextlib import contextmanager 

from pprint import pprint 
from requests import get as get_page 

@contextmanager 
def _terminating(thing): 
    try: 
     yield thing 
    finally: 
     thing.terminate() 

def _callback(*args, **kwargs): 
    print("CALBACK") 
    pprint(args) 
    pprint(kwargs) 

print("Processing...") 
with _terminating(Pool(processes=WORKERS)) as pool: 
    results = pool.map_async(get_page, URLS, callback=_callback) 

    start_time = time.time() 
    results.wait() 
    end_time = time.time() 
    print("Time for Processing: %ssecs" % (end_time - start_time)) 

Aquí, puedo imprimir ambos argumentos y kwargs. Pero se puede reemplazar devolución de llamada por:

def _callback2(responses): 
    for r in responses: 
     print(r.status_code) # or do whatever with response... 
Cuestiones relacionadas