2011-04-04 11 views
9

Tengo un pequeño grupo de trabajadores (4) y una lista muy grande de tareas (5000 ~). Estoy usando un grupo y enviando las tareas con map_async(). Debido a que la tarea que estoy ejecutando es bastante larga, estoy forzando un tamaño de chunksize de 1 para que un proceso largo no pueda contener algunos más cortos.Python: ¿Cómo puedo verificar el número de tareas pendientes en un multiprocesamiento.Pool?

Lo que me gustaría hacer es verificar periódicamente cuántas tareas quedan pendientes. Sé que a lo sumo 4 estarán activos. Me preocupa cuántos quedan por procesar.

He buscado en Google y no puedo encontrar a nadie que haga esto.

Algunos código simple para ayudar:

import multiprocessing 
import time 

def mytask(num): 
    print('Started task, sleeping %s' % num) 
    time.sleep(num) 

pool = multiprocessing.Pool(4) 
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1) 
pool.close() 

while True: 
    if not jobs.ready(): 
     print("We're not done yet, %s tasks to go!" % <somethingtogettasks>) 
     jobs.wait(2) 
    else: 
     break 
+0

Debo notar que estoy usando python2.6 en un sistema RHEL-6, sin embargo, estoy abierto a ejemplos en diferentes versiones/plataformas. – jkeating

+0

variable estática que se disminuye cuando se completa la tarea? (y se incrementa cuando la tarea comienza obviamente). – Enders

+0

Las tareas no "comienzan" hasta que el trabajador las recibe. Supongo que si creara un global que fuera del tamaño de las tareas a realizar, entonces lo disminuiría cada vez que comenzara una tarea que podría hacerlo, pero eso es un poco incómodo y requiere un poco de seguridad en el hilo. – jkeating

Respuesta

6

Parece que jobs._number_left es lo que quiere. _ indica que es un valor interno que puede cambiar según el capricho de los desarrolladores, pero parece ser la única forma de obtener esa información.

+0

¡Ah! No estaba en los documentos de API, y había olvidado hacer un dir() en trabajos en ipython. ¡Gracias por la respuesta! – jkeating

1

ninguna manera hermética, que yo sepa, pero si se utiliza la función Pool.imap_unordered() en lugar de map_async, puede interceptar los elementos que se procesan.

import multiprocessing 
import time 

process_count = 4 

def mytask(num): 
    print('Started task, sleeping %s' % num) 
    time.sleep(num) 
    # Actually, you should return the job you've created here. 
    return num 

pool = multiprocess.Pool(process_count) 
jobs = [] 
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4] 
job_count = 0 
for job in pool.imap_unordered(mytask, items): 
    jobs.append(job) 
    job_count += 1 

    incomplete = len(items) - job_count 
    unsubmitted = max(0, incomplete - process_count) 

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted 

pool.close() 

estoy restando process_count, porque se puede suponer que casi todos los procesos se procesan con una de las dos excepciones: 1) si se utiliza un iterador, puede que no haya más elementos dejaron de consumir y el proceso de , y 2) Puede que le queden menos de 4 elementos. No codifiqué para la primera excepción. Pero debería ser bastante fácil hacerlo si es necesario. De todos modos, tu ejemplo usa una lista, por lo que no deberías tener ese problema.

Editar: También me di cuenta de que está utilizando un bucle While, que hace que parezca que está tratando de actualizar algo periódicamente, por ejemplo, cada medio segundo o algo así. El código que di como ejemplo no lo hará de esa manera. No estoy seguro de si eso es un problema.

+0

Gracias. Realmente no había explorado las funciones imap (los documentos eran un poco ... escuetos). Sin embargo, tienes razón, me gustaría hacer otras cosas mientras se realizan los trabajos e informar periódicamente sobre cuántos trabajos quedan. – jkeating

1

Tengo requisitos similares: realizar un seguimiento del progreso, realizar trabajos provisionales en función de los resultados, detener todo el proceso de forma limpia en cualquier momento arbitrario. La forma en que lo he tratado es enviar tareas de una en una con apply_async. Una versión muy simplificada de lo que hago:

maxProcesses = 4 
q = multiprocessing.Queue() 
pool = multiprocessing.Pool() 
runlist = range(100000) 
sendcounter = 0 
donecounter = 0 
while donecounter < len(runlist): 
    if stopNowBooleanFunc(): # if for whatever reason I want to stop processing early 
     if donecounter == sendcounter: # wait til already sent tasks finish running 
      break 
    else: # don't send new tasks if it's time to stop 
     while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses: 
      pool.apply_async(mytask, (runlist[sendcounter], q)) 
      sendcounter += 1 

    while not q.empty(): # process completed results as they arrive 
     aresult = q.get() 
     processResults(aresult) 
     donecounter += 1 

Nota que utilizo un Queue en lugar de return ing los resultados.

Cuestiones relacionadas