Estoy implementando un programa haskell que compara cada línea de un archivo con cada otra línea en el archivo. Que puede ser implementado un solo subproceso de la siguiente manera¿Por qué mi implementación de Mapreduce (haskell del mundo real) usando iteratee IO también falla con "Demasiados archivos abiertos"
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + (sum $ map (distance x) xs)
allDistances _ = 0
Esto ejecutará en O (n^2), y tiene que mantener actualizada la lista completa de los números enteros en la memoria todo el tiempo. En mi programa actual, la línea contiene más números, de los cuales construyo un tipo de datos un poco más complejo que Int. Esto me dio errores de memoria en los datos que tengo que procesar.
Por lo tanto, se deben realizar dos mejoras en la solución de rosca única mencionada anteriormente. Primero, acelere el tiempo de ejecución real. Segundo, encuentre una manera de no mantener toda la lista en la memoria a tiempo completo. Sé que esto requiere analizar el archivo completo n veces. Por lo tanto, habrá comparaciones O (n^2) y se analizarán O (n^2) líneas. Esto está bien para mí, ya que prefiero tener un programa lento y exitoso que un programa fallido. Cuando el archivo de entrada es lo suficientemente pequeño, siempre puedo residir en una versión más simple.
Para usar múltiples núcleos de CPU, saqué la implementación de Mapreduce de Real World Haskell (capítulo 24, disponible here).
que modifica la función de fragmentación del libro que, en lugar de dividir el archivo completo en trozos, el retorno tantos trozos como líneas con cada trozo que representa un elemento de
tails . lines . readFile
porque quiero que el programa también a ser escalable en tamaño de archivo, utilicé inicialmente perezoso IO. Sin embargo, esto falla con "Demasiados archivos abiertos", sobre lo cual pregunté en un previous question (los manejadores de archivos fueron eliminados demasiado tarde por el GC). La versión full IO perezoso se publica allí.
Como la respuesta aceptada explica, strict IO podría resolver el problema. Eso resuelve el problema "Demasiados archivos abiertos" para archivos de 2k líneas, pero falla con "falta de memoria" en un archivo de 50k.
Tenga en cuenta que la primera implementación de rosca simple (sin mapreduce) es capaz de manejar un archivo de 50k.
La solución alternativa, que también me resulta más atractiva, es usar iteratee IO. Esperé que esto solucionara tanto el manejo del archivo como el agotamiento de los recursos de la memoria. Sin embargo, mi implementación aún falla con un error "Demasiados archivos abiertos" en un archivo de línea de 2k.
El iteratee versión IO tiene la misma MapReduce función que en el libro, pero tiene un modificado chunkedFileEnum a dejarlo trabajar con un enumerador .
Así que mi pregunta es; ¿Qué está mal con la siguiente implementación iteratee IO base? ¿Dónde está la holgazanería ?.
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO
import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)
import Data.Text(Text)
import Data.Text.Read
import Data.Maybe
import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances
--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
combineDistances :: [Int] -> Int
combineDistances = sum
--Test file generation
createTestFile :: Int -> FilePath -> IO()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
where infiniteList :: Int->Int-> [Int]
infiniteList i j = (i + j) : infiniteList j (i+j)
--Applying my operation simply on a file
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But i want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + (sum $ map (distance x) xs)
allDistances _ = 0
--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
where transformer input = case reader input of
Right (val, remainder) -> return [val]
Left err -> return [0]
readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)
--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
maybeNum <- EL.head
maybe (return 0) distancesOneToManyIt maybeNum
distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
maybeNum <- EL.head
maybe (return 0) combineNextDistance maybeNum
where combineNextDistance nextNum = do
rest <- distancesOneToManyIt base
return $ combineDistances [(distance base nextNum),rest]
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
-> (a -> b) -- map function
-> Strategy c -- evaluation strategy for reduction
-> ([b] -> c) -- reduce function
-> [a] -- list to map over
-> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
mapResult `pseq` reduceResult
where mapResult = parMap mapStrat mapFunc input
reduceResult = reduceFunc mapResult `using` reduceStrat
--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path
distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
rpar (sumValuesAsReduceFunc)
where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
sumValuesAsReduceFunc :: [IO Int] -> IO Int
sumValuesAsReduceFunc = liftM sum . sequence
--Working with (file)chunk enumerators:
data ChunkSpec = CS{
chunkOffset :: !Int
, chunkLength :: !Int
} deriving (Eq,Show)
chunkedFileEnum :: (NFData (a)) => MonadIO m =>
(FilePath-> IO [ChunkSpec])
-> ([Enumerator Text m b]->IO a)
-> FilePath
-> IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
(chunks, handles)<- chunkedEnum chunkCreator path
r <- funcOnChunks chunks
(rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles
chunkedEnum :: MonadIO m=>
(FilePath -> IO [ChunkSpec])
-> FilePath
-> IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
chunks <- chunkCreator path
liftM unzip . forM chunks $ \spec -> do
h <- openFile path ReadMode
hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
return (chunk,h)
-- returns set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
bracket (openFile path ReadMode) hClose $ \h-> do
totalSize <- fromIntegral `liftM` hFileSize h
let chunkSize = 1
findChunks offset = do
let newOffset = offset + chunkSize
hSeek h AbsoluteSeek (fromIntegral newOffset)
let findNewline lineSeekOffset = do
eof <- hIsEOF h
if eof
then return [CS offset (totalSize - offset)]
else do
bytes <- Str.hGet h 256
case Str.elemIndex '\n' bytes of
Just n -> do
nextChunks <- findChunks (lineSeekOffset + n + 1)
return (CS offset (totalSize-offset):nextChunks)
Nothing -> findNewline (lineSeekOffset + Str.length bytes)
findNewline newOffset
findChunks 0
Por cierto, estoy corriendo HaskellPlatform 2011.2.0 en Mac OS X 10.6.7 (Snow Leopard)
con los siguientes paquetes:
ByteString 0.9.1.10
paralelo 3.1.0.1
empadronador 0.4.8, con un manual here
usted tiene demasiado paralelismo. Con un tamaño de 1 y 2k, está abriendo el archivo 2k veces. Y cuanto más paralelismo, más memoria usas también. Realmente no creo que un problema como este, que requiere cruzar una estructura consigo mismo, sea adecuado para la estrategia de paralelización que haya elegido. Debe establecer un tamaño de porción razonablemente grande, y hacer sus cálculos dentro de cada fragmento y luego a través de los fragmentos. – sclv
Para seguir hablando de esto, está tomando una operación que es potencialmente lineal en espacio y lee discos y la convierte en una operación que es n^2 en espacio y lee discos. La holgazanería y la rigurosidad de las lecturas se intercambian entre quedarse sin archivadores o quedarse sin memoria para guardar todos los resultados de las lecturas. De cualquier forma, este es el enfoque equivocado. – sclv
¿No debería _pseq_ ser lo suficientemente inteligente como para no generar todos los hilos 2k inmediatamente? Los considero más como trabajos. Todos tienen que hacerse en algún momento, y al usar _pseq_ intento decirle a Haskell que podría optimizar el tiempo de ejecución ejecutando algunos en paralelo. – gerben