Usted' Tiene razón que no puede expresar lo que quiere expresar en cacheiter
. El decorador inlineCallbacks
no le permitirá tener una función que devuelva un iterador. Si decora una función con ella, el resultado es una función que siempre devuelve Deferred
. Para eso es para eso.
Parte de lo que hace esto difícil es que los iteradores no funcionan bien con el código asincrónico. Si hay un diferido involucrado en la producción de los elementos de su iterador, entonces los elementos que salen de su iterador serán diferidos primero.
Usted podría hacer algo como esto para tener en cuenta que:
@inlineCallbacks
def process_work():
for element_deferred in some_jobs:
element = yield element_deferred
work_on(element)
Esto puede funcionar, pero parece particularmente extraño. Dado que los generadores solo pueden ceder el paso a la persona que llama (no, por ejemplo, a la persona que llama), el iterador some_jobs
no puede hacer nada al respecto; solo el código léxicamente dentro de process_work
puede producir un trampolín provisto de inlineCallbacks
para esperar.
Si no le importa este patrón, entonces podríamos obtener imágenes de su código que está siendo escrito algo como:
from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor
class cacheiter(object):
def __init__(self, cached):
self._cached = iter(cached.items())
self._remaining = []
def __iter__(self):
return self
@inlineCallbacks
def next(self):
# First re-fill the list of synchronously-producable values if it is empty
if not self._remaining:
for name, value in self._cached:
# Wait on this Deferred to determine if this cache item should be included
if (yield check_condition(name, value)):
# If so, put all of its values into the value cache so the next one
# can be returned immediately next time this method is called.
self._remaining.extend([(name, k, v) for (k, v) in value.items()])
# Now actually give out a value, if there is one.
if self._remaining:
returnValue(self._remaining.pop())
# Otherwise the entire cache has been visited and the iterator is complete.
# Sadly we cannot signal completion with StopIteration, because the iterator
# protocol isn't going to add an errback to this Deferred and check for
# StopIteration. So signal completion with a simple None value.
returnValue(None)
@inlineCallbacks
def process_chunk(myiter, num):
for i in xrange(num):
nextval = yield myiter.next()
if nextval is None:
# The iterator signaled completion via the special None value.
# Processing is complete.
returnValue(True)
# Otherwise process the value.
yield some_processing(nextval)
# Indicate there is more processing to be done.
returnValue(False)
def sleep(sec):
# Simple helper to delay asynchronously for some number of seconds.
return deferLater(reactor, sec, lambda: None)
@inlineCallbacks
def process_loop(cached):
myiter = cacheiter(cached)
while True:
# Loop processing 10 items from myiter at a time, until process_chunk signals
# there are no values left.
result = yield process_chunk(myiter, 10)
if result:
print 'All done'
break
print 'More left'
# Insert the 5 second delay before starting on the next chunk.
yield sleep(5)
d = process_loop(cached)
Otro enfoque que podría ser capaz de tomar, sin embargo, es el uso de twisted.internet.task.cooperate
. cooperate
toma un iterador y lo consume, suponiendo que consumirlo es potencialmente costoso y dividir el trabajo en múltiples iteraciones de reactores. Tomando la definición de cacheiter
desde arriba:
from twisted.internet.task import cooperate
def process_loop(cached):
finished = []
def process_one(value):
if value is None:
finished.append(True)
else:
return some_processing(value)
myiter = cacheiter(cached)
while not finished:
value_deferred = myiter.next()
value_deferred.addCallback(process_one)
yield value_deferred
task = cooperate(process_loop(cached))
d = task.whenDone()
+1 para 'confundir a fondo' y 'twisted' etiqueta :) – Henry