2012-07-14 11 views
5

que estaba esperando para ver una operación de intercalación no determinista para las fuentes, con una firma de tipo comoNondeterministically intercalación del conducto

interleave :: WhateverIOMonadClassItWouldWant m => [(k, Source m a)] -> Source m (k, a) 

El caso de uso es que tengo una aplicación P2P que mantiene conexiones abiertas a muchos nodos de la la red, y en su mayor parte es simplemente esperar mensajes de cualquiera de ellos. Cuando llega un mensaje, no importa de dónde proviene, pero necesita procesar el mensaje lo antes posible. En teoría, este tipo de aplicación (al menos cuando se usa para fuentes tipo socket) podría eludir por completo el administrador IO de GHC y ejecutar el select/epoll/etc. llama directamente, pero no me importa cómo se implementa, siempre y cuando funcione.

¿Algo como esto es posible con el conducto? Un enfoque menos general pero probablemente más factible podría ser escribir una función [(k, Socket)] -> Source m (k, ByteString) que maneje la recepción en todos los sockets por usted.

Me di cuenta de las operaciones ResumableSource en el conducto, pero todas parecen querer conocer un particular Sink, que se siente como una fuga de abstracción, al menos para esta operación.

+0

¿Cuán rápido más rápido necesita usted que la solución simple usando forkIO y una Fuente que envuelve un Chan/TChan? –

+0

No necesariamente necesito un alto rendimiento (preveo conectarme con alrededor de 1000 de estos pares como máximo) pero también estoy interesado en estas abstracciones tipo iteración y me preguntaba si era posible proporcionar este tipo de operación. con conductos. Parece un caso de uso razonablemente común para ciertos tipos de protocolos de red. – copumpkin

+0

Creo que el administrador de eventos es lo más parecido posible sin usar 'select' o' epoll' directamente oa través de algún paquete. No creo que exista una interfaz de sondeo expuesta desde el administrador de eventos (no sería mucho un administrador de eventos si lo hiciera) por lo que terminaría con una cantidad de conversaciones y conversaciones con la mayoría de los diseños de todos modos. Lo que haría es comenzar con una 'Fuente' envolviendo un' TChan' acotado, bifurcando un hilo por conexión inicialmente. Vaya al administrador de eventos si es necesario para el rendimiento.Si todavía tienes problemas, siempre hay FFI. –

Respuesta

5

El paquete stm-conduit proporciona el mergeSources que realiza algo similar, aunque no idéntico, a lo que está buscando. Probablemente sea un buen lugar para comenzar.

+1

Acepte la respuesta de Michael. Hace lo que describí. –

+0

¿Podría alguien explicar en detalle por qué el tipo de 'mergeSources' restringe el argumento de la mónada del' Source's con el que trata ser de la forma 'ResourceT m' en lugar de cualquier instancia' MonadResource'? –

+1

No puedo ver un motivo, vale la pena presentar un problema con el responsable. Es probable que lo arregles tú mismo cuando llamas a 'mergeSources' usando' hoist liftResourceT'. –

3

Sí, es posible.

Puede sondear un montón de Source s sin bloquear al bifurcar hilos para sondear donde en cada hilo empareje la Source con un Sink que envía la salida a algún canal de concurrencia:

concur :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Sink a m r 

... y luego se define una Source que lee de ese canal:

synchronize :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Source a m r 

Tenga en cuenta que esto no sería diferente que sólo se bifurcan los hilos para sondear las tomas de sí mismos, pero Sería útil para otros usuarios de conduit que quisieran sondear otras cosas además de los sockets que usan Source porque definieron porque es más general.

Si combina estas capacidades en una sola función, entonces la API global de la convocatoria sería algo como:

poll :: (WhateverIOMonadClassItWouldWant m) => [Source a m r] -> m (Source a m r) 

... pero todavía se puede lanzar en esos k s si lo desea.

Cuestiones relacionadas