2012-06-03 15 views
14

Necesito diseñar un sistema de programación de tareas escalable impulsado por Redis.Ejecución escalable escalable de tareas con Redis

Requisitos:

  • procesos de trabajo múltiples.
  • Muchas tareas, pero son posibles períodos largos de inactividad.
  • Precisión de tiempo razonable.
  • Mínimo desperdicio de recursos cuando está inactivo.
  • Debe usar la API síncrona Redis.
  • Debería funcionar para Redis 2.4 (es decir, no tiene características de la próxima versión 2.6).
  • No debe usar otros medios de RPC que Redis.

Pseudo-API: schedule_task(timestamp, task_data). La marca de tiempo está en segundos enteros.

idea básica:

  • Escuchar para las próximas tareas en la lista.
  • Ponga las tareas en cubos por marca de tiempo.
  • Dormir hasta la fecha más cercana.
  • Si aparece una nueva tarea con marca de tiempo menor que la más cercana, despiértate.
  • Procese todas las tareas venideras con indicación de fecha y hora ≤ ahora, en lotes (suponiendo que la ejecución de tareas sea rápida).
  • Asegúrese de que el trabajador concurrente no procese las mismas tareas. Al mismo tiempo, asegúrese de que no se pierdan tareas si fallamos al procesarlas.

Hasta ahora no puedo encontrar la manera de encajar esto en primitivas Redis ...

Alguna pista?

Tenga en cuenta que hay una pregunta anterior similar: Delayed execution/scheduling with Redis? En esta nueva pregunta, presento más detalles (lo más importante, muchos trabajadores). Hasta el momento, no pude averiguar cómo aplicar las respuestas antiguas aquí, por lo tanto, una nueva pregunta.

+0

Me gustaría señalar explícitamente que la interrogación de una clave de Redis en un bucle violaría el requisito de "mínimo desperdicio de recursos cuando está inactivo". Los trabajadores deben dormir cuando no hay nada que hacer. –

+0

sondeo con BLPOP/BRPOP puede bloquear hasta que la lista esté completa, y es lo que la mayoría de la gente usa para hacer esto. Por lo general, bloqueas durante unos segundos en un bucle, pero en términos de tiempo de CPU es insignificante. Puede usar redis pub/sub pero eso es malo porque si no hay un trabajador, las tareas se perderán. –

+0

@Not_a_Golfer: Las cosas son un poco más complicadas que BLPOPping una sola lista. Tenga en cuenta que necesito demorar la ejecución de la tarea (es decir, el planificador de tareas), no un procesador de tareas sencillo. –

Respuesta

0

Un enfoque combinado parece plausible:

  1. No hay nueva marca de tiempo de trabajo puede ser inferior al tiempo actual (pinza si es menor). Asumiendo una sincronización NTP confiable.

  2. Todas las tareas van a listas de depósitos en las claves, con el sufijo con la marca de tiempo de la tarea.

  3. Además, todas las marcas de tiempo de la tarea van a una zset dedicada (clave y puntuación - marca de tiempo).

  4. Se aceptan nuevas tareas de los clientes a través de la lista de Redis por separado.

  5. Loop: Fetch más antiguo N timestamps caducados a través de zrangebyscore ... limit.

  6. BLPOP con tiempo de espera en listas de tareas nuevas y listas de marcas de tiempo recuperadas.

  7. Si tiene una tarea antigua, trátela. Si es nuevo, agréguelo a bucket y zset.

  8. Compruebe si las cubetas procesadas están vacías. Si es así, elimine la lista y entrt de zset. Probablemente no revise los cubos caducados recientemente, para evitar problemas de sincronización de tiempo. Fin del ciclo

Critique? ¿Comentarios? ¿Alternativas?

4

No especificó el idioma que está utilizando. Tienes al menos 3 alternativas para hacer esto sin escribir una sola línea de código en Python al menos.

  1. Apio tiene un agente de redis opcional. http://celeryproject.org/

  2. resque es una cola de tareas redis extremadamente popular que utiliza redis. https://github.com/defunkt/resque

  3. RQ es una cola a base Redis simples y pequeños que se pretende "tomar las cosas buenas de apio y resque" y ser mucho más fácil de trabajar. http://python-rq.org/

Al menos puede mirar a su diseño si no se pueden utilizar.

Pero para responder a su pregunta, puede hacer lo que desee con redis. De hecho, he escrito más o menos eso en el pasado.

EDIT: En cuanto a modelar lo que quieres en Redis, esto es lo que haría:

  1. cola una tarea con una marca de tiempo se llevará a cabo directamente por el cliente - se pone la tarea de una conjunto ordenado con la marca de tiempo como puntaje y la tarea como el valor (ver ZADD).

  2. Un despachador central se activa cada N segundos, comprueba las primeras marcas de tiempo en este conjunto, y si hay tareas listas para su ejecución, empuja la tarea a la lista "para ejecutarse AHORA". Esto se puede hacer con ZREVRANGEBYSCORE en el conjunto ordenado "en espera", obteniendo todos los artículos con la marca de tiempo < = ahora, para que pueda obtener todos los elementos listos a la vez. empujar es hecho por RPUSH.

  3. los trabajadores usan BLPOP en la lista "para ser ejecutados AHORA", despierten cuando haya algo en lo que trabajar, y hagan lo suyo. Esto es seguro ya que el redis tiene un solo hilo, y ningún 2 trabajadores tomará la misma tarea.

  4. una vez finalizados, los trabajadores vuelven a colocar el resultado en una cola de respuesta, que es revisada por el despachador u otro hilo. Puede agregar un depósito "pendiente" para evitar fallas o algo así.

lo que el código se verá algo como esto (esto es sólo pseudo-código):

cliente:

ZADD "new_tasks" <TIMESTAMP> <TASK_INFO> 

despachador:

while working: 
    tasks = ZREVRANGEBYSCORE "new_tasks" <NOW> 0 #this will only take tasks with timestamp lower/equal than now 
    for task in tasks: 

     #do the delete and queue as a transaction 
     MULTI 
     RPUSH "to_be_executed" task 
     ZREM "new_tasks" task 
     EXEC 

    sleep(1) 

No añadí el manejo de la cola de respuesta, pero es más o menos como el trabajador:

trabajador:

while working: 
    task = BLPOP "to_be_executed" <TIMEOUT> 
    if task: 
     response = work_on_task(task) 
     RPUSH "results" response 

EDIT: despachador atómica sin estado:

while working: 

    MULTI 
    ZREVRANGE "new_tasks" 0 1 
    ZREMRANGEBYRANK "new_tasks" 0 1 
    task = EXEC 

    #this is the only risky place - you can solve it by using Lua internall in 2.6 
    SADD "tmp" task 

    if task.timestamp <= now: 
     MULTI 
     RPUSH "to_be_executed" task 
     SREM "tmp" task 
     EXEC 
    else: 

     MULTI 
     ZADD "new_tasks" task.timestamp task 
     SREM "tmp" task 
     EXEC 

    sleep(RESOLUTION) 
+0

Estoy usando Lua. :-) –

+0

¡Interesante! ¿Quieres usar realmente lua dentro de redis? o simplemente conectarse a redis con lua? el primero no funcionará –

+0

Gracias por las referencias, las revisaré. Ya que escribiste algo similar en el pasado, ¿te importa elaborar un poco sobre los puntos clave del diseño? No importa el lenguaje. –

8

Aquí hay otra solución que se basa en un par de otros [1]. Utiliza el comando redis WATCH para eliminar la condición de carrera sin usar lua en redis 2.6.

El esquema básico es:

  • Utilice un zConfigurar Redis para las tareas programadas y las colas para Redis listos para ejecutar tareas.
  • Haga que un despachador sondee las tareas zset y mueva las tareas que están listas para ejecutarse en las colas redis. Es posible que desee más de 1 despachador para la redundancia, pero probablemente no necesite o desee muchos.
  • Tenga tantos trabajadores como desee, y bloqueos en las colas de redis.

no he probado :-)

El creador de empleo foo haría:

def schedule_task(queue, data, delay_secs): 
    # This calculation for run_at isn't great- it won't deal well with daylight 
    # savings changes, leap seconds, and other time anomalies. Improvements 
    # welcome :-) 
    run_at = time.time() + delay_secs 

    # If you're using redis-py's Redis class and not StrictRedis, swap run_at & 
    # the dict. 
    redis.zadd(SCHEDULED_ZSET_KEY, run_at, {'queue': queue, 'data': data}) 

schedule_task('foo_queue', foo_data, 60) 

El despachador (s) se vería así:

while working: 
    redis.watch(SCHEDULED_ZSET_KEY) 
    min_score = 0 
    max_score = time.time() 
    results = redis.zrangebyscore(
     SCHEDULED_ZSET_KEY, min_score, max_score, start=0, num=1, withscores=False) 
    if results is None or len(results) == 0: 
     redis.unwatch() 
     sleep(1) 
    else: # len(results) == 1 
     redis.multi() 
     redis.rpush(results[0]['queue'], results[0]['data']) 
     redis.zrem(SCHEDULED_ZSET_KEY, results[0]) 
     redis.exec() 

El El trabajador foo se vería así:

while working: 
    task_data = redis.blpop('foo_queue', POP_TIMEOUT) 
    if task_data: 
     foo(task_data) 

[1] Esta solución se basa en not_a_golfer, una en http://www.saltycrane.com/blog/2011/11/unique-python-redis-based-queue-delay/, y la redis docs para transacciones.

+1

Si tiene interés para alguien, he creado una implementación Java de lo anterior ... totalmente probado y funcional. https://github.com/davidmarquis/redis-scheduler –

1

Si está buscando una solución preparada para Java. Redisson es adecuado para usted. Permite programar y ejecutar tareas (con compatibilidad cron-expression) de forma distribuida en Redisson nodes usando la conocida API ScheduledExecutorService y basada en la cola Redis.

Aquí hay un ejemplo. Primero defina una tarea usando la interfaz java.lang.Runnable. Cada tarea puede acceder a la instancia Redis a través del objeto inyectado RedissonClient.

public class RunnableTask implements Runnable { 

    @RInject 
    private RedissonClient redissonClient; 

    @Override 
    public void run() throws Exception { 
     RMap<String, Integer> map = redissonClient.getMap("myMap"); 
     Long result = 0; 
     for (Integer value : map.values()) { 
      result += value; 
     } 
     redissonClient.getTopic("myMapTopic").publish(result); 
    } 

} 

Ahora ya está listo para complete en ScheduledExecutorService:

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor"); 
ScheduledFuture<?> future = executorService.schedule(new CallableTask(), 10, 20, TimeUnit.MINUTES); 

future.get(); 
// or cancel it 
future.cancel(true); 

ejemplos con expresiones cron:

executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?")); 

executorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5)); 

executorService.schedule(new RunnableTask(), CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY)); 

Todas las tareas se ejecutan en Redisson node.

Cuestiones relacionadas