Una opción confeccionada que podría ayudar con esto es twisted.internet.defer.DeferredSemaphore
. Esta es la versión asíncrona del semáforo normal (de conteo) que quizás ya conozca si ha realizado una gran cantidad de programación con subprocesos.
Un semáforo (de conteo) es muy parecido a un mutex (un bloqueo). Pero cuando un mutex solo puede adquirirse una vez hasta una versión correspondiente, un semáforo (de conteo) se puede configurar para permitir que un número arbitrario (pero especificado) de adquisiciones tenga éxito antes de que se requieran las versiones correspondientes.
He aquí un ejemplo del uso de DeferredSemaphore
a correr diez operaciones asíncronas, pero para correr a lo sumo tres de ellos a la vez:
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def main():
sem = DeferredSemaphore(3)
jobs = []
for i in range(10):
jobs.append(sem.run(async, i))
d = gatherResults(jobs)
d.addCallback(lambda ignored: reactor.stop())
reactor.run()
if __name__ == '__main__':
main()
DeferredSemaphore
también tiene explícitos acquire
y release
métodos, pero el método run
es tan conveniente es casi siempre lo que quieres. Llama al método acquire
, que devuelve Deferred
. Para ese primer Deferred
, agrega una devolución de llamada que llama a la función que pasó (junto con cualquier argumento posicional o de palabra clave). Si esa función devuelve un Deferred
, a ese segundo Deferred
se agrega una devolución de llamada que llama al método release
.
El caso síncrona se maneja así, llamando inmediatamente release
. Los errores también se manejan, al permitir que se propaguen, pero asegurándose de que se hace necesario release
para dejar el DeferredSemaphore
en un estado consistente. El resultado de la función pasó a run
(o el resultado de la Deferred
devuelve) se convierte en el resultado de la Deferred
devuelto por run
.
Otro posible enfoque podría basarse en DeferredQueue
y cooperate
. DeferredQueue
es principalmente como una cola normal, pero su método get
devuelve Deferred
. Si no hay elementos en la cola en el momento de la llamada, el Deferred
no se disparará hasta que se agregue un elemento.
He aquí un ejemplo:
from random import randrange
from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def assign(jobs):
# Create new jobs to be processed
jobs.put(randrange(10))
reactor.callLater(randrange(10), assign, jobs)
def worker(jobs):
while True:
yield jobs.get().addCallback(async)
def main():
jobs = DeferredQueue()
for i in range(10):
jobs.put(i)
assign(jobs)
for i in range(3):
cooperate(worker(jobs))
reactor.run()
if __name__ == '__main__':
main()
Tenga en cuenta que la función del trabajador async
es la misma que la del primer ejemplo. Sin embargo, esta vez, también hay una función worker
que está tirando explícitamente puestos de trabajo fuera de la DeferredQueue
y procesarlos con async
(añadiendo async
como llamada de retorno a la Deferred
devuelto por get
). El generador worker
es impulsado por cooperate
, que lo itera una vez después de cada Deferred
produce incendios.El ciclo principal, entonces, inicia tres de estos generadores de trabajadores para que haya tres trabajos en progreso en un momento dado.
Este enfoque implica un poco más código que el enfoque DeferredSemaphore
, pero tiene algunas ventajas que pueden ser interesantes. Primero, cooperate
devuelve una instancia CooperativeTask
que tiene métodos útiles como pause
, resume
y un par de otros. Además, todos los trabajos asignados al mismo cooperador serán que cooperen entre sí en la programación, para no sobrecargar el bucle de eventos (y esto es lo que le da a la API su nombre). En el lado DeferredQueue
, también es posible establecer límites en la cantidad de elementos que están pendientes de procesamiento, por lo que puede evitar sobrecargar completamente el servidor (por ejemplo, si los procesadores de imágenes se atascan y dejan de completar las tareas). Si el código que llama al put
maneja la excepción de desbordamiento de cola, puede usar esto como presión para tratar de dejar de aceptar nuevos trabajos (tal vez derivarlos a otro servidor o alertar a un administrador). Hacer cosas similares con DeferredSemaphore
es un poco más complicado, ya que no hay forma de limitar cuántos trabajos están esperando para poder adquirir el semáforo.
Genial, realmente aprecio estas ideas. En respuesta a la idea de utilizar un DemostradoSemaphore. Esto sería muy útil si hubiera lotes discretos de trabajos que debían completarse. Si un lote tiene demasiados trabajos por hacer, solo realiza unos pocos trabajos al mismo tiempo y luego, cuando se completan todos los trabajos, se recopila el lote. Esto tiene la desventaja de que no se devuelven resultados hasta que todo el lote termina ¿verdad? Y creo que este inconveniente se aborda mediante el uso de un DeferredQueue ... – agartland
El enfoque con un DeferredQueue y cooperar es inteligente. Realmente me dará más control en el futuro en cuanto a escalar el procesador. Ni siquiera creo que sea necesariamente más complicado. Gracias. – agartland