2011-05-12 14 views
18

gente, estoy confundido a fondo, por lo que es posible que ni siquiera estoy pidiendo cosas correctamente, pero aquí va:Python torcido: iteradores y rendimientos/inlineCallbacks

Tengo una aplicación torcida usando inlineCallbacks. Ahora necesito definir un iterador que significará que se devuelve un generador a la persona que llama. Sin embargo, el iterador no puede ser decorado en línea, ¿puede ser? Si no, entonces cómo lo hago código algo así.

Solo para aclarar: el objetivo es procesar_loop debe llamarse cada, digamos 5 segundos, puede procesar solo UN trozo de, digamos 10, y luego tiene que dejar ir. Sin embargo, para conocer ese fragmento de 10 (almacenado en caché, que es un dict de un dict), necesita llamar a una función que devuelve diferido.

@inlineCallbacks ### can\'t have inlineCallbacks here, right? 
def cacheiter(cached): 
    for cachename,cachevalue in cached.items(): 
     result = yield (call func here which returns deferred) 
     if result is True: 
      for k,v in cachedvalue.items(): 
       yield cachename, k, v 

@inlineCallbacks 
def process_chunk(myiter, num): 
    try: 
     for i in xrange(num): 
      nextval = myiter.next() 
      yield some_processing(nextval) 
     returnValue(False) 
    except StopIteration: 
     returnValue(True) 

@inlineCallbacks 
def process_loop(cached): 
    myiter = cacheiter(cached) 
    result = yield process_chunk(myiter, 10) 
    if not result: 
     print 'More left' 
     reactor.callLater(5, process_loop, cached) 
    else: 
     print 'All done' 
+2

+1 para 'confundir a fondo' y 'twisted' etiqueta :) – Henry

Respuesta

-1

Intente escribir su iterador como DeferredGenerator.

+2

deferredGenerator es sólo una versión anterior de inlineCallbacks; el OP básicamente ya está haciendo esto. – Glyph

1

creo que estamos tratando de hacer esto:

@inlineCallbacks 
def cacheiter(cached): 
    for cachename,cachevalue in cached.items(): 
     result = yield some_deferred() # some deferred you'd like evaluated 
     if result is True: 
      # here you want to return something, so you have to use returnValue 
      # the generator you want to return can be written as a generator expression 
      gen = ((cachename, k, v) for k,v in cachedvalue.items()) 
      returnValue(gen) 

Cuando un genexp no puede expresar lo que estamos tratando de volver puede escribir un cierre:

@inlineCallbacks 
def cacheiter(cached): 
    for cachename,cachevalue in cached.items(): 
     result = yield some_deferred() 
     if result is True: 
      # define the generator, saving the current values of the cache 
      def gen(cachedvalue=cachedvalue, cachename=cachename): 
       for k,v in cachedvalue.items(): 
        yield cachename, k, v 
      returnValue(gen()) # return it 
+0

No es incorrecto en este caso, pero debe tener cuidado al aconsejar a las personas que escriban cierres integrados en bucles 'for', especialmente en generadores. Si el 'cacheiter' externo realmente fuera a continuar una iteración adicional, 'cachedvalue' cambiaría en 'gen'. – Glyph

+0

Glifo tiene razón, se ha perdido el bucle for externo, lo que puede ocasionar problemas con los cierres.No importa aquí porque la función solo devolverá un solo generador. Lo arreglé de todos modos :-) –

12

Usted' Tiene razón que no puede expresar lo que quiere expresar en cacheiter. El decorador inlineCallbacks no le permitirá tener una función que devuelva un iterador. Si decora una función con ella, el resultado es una función que siempre devuelve Deferred. Para eso es para eso.

Parte de lo que hace esto difícil es que los iteradores no funcionan bien con el código asincrónico. Si hay un diferido involucrado en la producción de los elementos de su iterador, entonces los elementos que salen de su iterador serán diferidos primero.

Usted podría hacer algo como esto para tener en cuenta que:

@inlineCallbacks 
def process_work(): 
    for element_deferred in some_jobs: 
     element = yield element_deferred 
     work_on(element) 

Esto puede funcionar, pero parece particularmente extraño. Dado que los generadores solo pueden ceder el paso a la persona que llama (no, por ejemplo, a la persona que llama), el iterador some_jobs no puede hacer nada al respecto; solo el código léxicamente dentro de process_work puede producir un trampolín provisto de inlineCallbacks para esperar.

Si no le importa este patrón, entonces podríamos obtener imágenes de su código que está siendo escrito algo como:

from twisted.internet.task import deferLater 
from twisted.internet.defer import inlineCallbacks, returnValue 
from twisted.internet import reactor 

class cacheiter(object): 
    def __init__(self, cached): 
     self._cached = iter(cached.items()) 
     self._remaining = [] 

    def __iter__(self): 
     return self 


    @inlineCallbacks 
    def next(self): 
     # First re-fill the list of synchronously-producable values if it is empty 
     if not self._remaining: 
      for name, value in self._cached: 
       # Wait on this Deferred to determine if this cache item should be included 
       if (yield check_condition(name, value)): 
        # If so, put all of its values into the value cache so the next one 
        # can be returned immediately next time this method is called. 
        self._remaining.extend([(name, k, v) for (k, v) in value.items()]) 

     # Now actually give out a value, if there is one. 
     if self._remaining: 
      returnValue(self._remaining.pop()) 

     # Otherwise the entire cache has been visited and the iterator is complete. 
     # Sadly we cannot signal completion with StopIteration, because the iterator 
     # protocol isn't going to add an errback to this Deferred and check for 
     # StopIteration. So signal completion with a simple None value. 
     returnValue(None) 


@inlineCallbacks 
def process_chunk(myiter, num): 
    for i in xrange(num): 
     nextval = yield myiter.next() 
     if nextval is None: 
      # The iterator signaled completion via the special None value. 
      # Processing is complete. 
      returnValue(True) 
     # Otherwise process the value. 
     yield some_processing(nextval) 

    # Indicate there is more processing to be done. 
    returnValue(False) 


def sleep(sec): 
    # Simple helper to delay asynchronously for some number of seconds. 
    return deferLater(reactor, sec, lambda: None) 


@inlineCallbacks 
def process_loop(cached): 
    myiter = cacheiter(cached) 
    while True: 
     # Loop processing 10 items from myiter at a time, until process_chunk signals 
     # there are no values left. 
     result = yield process_chunk(myiter, 10) 
     if result: 
      print 'All done' 
      break 

     print 'More left' 
     # Insert the 5 second delay before starting on the next chunk. 
     yield sleep(5) 

d = process_loop(cached) 

Otro enfoque que podría ser capaz de tomar, sin embargo, es el uso de twisted.internet.task.cooperate. cooperate toma un iterador y lo consume, suponiendo que consumirlo es potencialmente costoso y dividir el trabajo en múltiples iteraciones de reactores. Tomando la definición de cacheiter desde arriba:

from twisted.internet.task import cooperate 

def process_loop(cached): 
    finished = [] 

    def process_one(value): 
     if value is None: 
      finished.append(True) 
     else: 
      return some_processing(value) 

    myiter = cacheiter(cached) 

    while not finished: 
     value_deferred = myiter.next() 
     value_deferred.addCallback(process_one) 
     yield value_deferred 

task = cooperate(process_loop(cached)) 
d = task.whenDone() 
Cuestiones relacionadas