2012-10-10 14 views
11

¿Puedo utilizar una primitiva de apio Group como tarea paraguas en un mapa/reducir el flujo de trabajo?Tarea del grupo de apio para usar en un flujo de trabajo de mapa/reducir

O más específico: ¿Pueden las subtareas en un grupo ejecutarse en varios trabajadores en múltiples servidores ?

A partir de los documentos:

However, if you call apply_async on the group it will send a special 
grouping task, so that the action of calling the tasks happens in a worker 
instead of the current process 

Eso parece implicar todas las tareas se envíe a un trabajador ...

Antes 3.0 (y sigue siendo) uno podría disparar las subtareas en un taskset cuales se ejecutaría en múltiples servidores. El problema es determinar si todas las tareas han terminado de ejecutarse. Eso normalmente se hace al sondear todas las subtareas que no son realmente elegantes. Me pregunto si la primitiva de grupo se puede utilizar para mitigar este problema.

+0

distribuye las tareas perfectamente bien con un comando regular de 'grupo' al menos en apio 3.1, parece que la declaración anterior se eliminó de los documentos – Grozz

Respuesta

23

Descubrí que es posible utilizar Acordes para ese mapa, como reducir el problema.

@celery.task(name='ic.mapper') 
def mapper(): 
    #split your problem in embarrassingly parallel maps 
    maps = [map.s(), map.s(), map.s(), map.s(), map.s(), map.s(), map.s(), map.s()] 
    #and put them in a chord that executes them in parallel and after they finish calls 'reduce' 
    mapreduce = celery.chord(maps)(reduce.s())  
    return "{0} mapper ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname) 

@celery.task(name='ic.map') 
def map(): 
    #do something useful here 
    import time 
    time.sleep(10.0) 
    return "{0} map ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname) 

@celery.task(name='ic.reduce') 
def reduce(results): 
    #put the maps together and do something with the results 
    return "{0} reduce ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname) 

Cuando se ejecuta el asignador de un grupo de tres trabajadores/servidores que ejecuta primero el asignador que divide su problema y la crea nuevas tareas parciales que se presentan de nuevo al corredor. Se ejecutan en paralelo porque todos los intermediarios consumen la cola. También se crea una tarea de acordes que sondea todos los mapas para ver si han terminado. Cuando termine, la tarea de reducción se ejecuta donde puede pegar sus resultados nuevamente.

En total: si es posible. Gracias por los chicos vegetales!

Cuestiones relacionadas