2012-02-06 8 views
10

Tengo algo de código usando Rx, llamado desde varios subprocesos que hace:evitando la superposición OnNext llama en Rx cuando se utiliza SubscribeOn (Scheduler.TaskPool)

subject.OnNext(value); // where subject is Subject<T> 

Quiero que los valores que se procesan en el fondo, por lo mi suscripción es

subscription = subject.ObserveOn(Scheduler.TaskPool).Subscribe(value => 
{ 
    // use value 
}); 

realmente no importa lo que los hilos manejan los valores que salen de lo observable, siempre y cuando el trabajo se pone en el TaskPool y no bloquea el flujo actual. Sin embargo, mi uso de 'valor' dentro de mi delegado OnNext no es seguro para subprocesos. Por el momento, si un gran número de valores pasan por el Observable, estoy superponiendo llamadas a mi controlador OnNext.

Podría simplemente agregar un bloqueo a mi delegado de OnNext, pero eso no se siente como la forma de hacer Rx. ¿Cuál es la mejor manera de asegurarme de que solo tengo una llamada a mi controlador OnNext a la vez, cuando tengo varios hilos llamando al subject.OnNext(value);?

Respuesta

10

De Using Subjects en MSDN

Por defecto, los sujetos no realiza ninguna sincronización a través de hilos. [...] Sin embargo, si desea sincronizar las llamadas salientes a los observadores utilizando un programador, puede usar el método Sincronizar para hacerlo.

Así que, como Brandon dice en los comentarios, debe sincronizar el tema y entregarlo a los hilos del productor. p.ej.

var syncSubject = Subject.Synchronize(subject); 

// syncSubject.OnNext(value) can be used from multiple threads 

subscription = syncSubject.ObserveOn(TaskPoolScheduler.Default).Subscribe(value => 
{ 
    // use value 
}); 
+2

La llamada a 'Sincronizar' debe ser anterior a la llamada a' ObserveOn'; de lo contrario, usted está violando el contrato de concurrencia para 'ObserveOn'. Pero en realidad, si el caso de uso comparte un tema entre varios hilos, la mejor solución es sincronizar el sujeto en lugar de sincronizar los suscriptores con el sujeto: 'var syncSubject = Subject.Synchronize (syncSubject);' Now hand 'syncSubject' out a los hilos del productor y pueden llamar a 'syncSubject.OnNext()' sin causar problemas con los suscriptores del tema original. – Brandon

+0

@Brandon var syncSubject = Subject.Synchronize (syncSubject); ... estás usando una variable antes de asignarla, ¿puedes aclarar esto? – Beachwalker

+0

@Beachwalker eso es un error tipográfico.Debe ser 'sujeto' pasado como argumento para Sincronizar como se muestra en la respuesta – Brandon

-3

Puede intentar implementar su propio IObserver que aplicará un bloqueo en su método OnNext. Simplemente un simple decorador: aplique un candado, llame a OnNext interno, quite el candado.

Luego puede implementar un método de extensión en IObservable, algo así como .AsThreadSafe().

+1

No es la mejor solución, pero no es necesario rechazarla desde mi punto de vista porque es una solución válida. Así que +1 de mí para equilibrar esto un poco con los votos abajo aquí. – Beachwalker

5

Creo que está buscando el método de extensión .Synchronize(). Para obtener mejoras de rendimiento en una versión reciente (finales de 2011), el equipo de Rx relajó las suposiciones sobre la naturaleza secuencial de los productores de secuencias observables. Sin embargo, parece que rompes estas suposiciones (no es algo malo) pero para que Rx vuelva a reproducirse como los usuarios esperarían, debes Sincronizar la secuencia para asegurarte de que sea secuencial nuevamente.

1

Aquí hay un poco más de explicación sobre por qué usar Synchronize (el segundo párrafo). Desde el otro lado, Synchronize puede tomar parte en un interbloqueo si usa activamente el bloqueo de su código, al menos presencié dicha situación.

Cuestiones relacionadas