2012-04-24 14 views
18

Así que en los tristes días de C# 4.0, creé la siguiente clase "WorkflowExecutor" que permitía flujos de trabajo asincrónicos en el hilo de la GUI pirateando las continuaciones de "rendimiento de retorno" de IEnumerable para esperar observables. Entonces, el siguiente código, en button1Click, simplemente inicia un flujo de trabajo simple que actualiza el texto, espera que haga clic en el botón 2, y loops después de 1 segundo.esperando en un observable

public sealed partial class Form1 : Form { 
    readonly Subject<Unit> _button2Subject = new Subject<Unit>(); 
    readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor(); 

    public Form1() { 
     InitializeComponent(); 
    } 

    IEnumerable<IObservable<Unit>> CreateAsyncHandler() { 
     Text = "Initializing"; 
     var scheduler = new ControlScheduler(this); 
     while (true) { 
      yield return scheduler.WaitTimer(1000); 
      Text = "Waiting for Click"; 
      yield return _button2Subject; 
      Text = "Click Detected!"; 
      yield return scheduler.WaitTimer(1000); 
      Text = "Restarting"; 
     } 
    } 

    void button1_Click(object sender, EventArgs e) { 
     _workflowExecutor.Run(CreateAsyncHandler()); 
    } 

    void button2_Click(object sender, EventArgs e) { 
     _button2Subject.OnNext(Unit.Default); 
    } 

    void button3_Click(object sender, EventArgs e) { 
     _workflowExecutor.Stop(); 
    } 
} 

public static class TimerHelper { 
    public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms) { 
     return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default); 
    } 
} 

public sealed class WorkflowExecutor { 
    IEnumerator<IObservable<Unit>> _observables; 
    IDisposable _subscription; 

    public void Run(IEnumerable<IObservable<Unit>> actions) { 
     _observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator(); 
     Continue(); 
    } 

    void Continue() { 
     if (_subscription != null) { 
      _subscription.Dispose(); 
     } 
     if (_observables.MoveNext()) { 
      _subscription = _observables.Current.Subscribe(_ => Continue()); 
     } 
    } 

    public void Stop() { 
     Run(null); 
    } 
} 

La parte inteligente de la idea, usando continuaciones "rendimiento" para hacer el trabajo asíncrono, fue tomada desde la idea de Daniel AsyncIOPipe Earwicker: http://smellegantcode.wordpress.com/2008/12/05/asynchronous-sockets-with-yield-return-of-lambdas/, entonces añadí el marco reactiva en la parte superior de la misma.

Ahora estoy teniendo problemas para volver a escribir esto utilizando la función de sincronización en C# 5.0, pero parece que debería ser algo sencillo. Cuando convierto los observables en tareas, solo se ejecutan una vez y el ciclo while se bloquea la segunda vez. Cualquier ayuda para arreglar eso sería genial.

Dicho todo esto/preguntado, ¿qué me proporciona el mecanismo de asincronización/espera que el WorkflowExecutor no lo hace? ¿Hay algo que pueda hacer con async/await que no puedo hacer (dado una cantidad similar de código) con el WorkflowExecutor?

+0

¿Cómo es exactamente lo que hacen que la conversión a 'Task's? ¿Cómo se ve el aspecto? – svick

+1

Y 'await' tiene muchas ventajas sobre este tipo de asincronía, pero una de las grandes diferencias es que regresan de los esperables. P.ej. 'string s = await client.DownloadStringAsync (url);'. – svick

Respuesta

24

Como habrás notado, la tarea es algo que se usa una sola vez, a diferencia de la "secuencia de eventos" de Observable. Una buena manera de pensar de esto (en mi humilde opinión) es el gráfico de 2x2 en la Rx team's post about 2.0 Beta:

2x2 chart for task vs observable

Dependiendo de las circunstancias (de una sola vez frente a 'flujo' de los acontecimientos), manteniendo observable podría tener más sentido.

Si puede saltar hasta el Reactive 2.0 Beta, entonces puede 'esperar' observables con eso. Por ejemplo, mi propio intento de una versión 'asíncrono/Await' (aproximado) de su código sería:

public sealed partial class Form1 : Form 
{ 
    readonly Subject<Unit> _button2Subject = new Subject<Unit>(); 

    private bool shouldRun = false; 

    public Form1() 
    { 
     InitializeComponent(); 
    } 

    async Task CreateAsyncHandler() 
    { 
     Text = "Initializing"; 
     while (shouldRun) 
     { 
      await Task.Delay(1000); 
      Text = "Waiting for Click"; 
      await _button2Subject.FirstAsync(); 
      Text = "Click Detected!"; 
      await Task.Delay(1000); 
      Text = "Restarting"; 
     } 
    } 

    async void button1_Click(object sender, EventArgs e) 
    { 
     shouldRun = true; 
     await CreateAsyncHandler(); 
    } 

    void button2_Click(object sender, EventArgs e) 
    { 
     _button2Subject.OnNext(Unit.Default); 
    } 

    void button3_Click(object sender, EventArgs e) 
    { 
     shouldRun = false; 
    } 
} 
+0

'Tarea' es un uso de una sola vez, pero puede' esperar' cosas que no son 'Tareas's.Por lo tanto, debería ser posible crear un awaitable que pueda representar todo el 'IObservable ', no solo un elemento. – svick

+0

Eso es lo que hice en el ejemplo de código. Con Rx 2.0, puede esperar observables. El comportamiento predeterminado es devolver el último elemento del observable, por lo que lo hace FirstAsync –

22

Como se mencionó James, se puede esperar una secuencia de > IObservable < T a partir de Rx v2.0 Beta . El comportamiento es devolver el último elemento (antes de OnCompleted) o lanzar el OnError que se observó. Si la secuencia no contiene elementos, obtendrá una excepción InvalidOperationException.

Aviso de usar esto, usted puede obtener todos los otros comportamientos deseados:

  • obtener el primer elemento de espera xs.FirstAsync()
  • Asegúrese de que sólo hay un único valor de la espera xs.SingleAsync()
  • Cuando estás bien con una secuencia vacía, esperan xs.DefaultIfEmpty()
  • para obtener todos los elementos, esperan xs.ToArray() o esperan xs.ToList()

Se puede hacer incluso las cosas más extravagantes, como computar el resultado de una agregación menos de observar valores intermedios mediante el uso de Do y exploración:

var xs = Observable.Range(0, 10, Scheduler.Default); 

var res = xs.Scan((x, y) => x + y) 
      .Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); }); 

Console.WriteLine("Done! The sum is {0}", await res); 
+1

Esta es la información que estaba buscando después de sorprenderme al ver en un proyecto reciente que esperaba un IObservable perfectamente construido. Gracias por compartir. – jpierson

Cuestiones relacionadas