2010-05-18 4 views
11

La fuerza de Twisted (para python) es su marco asincrónico (creo). He escrito un servidor de procesamiento de imágenes que recibe solicitudes a través de Perspective Broker. Funciona muy bien siempre que lo alimente menos de un par de cientos de imágenes a la vez. Sin embargo, a veces se dispara con cientos de imágenes prácticamente al mismo tiempo. Como trata de procesarlos todos simultáneamente, el servidor se bloquea.¿Llamadas remotas en cola a un intermediario de perspectiva de Python Twisted?

Como solución me gustaría hacer cola los remote_calls en el servidor de forma que sólo procesa ~ 100 imágenes a la vez. Parece que esto podría ser algo que Twisted ya hace, pero parece que no puedo encontrarlo. ¿Alguna idea sobre cómo comenzar a implementar esto? Un punto en la dirección correcta? ¡Gracias!

Respuesta

29

Una opción confeccionada que podría ayudar con esto es twisted.internet.defer.DeferredSemaphore. Esta es la versión asíncrona del semáforo normal (de conteo) que quizás ya conozca si ha realizado una gran cantidad de programación con subprocesos.

Un semáforo (de conteo) es muy parecido a un mutex (un bloqueo). Pero cuando un mutex solo puede adquirirse una vez hasta una versión correspondiente, un semáforo (de conteo) se puede configurar para permitir que un número arbitrario (pero especificado) de adquisiciones tenga éxito antes de que se requieran las versiones correspondientes.

He aquí un ejemplo del uso de DeferredSemaphore a correr diez operaciones asíncronas, pero para correr a lo sumo tres de ellos a la vez:

from twisted.internet.defer import DeferredSemaphore, gatherResults 
from twisted.internet.task import deferLater 
from twisted.internet import reactor 


def async(n): 
    print 'Starting job', n 
    d = deferLater(reactor, n, lambda: None) 
    def cbFinished(ignored): 
     print 'Finishing job', n 
    d.addCallback(cbFinished) 
    return d 


def main(): 
    sem = DeferredSemaphore(3) 

    jobs = [] 
    for i in range(10): 
     jobs.append(sem.run(async, i)) 

    d = gatherResults(jobs) 
    d.addCallback(lambda ignored: reactor.stop()) 
    reactor.run() 


if __name__ == '__main__': 
    main() 

DeferredSemaphore también tiene explícitos acquire y release métodos, pero el método run es tan conveniente es casi siempre lo que quieres. Llama al método acquire, que devuelve Deferred. Para ese primer Deferred, agrega una devolución de llamada que llama a la función que pasó (junto con cualquier argumento posicional o de palabra clave). Si esa función devuelve un Deferred, a ese segundo Deferred se agrega una devolución de llamada que llama al método release.

El caso síncrona se maneja así, llamando inmediatamente release. Los errores también se manejan, al permitir que se propaguen, pero asegurándose de que se hace necesario release para dejar el DeferredSemaphore en un estado consistente. El resultado de la función pasó a run (o el resultado de la Deferred devuelve) se convierte en el resultado de la Deferred devuelto por run.

Otro posible enfoque podría basarse en DeferredQueue y cooperate. DeferredQueue es principalmente como una cola normal, pero su método get devuelve Deferred. Si no hay elementos en la cola en el momento de la llamada, el Deferred no se disparará hasta que se agregue un elemento.

He aquí un ejemplo:

from random import randrange 

from twisted.internet.defer import DeferredQueue 
from twisted.internet.task import deferLater, cooperate 
from twisted.internet import reactor 


def async(n): 
    print 'Starting job', n 
    d = deferLater(reactor, n, lambda: None) 
    def cbFinished(ignored): 
     print 'Finishing job', n 
    d.addCallback(cbFinished) 
    return d 


def assign(jobs): 
    # Create new jobs to be processed 
    jobs.put(randrange(10)) 
    reactor.callLater(randrange(10), assign, jobs) 


def worker(jobs): 
    while True: 
     yield jobs.get().addCallback(async) 


def main(): 
    jobs = DeferredQueue() 

    for i in range(10): 
     jobs.put(i) 

    assign(jobs) 

    for i in range(3): 
     cooperate(worker(jobs)) 

    reactor.run() 


if __name__ == '__main__': 
    main() 

Tenga en cuenta que la función del trabajador async es la misma que la del primer ejemplo. Sin embargo, esta vez, también hay una función worker que está tirando explícitamente puestos de trabajo fuera de la DeferredQueue y procesarlos con async (añadiendo async como llamada de retorno a la Deferred devuelto por get). El generador worker es impulsado por cooperate, que lo itera una vez después de cada Deferred produce incendios.El ciclo principal, entonces, inicia tres de estos generadores de trabajadores para que haya tres trabajos en progreso en un momento dado.

Este enfoque implica un poco más código que el enfoque DeferredSemaphore, pero tiene algunas ventajas que pueden ser interesantes. Primero, cooperate devuelve una instancia CooperativeTask que tiene métodos útiles como pause, resume y un par de otros. Además, todos los trabajos asignados al mismo cooperador serán que cooperen entre sí en la programación, para no sobrecargar el bucle de eventos (y esto es lo que le da a la API su nombre). En el lado DeferredQueue, también es posible establecer límites en la cantidad de elementos que están pendientes de procesamiento, por lo que puede evitar sobrecargar completamente el servidor (por ejemplo, si los procesadores de imágenes se atascan y dejan de completar las tareas). Si el código que llama al put maneja la excepción de desbordamiento de cola, puede usar esto como presión para tratar de dejar de aceptar nuevos trabajos (tal vez derivarlos a otro servidor o alertar a un administrador). Hacer cosas similares con DeferredSemaphore es un poco más complicado, ya que no hay forma de limitar cuántos trabajos están esperando para poder adquirir el semáforo.

+0

Genial, realmente aprecio estas ideas. En respuesta a la idea de utilizar un DemostradoSemaphore. Esto sería muy útil si hubiera lotes discretos de trabajos que debían completarse. Si un lote tiene demasiados trabajos por hacer, solo realiza unos pocos trabajos al mismo tiempo y luego, cuando se completan todos los trabajos, se recopila el lote. Esto tiene la desventaja de que no se devuelven resultados hasta que todo el lote termina ¿verdad? Y creo que este inconveniente se aborda mediante el uso de un DeferredQueue ... – agartland

+1

El enfoque con un DeferredQueue y cooperar es inteligente. Realmente me dará más control en el futuro en cuanto a escalar el procesador. Ni siquiera creo que sea necesariamente más complicado. Gracias. – agartland

-2

También puede ser que al igual que el txRDQ (de tamaño variable de Despacho de la cola) me escribió. Google it, está en la colección tx en LaunchPad. Lo siento, no tengo más tiempo para responder, para ir al escenario.

Terry

Cuestiones relacionadas