2010-12-03 16 views
6

Tengo, digamos, 1000 observables. Ahora quiero agregar todos los eventos en un nuevo observable que se active OnNext una vez que todos los demás hayan enviado un evento. ¿Cuál es la mejor manera de hacerlo usando Rx?Agregado gran número de observables en nuevo observable

Actualización: Algunos comentarios buenos en el foro de Rx, especialmente por Dave Sexton. Mostró cómo crear un método de extensión Zip que toma múltiples observables: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/daaa84db-b560-4eda-871e-e523098db20c/

+0

¿Son iguales todos los tipos de los 1000 observables? ¿Qué tipo de agregación observable es? –

+0

Todos los 1000 observables son del mismo tipo, el nuevo agregado puede ser un tipo nuevo. P.ej. El evento se convierte en AggregateEvent. – lukebuehler

+0

¿Desea combinar solo sus últimos valores? ES DECIR. si Observable a activa dos eventos y Observable b activa solo uno, ¿desea agregar el primer evento de a, o el último evento de a, con el evento de b? –

Respuesta

2

Hay un MailboxProcessor en F # ... Yo usaría un SynchronizationContext en C# para el mismo propósito. Dame unos minutos y escribiré un ejemplo.

Aparte: Aquí está mi código en F # que hace algo similar ... Será mucho más esfuerzo, pero aún se puede hacer en C# con Rx.

open System.Diagnostics 

let numWorkers = 20 
let asyncDelay = 100 

type MessageForMailbox = 
    | DataMessage of AsyncReplyChannel<unit> 
    | GetSummary of AsyncReplyChannel<unit> 

let main = 
    let actor = 
     MailboxProcessor.Start(fun inbox -> 
     let rec loop acc = 
      async { 
       let! message = inbox.Receive() 
       match message with 
       | DataMessage replyChannel -> replyChannel.Reply(); return! loop acc 
       | GetSummary replyChannel -> replyChannel.Reply(); return! loop acc 
      } 

     loop 0 // seed for acc 
    ) 

    let codeBlocks = [for i in 1..numWorkers -> 
         async { 
          do! Async.Sleep asyncDelay 
          return! actor.PostAndAsyncReply DataMessage 
         } ] 

    while true do 
     printfn "Concurrent started..." 
     let sw = new Stopwatch() 
     sw.Start() 
     codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore 
     actor.PostAndReply GetSummary 
     sw.Stop() 
     printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds 
     printfn "efficiency: %d%%" (int64 (asyncDelay * 100)/sw.ElapsedMilliseconds) 

     printfn "Synchronous started..." 
     let sw = new Stopwatch() 
     sw.Start() 
     for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore 
     sw.Stop() 
     printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds 
     printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100)/sw.ElapsedMilliseconds) 

main 
+0

hmm, ¿quiere decir algo similar al uso de SynchronizationContext.Send() para sincronizar todos los observables creando Eventos? Veré un poco lo que hace tu código F # pero no soy lo suficientemente inteligente como para entenderlo por completo. – lukebuehler

+0

Creo que lo tienes. RunSynchronously implementa ForkJoin con flujos de trabajo asincrónicos. – GregC

+0

+1: nunca antes había visto un buen ejemplo de Mailbox Processor. :) –

Cuestiones relacionadas