2012-07-18 19 views
6

¿Hay alguna manera de volver a enviar un fragmento de datos para su procesamiento, si el cálculo original falló, utilizando un grupo simple?Reintentos de agrupación de multiprocesamiento de python

import random 
from multiprocessing import Pool 

def f(x): 
    if random.getrandbits(1): 
     raise ValueError("Retry this computation") 
    return x*x 

p = Pool(5) 
# If one of these f(x) calls fails, retry it with another (or same) process 
p.map(f, [1,2,3]) 
+1

Tal vez usted quiere 'retorno f (x) 'en lugar de subir un' ValueError'? Solo adivinando ... –

+0

¿Qué tan alta es la probabilidad de falla en su aplicación real? Es decir, ¿qué tan importante es que el proceso intente de inmediato en lugar de esperar a que los otros procesos terminen primero? – Isaac

+0

Es una posibilidad moderada de falla, y no necesita ser reintentada inmediatamente (pero debe ser reintentada en paralelo, eventualmente). – ash

Respuesta

9

Si se puede (o no les importa) reintentar inmediatamente, use un decorador de envolver la función:

import random 
from multiprocessing import Pool 
from functools import wraps 

def retry(f): 
    @wraps(f) 
    def wrapped(*args, **kwargs): 
     while True: 
      try: 
       return f(*args, **kwargs) 
      except ValueError: 
       pass 
    return wrapped 

@retry 
def f(x): 
    if random.getrandbits(1): 
     raise ValueError("Retry this computation") 
    return x*x 

p = Pool(5) 
# If one of these f(x) calls fails, retry it with another (or same) process 
p.map(f, [1,2,3]) 
5

Puede utilizar un Queue para retroalimentar las fallas en el Pool a través de un bucle en el inicio de Process:

import multiprocessing as mp 
import random 

def f(x): 
    if random.getrandbits(1): 
     # on failure/exception catch 
     f.q.put(x) 
     return None 
    return x*x 

def f_init(q): 
    f.q = q 

def main(pending): 
    total_items = len(pending) 
    successful = [] 
    failure_tracker = [] 

    q = mp.Queue() 
    p = mp.Pool(None, f_init, [q]) 
    results = p.imap(f, pending) 
    retry_results = [] 
    while len(successful) < total_items: 
     successful.extend([r for r in results if not r is None]) 
     successful.extend([r for r in retry_results if not r is None]) 
     failed_items = [] 
     while not q.empty(): 
      failed_items.append(q.get()) 
     if failed_items: 
      failure_tracker.append(failed_items) 
      retry_results = p.imap(f, failed_items); 
    p.close() 
    p.join() 

    print "Results: %s" % successful 
    print "Failures: %s" % failure_tracker 

if __name__ == '__main__': 
    main(range(1, 10)) 

La salida es la siguiente:

Results: [1, 4, 36, 49, 25, 81, 16, 64, 9] 
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []] 

Un Pool no puede ser compartido entre múltiples procesos. Por lo tanto, este enfoque basado en Queue. Si intenta pasar una piscina como un parámetro a los procesos de piscinas, obtendrá este error:

NotImplementedError: pool objects cannot be passed between processes or pickled 

Usted podría alternativamente probar un par de reintentos inmediatos dentro de su función f, para evitar la sobrecarga de sincronización. Realmente se trata de cuán pronto debe esperar su función para volver a intentarlo, y de qué tan probable es un éxito si se vuelve a intentar de inmediato.


respuesta antigua:En aras de la exhaustividad, aquí está mi respuesta anterior, que no es tan óptima como volver a presentar directamente en la piscina, pero aún podría ser relevante en función del caso de uso , ya que proporciona una forma natural para hacer frente a/límite de reintentos n -level:

se puede utilizar un Queue a fallos agregados y volver a presentar al final de cada carrera, a través de múltiples:

import multiprocessing as mp 
import random 


def f(x): 
    if random.getrandbits(1): 
     # on failure/exception catch 
     f.q.put(x) 
     return None 
    return x*x 

def f_init(q): 
    f.q = q 

def main(pending): 
    run_number = 1 
    while pending: 
     jobs = pending 
     pending = [] 

     q = mp.Queue() 
     p = mp.Pool(None, f_init, [q]) 
     results = p.imap(f, jobs) 
     p.close() 

     p.join() 
     failed_items = [] 
     while not q.empty(): 
      failed_items.append(q.get()) 
     successful = [r for r in results if not r is None] 
     print "(%d) Succeeded: %s" % (run_number, successful) 
     print "(%d) Failed: %s" % (run_number, failed_items) 
     print 
     pending = failed_items 
     run_number += 1 

if __name__ == '__main__': 
    main(range(1, 10)) 

con una salida como ésta:

(1) Succeeded: [9, 16, 36, 81] 
(1) Failed: [2, 1, 5, 7, 8] 

(2) Succeeded: [64] 
(2) Failed: [2, 1, 5, 7] 

(3) Succeeded: [1, 25] 
(3) Failed: [2, 7] 

(4) Succeeded: [49] 
(4) Failed: [2] 

(5) Succeeded: [4] 
(5) Failed: [] 
+0

Actualicé mi respuesta a una que no requiere varias ejecuciones, y ahora funciona en el mismo conjunto original. –

+0

Gracias por la respuesta detallada. Me gusta la idea de volver a intentar cálculos fallidos en una cola. Debo recompensar a Andrew con la recompensa porque su solución hace un simple intento. – ash

+0

@ash Mencioné los intentos inmediatos en mi respuesta, pensando que sería una adición trivial/simple y no lo que estaba buscando. Tenga en cuenta también que (reintentos inmediatos) no es óptimo para todos los casos, especialmente aquellos en los que un reintento inmediato tiene pocas posibilidades de éxito (en cuyo caso es muy poco óptimo ya que causa inanición de recursos para trabajos que potencialmente podrían tener éxito). Felicitaciones a Andrew de todas formas. –

Cuestiones relacionadas