2011-05-19 17 views
11

¿Hay alguna manera de tener una secuencia observable para reanudar la ejecución con el siguiente elemento en la secuencia si ocurre un error? De this post parece que necesita especificar una nueva secuencia observable en Catch() para reanudar la ejecución, pero ¿qué sucede si solo necesita continuar procesando con el siguiente elemento de la secuencia? ¿Hay una manera de lograr esto?Manejo de errores en una secuencia observable usando Rx

ACTUALIZACIÓN: El escenario es el siguiente: Tengo un montón de elementos que debo procesar. El procesamiento se compone de varios pasos. Tengo descompuse los pasos en las tareas que me gustaría componer. Seguí las instrucciones para ToObservable() publicado here para convertir por tareas en un observables para la composición. así que básicamente estoy haciendo somethng como tal -

foreach(element in collection) 
{ 
    var result = from aResult in DoAAsync(element).ToObservable() 
     from bResult in DoBAsync(aResult).ToObservable() 
     from cResult in DoCAsync(bResult).ToObservable() 
     select cResult; 
    result.subscribe(register on next and error handlers here) 
} 

o pude algo como esto:

var result = 
     from element in collection.ToObservable() 
     from aResult in DoAAsync(element).ToObservable() 
     from bResult in DoBAsync(aResult).ToObservable() 
     from cResult in DoCAsync(bResult).ToObservable() 
     select cResult; 

¿Cuál es la mejor manera para continuar procesando otros elementos, aunque digamos que el procesamiento de uno de los elementos arroja una excepción. Me gustaría poder registrar el error y avanzar de manera ideal.

Respuesta

1

El contrato entre IObservable y IObserver es OnNext*(OnCompelted|OnError)?, confirmado por todos los operadores, aunque no por la fuente.

Su única opción es volver a suscribirse a la fuente usando Retry, pero si la fuente devuelve el IObservable instancia para cada descripción que no verá cualquier valor nuevo.

¿Podría proporcionar más información sobre su situación? Tal vez hay otra forma de verlo.

Editar: Sobre la base de sus comentarios actualizada, parece que sólo tiene Catch:

var result = 
    from element in collection.ToObservable() 
    from aResult in DoAAsync(element).ToObservable().Log().Catch(Observable.Empty<TA>()) 
    from bResult in DoBAsync(aResult).ToObservable().Log().Catch(Observable.Empty<TB>()) 
    from cResult in DoCAsync(bResult).ToObservable().Log().Catch(Observable.Empty<TC>()) 
    select cResult; 

Esto reemplaza un error con un Empty que no daría lugar a la siguiente secuencia (ya que utiliza SelectMany bajo la . capó

+0

He actualizado la publicación para incluir el escenario que estoy tratando de lograr –

11

Tanto James & Richard hicieron algunos buenos puntos, pero no creo que le han dado el mejor método para resolver su problema.

James sugirió usar .Catch(Observable.Never<Unit>()). Se equivocó cuando dijo que "permitirá ... que la transmisión continúe" porque una vez que tocas una excepción, la transmisión debe finalizar; eso es lo que señaló Richard cuando mencionó el contrato entre observadores y observables.

Además, usar Never de esta manera hará que sus observables nunca se completen.

La respuesta corta es que .Catch(Observable.Empty<Unit>()) es la forma correcta de cambiar una secuencia de una que finaliza con un error a una que finaliza con la finalización.

Ha acertado con la idea correcta de usar SelectMany para procesar cada valor de la colección fuente para que pueda detectar cada excepción, pero le quedan un par de problemas.

Está utilizando tareas (TPL) solo para convertir una llamada de función en una observable. Esto obliga a su observable a utilizar subprocesos de grupo de tareas, lo que significa que la instrucción SelectMany probablemente producirá valores en un orden no determinista.

También se ocultan las llamadas reales para procesar sus datos lo que dificulta la refacturación y el mantenimiento.

Creo que es mejor que cree un método de extensión que permita omitir las excepciones. Aquí está:

public static IObservable<R> SelectAndSkipOnException<T, R>(
    this IObservable<T> source, Func<T, R> selector) 
{ 
    return 
     source 
      .Select(t => 
       Observable.Start(() => selector(t)).Catch(Observable.Empty<R>())) 
      .Merge(); 
} 

Con este método se puede ahora simplemente hacer esto:

var result = 
    collection.ToObservable() 
     .SelectAndSkipOnException(t => 
     { 
      var a = DoA(t); 
      var b = DoB(a); 
      var c = DoC(b); 
      return c; 
     }); 

Este código es mucho más simple, pero oculta la excepción (s). Si deseas aferrarte a las excepciones mientras dejas que tu secuencia continúe, entonces necesitas hacer algo de funkiness extra. Agregar un par de sobrecargas al método de extensión Materialize funciona para mantener los errores.

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<T> source, Func<T, R> selector) 
{ 
    return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector); 
} 

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<Notification<T>> source, Func<T, R> selector) 
{ 
    Func<Notification<T>, Notification<R>> f = nt => 
    { 
     if (nt.Kind == NotificationKind.OnNext) 
     { 
      try 
      { 
       return Notification.CreateOnNext<R>(selector(nt.Value)); 
      } 
      catch (Exception ex) 
      { 
       ex.Data["Value"] = nt.Value; 
       ex.Data["Selector"] = selector; 
       return Notification.CreateOnError<R>(ex); 
      } 
     } 
     else 
     { 
      if (nt.Kind == NotificationKind.OnError) 
      { 
       return Notification.CreateOnError<R>(nt.Exception); 
      } 
      else 
      { 
       return Notification.CreateOnCompleted<R>(); 
      } 
     } 
    }; 
    return source.Select(nt => f(nt)); 
} 

Estos métodos le permiten escribir esto:

var result = 
    collection 
     .ToObservable() 
     .Materialize(t => 
     { 
      var a = DoA(t); 
      var b = DoB(a); 
      var c = DoC(b); 
      return c; 
     }) 
     .Do(nt => 
     { 
      if (nt.Kind == NotificationKind.OnError) 
      { 
       /* Process the error in `nt.Exception` */ 
      } 
     }) 
     .Where(nt => nt.Kind != NotificationKind.OnError) 
     .Dematerialize(); 

Incluso puede encadenar estos métodos Materialize y utilizar ex.Data["Value"] & ex.Data["Selector"] para obtener la función de valor y el selector que arrojó el error fuera.

Espero que esto ayude.

+0

Me he encontrado con un problema similar al intentar trabajar con observables de observables. Cuando un observable interno arroja un OnError, el observador externo que lo observa se mueve a OnError también, haciendo que todo se apague. He intentado con tu solución de atrapar la excepción y lanzar OnCompleted, pero esto produce exactamente el mismo comportamiento que OnCompleted y OnError que hacen que la suscripción se cierre. – letstango

Cuestiones relacionadas