2012-02-08 9 views
5

A veces quiero ejecutar una cantidad máxima de acciones IO en paralelo a la vez para actividad de red, etc. Arremetí una pequeña función de subproceso concurrente que funciona bien con https://gist.github.com/810920, pero esto no es realmente un grupo como todas las acciones IO debe terminar antes de que otros puedan comenzar.¿Cómo creo un grupo de subprocesos?

El tipo de lo que estoy buscando sería algo así como:

runPool :: Int -> [IO a] -> IO [a] 

y deben ser capaces de operar en las listas finitos e infinitos.

Parece que el paquete de tuberías podría lograr esto bastante bien, pero creo que probablemente haya una solución similar a la esencia que he provisto usando simplemente mvars, etc., de la plataforma haskell.

¿Alguien ha encontrado una solución idiomática sin grandes dependencias?

Respuesta

7

Se necesita un grupo de subprocesos, si quieres algo más corto, se puede conseguir la inspiración de Control.ThreadPool (del paquete de control de motor que también proporcionan funciones más generales), por ejemplo threadPoolIO es simplemente:

threadPoolIO :: Int -> (a -> IO b) -> IO (Chan a, Chan b) 
threadPoolIO nr mutator = do 
    input <- newChan 
    output <- newChan 
    forM_ [1..nr] $ 
     \_ -> forkIO (forever $ do 
      i <- readChan input 
      o <- mutator i 
      writeChan output o) 
    return (input, output) 

Utiliza dos Chan para la comunicación con el exterior, pero eso es generalmente lo que quieres, realmente ayuda a escribir código que no se estropea.

Si es absolutamente desea envolverlo en función de su tipo se puede encapsular la comunicación también:

runPool :: Int -> [IO a] -> IO [a] 
runPool n as = do 
    (input, output) <- threadPoolIO n (id) 
    forM_ as $ writeChan input 
    sequence (repeat (length as) $ readChan output) 

Esto no va a mantener el orden de las acciones, es que un problema (es fácil para corregir transmitiendo el índice de la acción o simplemente usando una matriz en lugar de almacenar las respuestas)?

Nota: los n hilos se mantendrán vivos para siempre con esta versión simplista, agregando una acción devuelta "killAll" a threadPoolIO resolvería este problema fácilmente si tiene la intención de crear y destruir varios de esos conjuntos en una aplicación de larga ejecución (si no, dado el peso de los hilos en Haskell, probablemente no valga la pena la molestia). Tenga en cuenta que esta función solo funciona en listas finitas, porque IO es normalmente estricto, por lo que no puede comenzar a procesar elementos de IO [a] antes de que se genere toda la lista, si realmente desea que tenga que usar IO lazy con inseguroInterleaveIO (tal vez no sea la mejor idea) o cambie completamente su modelo y use algo así como conductos para transmitir sus resultados.

+1

Si no está casado con su tipo de plataforma de ejecución, threadPoolIO es un poco más robusto: puede volver a utilizarlo fácilmente en varios lugares en su programa una vez creado, puede controlar dividir una lista infinita y alimentarlo y leer el respuesta por fragmentos y así sucesivamente ... – Jedai

+0

'threadPoolIO' se ve bien. Echaré un vistazo al código para ver cómo se implementa esto, ya que estoy bastante interesado en la mejor forma de crear un grupo de hilos y saber qué versión de Hackage es la preferida de la comunidad. –

Cuestiones relacionadas