7

He tenido algunos problemas con una cadena recursiva de observables.RxJS: Lista recursiva de observables y observador único

Estoy trabajando con RxJS, que se encuentra actualmente en la versión 1.0.10621, y contiene la funcionalidad Rx más básica, junto con Rx para jQuery.

Permítanme introducir un escenario de ejemplo para mi problema: Estoy sondeando el Twitter search API (respuesta JSON) para los tweets/actualizaciones que contienen una determinada palabra clave. La respuesta también incluye un "refresh_url" que debería usarse para generar una solicitud de seguimiento. La respuesta a esa solicitud de seguimiento volverá a contener una nueva actualización_url, etc.

Rx.jQuery me permite hacer que la API de búsqueda de Twitter llame a un evento observable, que produce uno en Siguiente y luego completa. Lo que he intentado hasta ahora es hacer que el controlador de onNext recuerde el refresh_url y lo use en el controlador onCompleted para producir un nuevo observador observable y correspondiente para la próxima solicitud. De esta manera, un par observable + observador sigue al otro indefinidamente.

El problema con este enfoque es:

  1. El observable/observador de seguimiento ya están vivos cuando sus predecesores aún no han sido eliminados.

  2. Tengo que hacer un montón de contabilidad desagradable para mantener una referencia válida para el observador actualmente vivo, de los cuales en realidad puede haber dos. (Uno en onCompleted y el otro en otro lugar en su ciclo de vida) Esta referencia es, por supuesto, necesaria para anular la suscripción/disposición del observador. Una alternativa a la contabilidad sería implementar un efecto secundario por medio de un "boom todavía en ejecución?", Como lo he hecho en mi ejemplo.

código Ejemplo:

  running = true; 
      twitterUrl = "http://search.twitter.com/search.json"; 
      twitterQuery = "?rpp=10&q=" + encodeURIComponent(text); 
      twitterMaxId = 0; //actually twitter ignores its since_id parameter 

      newTweetObserver = function() { 
       return Rx.Observer.create(
         function (tweet) { 
          if (tweet.id > twitterMaxId) { 
           twitterMaxId = tweet.id; 
           displayTweet(tweet); 
          } 
         } 
        ); 
      } 

      createTwitterObserver = function() { 
       twitterObserver = Rx.Observer.create(
         function (response) { 
          if (response.textStatus == "success") { 
           var data = response.data; 
           if (data.error == undefined) { 
            twitterQuery = data.refresh_url; 
            var tweetObservable; 
            tweetObservable = Rx.Observable.fromArray(data.results.reverse()); 
            tweetObservable.subscribe(newTweetObserver()); 
           } 
          } 
         }, 
         function(error) { alert(error); }, 
         function() { 
          //create and listen to new observer that includes a delay 
          if (running) { 
           twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000); 
           twitterObservable.subscribe(createTwitterObserver()); 
          } 
         } 
        ); 
       return twitterObserver; 
      } 
      twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery); 
      twitterObservable.subscribe(createTwitterObserver()); 

no sea engañado por la doble capa de observables/observadores de solicitudes a tweets. Mi ejemplo se refiere principalmente a la primera capa: solicitar datos de Twitter. Si al resolver este problema, la segunda capa (convirtiendo respuestas en tweets) puede convertirse en una con la primera, sería fantástico; Pero creo que eso es completamente diferente. Por ahora.

Erik Meijer me indicó el operador Expandir (vea el ejemplo a continuación) y sugirió Join patterns como alternativa.

var ys = Observable.Expand 
(new[]{0}.ToObservable() // initial sequence 
        , i => (i == 10 ? Observable.Empty<int>() // terminate 
     : new[]{i+1}.ToObservable() // recurse 
) 
); 

ys.ToArray().Select(a => string.Join(",", a)).DumpLive(); 

Se debe poder copiar en LINQPad. Asume observaciones únicas y produce un observador final.

Así que mi pregunta es: ¿Cómo puedo hacer el truco de expansión más bonito en RxJS?

EDIT:
El operador de expansión probablemente puede implementarse como se muestra en this thread. Pero uno necesitaría generators (y solo tengo JS < 1.6).
Desafortunadamente RxJS 2.0.20304-beta no implementa el método Extender.

+2

he llegado a la conclusión de que la solución a este problema es de hecho el [Ampliar el operador] (http://social.msdn.microsoft.com/Forums/da-DK/rx/thread/2746e373- bf43-4381-834c-8cc182704ae9) que aún no está implementado en RxJS hasta la versión 2.0.20304-beta. – derabbink

+1

expand es exactamente lo que necesita. Me doy cuenta de que esta publicación es antigua, pero es posible simplemente trasplantar los operadores que necesita de una versión futura de Rx a su propia versión de Rx. Puede requerir alguna manipulación, pero la base del código no ha cambiado sustancialmente de pre-v1. –

+0

Además, actualizar Rx (cuando sea posible) es una buena decisión. Hay muchas correcciones de errores y mejoras en las últimas versiones. –

Respuesta

4

Así que voy a intentar resolver su problema ligeramente diferente de lo que hizo y tomar algunas libertades que puede resolver más fácilmente.

Así 1 cosa que no dicen es que estás intentando los siguientes pasos

  • conseguir la primera lista pío, que contiene próxima url
  • Una vez lista Tweet recibieron, onNext el observador actual y obtener el siguiente serie de tweets
    • Haz esto indefinidamente

¿O hay una acción del usuario (obtener más/desplazarse hacia abajo). De cualquier manera, es realmente el mismo problema. Sin embargo, puedo estar leyendo tu problema incorrectamente. Aquí está la respuesta para eso.

function getMyTweets(headUrl) { 
    return Rx.Observable.create(function(observer) { 

     innerRequest(headUrl); 
     function innerRequest(url) { 
      var next = ''; 

      // Some magic get ajax function 
      Rx.get(url).subscribe(function(res) { 
       observer.onNext(res); 
       next = res.refresh_url; 
      }, 
      function() { 
       // Some sweet handling code 
       // Perhaps get head? 
      }, 
      function() { 
       innerRequest(next); 
      }); 
     } 
    }); 
} 

Puede que esta no sea la respuesta que estaba solicitando. Si no, lo siento!


Editar: Después de revisar su código, parece que desea tomar los resultados como una matriz y observarlo.

// From the results perform a select then a merge (if ordering does not matter). 
getMyTweets('url') 
    .selectMany(function(data) { 
     return Rx.Observable.fromArray(data.results.reverse()); 
    }); 

// Ensures ordering 
getMyTweets('url') 
    .select(function(data) { 
     return Rx.Observable.fromArray(data.results.reverse()); 
    }) 
    .concat(); 
Cuestiones relacionadas