Tengo algo de código usando Rx, llamado desde varios subprocesos que hace:evitando la superposición OnNext llama en Rx cuando se utiliza SubscribeOn (Scheduler.TaskPool)
subject.OnNext(value); // where subject is Subject<T>
Quiero que los valores que se procesan en el fondo, por lo mi suscripción es
subscription = subject.ObserveOn(Scheduler.TaskPool).Subscribe(value =>
{
// use value
});
realmente no importa lo que los hilos manejan los valores que salen de lo observable, siempre y cuando el trabajo se pone en el TaskPool y no bloquea el flujo actual. Sin embargo, mi uso de 'valor' dentro de mi delegado OnNext no es seguro para subprocesos. Por el momento, si un gran número de valores pasan por el Observable, estoy superponiendo llamadas a mi controlador OnNext.
Podría simplemente agregar un bloqueo a mi delegado de OnNext, pero eso no se siente como la forma de hacer Rx. ¿Cuál es la mejor manera de asegurarme de que solo tengo una llamada a mi controlador OnNext a la vez, cuando tengo varios hilos llamando al subject.OnNext(value);
?
La llamada a 'Sincronizar' debe ser anterior a la llamada a' ObserveOn'; de lo contrario, usted está violando el contrato de concurrencia para 'ObserveOn'. Pero en realidad, si el caso de uso comparte un tema entre varios hilos, la mejor solución es sincronizar el sujeto en lugar de sincronizar los suscriptores con el sujeto: 'var syncSubject = Subject.Synchronize (syncSubject);' Now hand 'syncSubject' out a los hilos del productor y pueden llamar a 'syncSubject.OnNext()' sin causar problemas con los suscriptores del tema original. – Brandon
@Brandon var syncSubject = Subject.Synchronize (syncSubject); ... estás usando una variable antes de asignarla, ¿puedes aclarar esto? – Beachwalker
@Beachwalker eso es un error tipográfico.Debe ser 'sujeto' pasado como argumento para Sincronizar como se muestra en la respuesta – Brandon