2011-05-02 7 views
10

Estoy implementando un programa haskell que compara cada línea de un archivo con cada otra línea en el archivo. Que puede ser implementado un solo subproceso de la siguiente manera¿Por qué mi implementación de Mapreduce (haskell del mundo real) usando iteratee IO también falla con "Demasiados archivos abiertos"

distance :: Int -> Int -> Int 
distance a b = (a-b)*(a-b) 

sumOfDistancesOnSmallFile :: FilePath -> IO Int 
sumOfDistancesOnSmallFile path = do 
       fileContents <- readFile path 
       return $ allDistances $ map read $ lines $ fileContents 
       where 
        allDistances (x:xs) = (allDistances xs) + (sum $ map (distance x) xs) 
        allDistances _ = 0 

Esto ejecutará en O (n^2), y tiene que mantener actualizada la lista completa de los números enteros en la memoria todo el tiempo. En mi programa actual, la línea contiene más números, de los cuales construyo un tipo de datos un poco más complejo que Int. Esto me dio errores de memoria en los datos que tengo que procesar.

Por lo tanto, se deben realizar dos mejoras en la solución de rosca única mencionada anteriormente. Primero, acelere el tiempo de ejecución real. Segundo, encuentre una manera de no mantener toda la lista en la memoria a tiempo completo. Sé que esto requiere analizar el archivo completo n veces. Por lo tanto, habrá comparaciones O (n^2) y se analizarán O (n^2) líneas. Esto está bien para mí, ya que prefiero tener un programa lento y exitoso que un programa fallido. Cuando el archivo de entrada es lo suficientemente pequeño, siempre puedo residir en una versión más simple.

Para usar múltiples núcleos de CPU, saqué la implementación de Mapreduce de Real World Haskell (capítulo 24, disponible here).

que modifica la función de fragmentación del libro que, en lugar de dividir el archivo completo en trozos, el retorno tantos trozos como líneas con cada trozo que representa un elemento de

tails . lines . readFile 

porque quiero que el programa también a ser escalable en tamaño de archivo, utilicé inicialmente perezoso IO. Sin embargo, esto falla con "Demasiados archivos abiertos", sobre lo cual pregunté en un previous question (los manejadores de archivos fueron eliminados demasiado tarde por el GC). La versión full IO perezoso se publica allí.

Como la respuesta aceptada explica, strict IO podría resolver el problema. Eso resuelve el problema "Demasiados archivos abiertos" para archivos de 2k líneas, pero falla con "falta de memoria" en un archivo de 50k.

Tenga en cuenta que la primera implementación de rosca simple (sin mapreduce) es capaz de manejar un archivo de 50k.

La solución alternativa, que también me resulta más atractiva, es usar iteratee IO. Esperé que esto solucionara tanto el manejo del archivo como el agotamiento de los recursos de la memoria. Sin embargo, mi implementación aún falla con un error "Demasiados archivos abiertos" en un archivo de línea de 2k.

El iteratee versión IO tiene la misma MapReduce función que en el libro, pero tiene un modificado chunkedFileEnum a dejarlo trabajar con un enumerador .

Así que mi pregunta es; ¿Qué está mal con la siguiente implementación iteratee IO base? ¿Dónde está la holgazanería ?.

import Control.Monad.IO.Class (liftIO) 
import Control.Monad.Trans (MonadIO, liftIO) 
import System.IO 

import qualified Data.Enumerator.List as EL 
import qualified Data.Enumerator.Text as ET 
import Data.Enumerator hiding (map, filter, head, sequence) 

import Data.Text(Text) 
import Data.Text.Read 
import Data.Maybe 

import qualified Data.ByteString.Char8 as Str 
import Control.Exception (bracket,finally) 
import Control.Monad(forM,liftM) 
import Control.Parallel.Strategies 
import Control.Parallel 
import Control.DeepSeq (NFData) 
import Data.Int (Int64) 

--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances 

--My operation for one value pair 
distance :: Int -> Int -> Int 
distance a b = (a-b)*(a-b) 

combineDistances :: [Int] -> Int 
combineDistances = sum 

--Test file generation 
createTestFile :: Int -> FilePath -> IO() 
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1 
     where infiniteList :: Int->Int-> [Int] 
       infiniteList i j = (i + j) : infiniteList j (i+j) 

--Applying my operation simply on a file 
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000) 
--But i want to use multiple cores.. 
sumOfDistancesOnSmallFile :: FilePath -> IO Int 
sumOfDistancesOnSmallFile path = do 
        fileContents <- readFile path 
        return $ allDistances $ map read $ lines $ fileContents 
        where 
         allDistances (x:xs) = (allDistances xs) + (sum $ map (distance x) xs) 
         allDistances _ = 0 

--Setting up an enumerator of read values from a text stream 
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b 
readerEnumerator reader = joinI . (EL.concatMapM transformer) 
          where transformer input = case reader input of 
             Right (val, remainder) -> return [val] 
             Left err -> return [0] 

readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b 
readEnumerator = readerEnumerator (signed decimal) 

--The iteratee version of my operation 
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int 
distancesFirstToTailIt = do 
    maybeNum <- EL.head 
    maybe (return 0) distancesOneToManyIt maybeNum 

distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int 
distancesOneToManyIt base = do 
    maybeNum <- EL.head 
    maybe (return 0) combineNextDistance maybeNum 
    where combineNextDistance nextNum = do 
       rest <- distancesOneToManyIt base 
       return $ combineDistances [(distance base nextNum),rest] 

--The mapreduce algorithm 
mapReduce :: Strategy b -- evaluation strategy for mapping 
      -> (a -> b) -- map function 
      -> Strategy c -- evaluation strategy for reduction 
      -> ([b] -> c) -- reduce function 
      -> [a]  -- list to map over 
      -> c 
mapReduce mapStrat mapFunc reduceStrat reduceFunc input = 
      mapResult `pseq` reduceResult 
      where mapResult = parMap mapStrat mapFunc input 
       reduceResult = reduceFunc mapResult `using` reduceStrat 

--Applying the iteratee operation using mapreduce 
sumOfDistancesOnFileWithIt :: FilePath -> IO Int 
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path 

distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int 
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc) 
             rpar (sumValuesAsReduceFunc) 
          where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int 
            runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt)) 
            sumValuesAsReduceFunc :: [IO Int] -> IO Int 
            sumValuesAsReduceFunc = liftM sum . sequence 


--Working with (file)chunk enumerators: 
data ChunkSpec = CS{ 
    chunkOffset :: !Int 
    , chunkLength :: !Int 
    } deriving (Eq,Show) 

chunkedFileEnum :: (NFData (a)) => MonadIO m => 
       (FilePath-> IO [ChunkSpec]) 
      -> ([Enumerator Text m b]->IO a) 
      -> FilePath 
      -> IO a 
chunkedFileEnum chunkCreator funcOnChunks path = do 
    (chunks, handles)<- chunkedEnum chunkCreator path 
    r <- funcOnChunks chunks 
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles 

chunkedEnum :: MonadIO m=> 
       (FilePath -> IO [ChunkSpec]) 
      -> FilePath 
      -> IO ([Enumerator Text m b], [Handle]) 
chunkedEnum chunkCreator path = do 
    chunks <- chunkCreator path 
    liftM unzip . forM chunks $ \spec -> do 
     h <- openFile path ReadMode 
     hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec)) 
     let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF 
     return (chunk,h) 

-- returns set of chunks representing tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec] 
chunkByLinesTails path = do 
    bracket (openFile path ReadMode) hClose $ \h-> do 
     totalSize <- fromIntegral `liftM` hFileSize h 
     let chunkSize = 1 
      findChunks offset = do 
      let newOffset = offset + chunkSize 
      hSeek h AbsoluteSeek (fromIntegral newOffset) 
      let findNewline lineSeekOffset = do 
       eof <- hIsEOF h 
       if eof 
        then return [CS offset (totalSize - offset)] 
        else do 
         bytes <- Str.hGet h 256 
         case Str.elemIndex '\n' bytes of 
          Just n -> do 
           nextChunks <- findChunks (lineSeekOffset + n + 1) 
           return (CS offset (totalSize-offset):nextChunks) 
          Nothing -> findNewline (lineSeekOffset + Str.length bytes) 
      findNewline newOffset 
     findChunks 0 

Por cierto, estoy corriendo HaskellPlatform 2011.2.0 en Mac OS X 10.6.7 (Snow Leopard)
con los siguientes paquetes:
ByteString 0.9.1.10
paralelo 3.1.0.1
empadronador 0.4.8, con un manual here

+4

usted tiene demasiado paralelismo. Con un tamaño de 1 y 2k, está abriendo el archivo 2k veces. Y cuanto más paralelismo, más memoria usas también. Realmente no creo que un problema como este, que requiere cruzar una estructura consigo mismo, sea adecuado para la estrategia de paralelización que haya elegido. Debe establecer un tamaño de porción razonablemente grande, y hacer sus cálculos dentro de cada fragmento y luego a través de los fragmentos. – sclv

+0

Para seguir hablando de esto, está tomando una operación que es potencialmente lineal en espacio y lee discos y la convierte en una operación que es n^2 en espacio y lee discos. La holgazanería y la rigurosidad de las lecturas se intercambian entre quedarse sin archivadores o quedarse sin memoria para guardar todos los resultados de las lecturas. De cualquier forma, este es el enfoque equivocado. – sclv

+0

¿No debería _pseq_ ser lo suficientemente inteligente como para no generar todos los hilos 2k inmediatamente? Los considero más como trabajos. Todos tienen que hacerse en algún momento, y al usar _pseq_ intento decirle a Haskell que podría optimizar el tiempo de ejecución ejecutando algunos en paralelo. – gerben

Respuesta

3

Como dice el error, hay demasiados archivos abiertos. Esperaba que Haskell ejecutara la mayor parte del programa de forma secuencial, pero algunos paralelos chispas. Sin embargo, como se mencionó anteriormente, Haskell siempre despierta las evaluaciones.

Esto generalmente no es un problema en un programa funcional puro, pero es cuando se trata de IO (recursos). Escaleré el paralelismo como se describe en el libro Real World Haskell demasiado arriba. Así que mi conclusión es hacer el paralelismo solo en una escala limitada cuando se trata de recursos de IO dentro de las chispas. En la parte funcional pura, el paralelismo excesivo puede tener éxito.

Por lo tanto la respuesta a mi mensaje es, a no utilizar MapReduce en todo el programa, pero dentro de una parte funcional pura interior.

Para mostrar dónde falló realmente el programa, lo configuré con --enable-executable-profiling -p, lo construí y lo ejecuté usando + RTS -p -hc -L30. Debido a que el archivo ejecutable falla inmediatamente, no hay un perfil de asignación de memoria. El perfil de la distribución del tiempo que resulta en el archivo .prof comienza con lo siguiente:

                       individual inherited 
COST CENTRE    MODULE            no. entries %time %alloc %time %alloc 

MAIN      MAIN             1   0 0.0 0.3 100.0 100.0 
    main     Main            1648   2 0.0 0.0 50.0 98.9 
    sumOfDistancesOnFileWithIt MapReduceTest         1649   1 0.0 0.0 50.0 98.9 
     chunkedFileEnum  MapReduceTest          1650   1 0.0 0.0 50.0 98.9 
     chunkedEnum   MapReduceTest         1651   495 0.0 24.2 50.0 98.9 
      lineOffsets   MapReduceTest         1652   1 50.0 74.6 50.0 74.6 

chunkedEnum devuelve IO ([enumerador de texto m b], [Mango]), y al parecer recibe 495 entradas. El archivo de entrada era un archivo de línea de 2k, por lo que la entrada única en lineOffsets devolvió una lista de compensaciones de 2000. No hay una sola entrada en distanceUsingMapReduceIt, ¡así que el trabajo real ni siquiera comenzó!

Cuestiones relacionadas