2010-11-23 5 views
35

¿Cómo leo un archivo CSV grande (> 1 Gb) con un Scala Stream? ¿Tienes un ejemplo de código? ¿O utilizarías una forma diferente de leer un gran archivo CSV sin cargarlo primero en la memoria?¿Cómo leo un archivo CSV grande con la clase Scala Stream?

+0

¿Quiere decir transmitir como en la función de evaluación lenta? Es presumiblemente posible, pero no obligatorio. - Leer un archivo línea por línea ya es en esencia. Todavía no estoy muy actualizado con Scala, pero getLines (de una búsqueda rápida de la fuente) también se implementa de forma perezosa: ¿lee todo el archivo en la memoria? –

+0

Creo que se lee en la memoria, ya que obtengo una OutOfMemoryException cuando uso scala.Source.fromFile() y luego getLines(). Entonces, usar una clase Stream suena como una alternativa válida, ¿verdad? –

Respuesta

62

Simplemente use Source.fromFile(...).getLines como ya indicó.

que devuelve un iterador, que ya es perezoso (que tendría que utilizar la corriente como una colección perezosa en la que quería valores previamente recuperado para ser memoized, para que pueda leer de nuevo)

Si usted está recibiendo la memoria problemas, entonces el problema radicará en lo que está haciendo después de getLines. Cualquier operación como toList, que obliga a una recopilación estricta, causará el problema.

+1

Supongo que la excepción OutOfMemoryException es causada por las operaciones posteriores. ¡Gracias! –

+0

Quizás no sea bueno tratar con el iterador cuando la lógica de su empresa necesita recorrer el iterador varias veces para calcular algo. Puedes usar el iterador una vez. Parece que sería mejor lidiar con la transmisión. como en esta pregunta: http://stackoverflow.com/questions/17004455/scala-iterator-and-stream-example-stream-fails-on-reuse – ses

10

Espero que no signifique Scala's collection.immutable.Stream con Stream. Esto es no lo que quiere. Stream es flojo, pero realiza memoizaciones.

No sé lo que planeas hacer, pero solo leer el archivo línea por línea debería funcionar muy bien sin usar grandes cantidades de memoria.

getLines debe evaluar lazily y no debe bloquearse (siempre y cuando su archivo no tenga más de 2³² líneas, afaik). Si lo hace, pregunte en #scala o presente un ticket de error (o haga ambas).

3

Si está tratando de procesar el archivo grande línea por línea evitando requerir que el contenido del archivo completo se cargue en la memoria de una sola vez, entonces puede usar el Iterator devuelto por scala.io.Source.

Tengo una pequeña función, tryProcessSource, (que contiene dos subfunciones) que utilizo exactamente para estos tipos de casos de uso. La función toma hasta cuatro parámetros, de los cuales solo el primero es requerido. Los otros parámetros tienen valores predeterminados correctos proporcionados.

Aquí está el perfil de función (función completa aplicación es en la parte inferior):

def tryProcessSource(
    file: File, 
    parseLine: (Int, String) => Option[List[String]] = 
    (index, unparsedLine) => Some(List(unparsedLine)), 
    filterLine: (Int, List[String]) => Option[Boolean] = 
    (index, parsedValues) => Some(true), 
    retainValues: (Int, List[String]) => Option[List[String]] = 
    (index, parsedValues) => Some(parsedValues), 
): Try[List[List[String]]] = { 
    ??? 
} 

El primer parámetro, file: File, se requiere. Y es cualquier instancia válida de java.io.File que apunta a un archivo de texto orientado a línea, como un archivo CSV.

El segundo parámetro, parseLine: (Int, String) => Option[List[String]], es opcional. Y si se proporciona, debe ser una función que espera recibir dos parámetros de entrada; index: Int, unparsedLine: String. Y luego devuelve un Option[List[String]]. La función puede devolver un Some envuelto List[String] que consta de los valores de columna válidos. O puede devolver un None que indica que todo el proceso de transmisión se cancela antes de tiempo. Si no se proporciona este parámetro, se proporciona un valor predeterminado de (index, line) => Some(List(line)). Este valor predeterminado da como resultado que la línea completa se devuelva como un único valor String.

El tercer parámetro, filterLine: (Int, List[String]) => Option[Boolean], es opcional. Y si se proporciona, debe ser una función que espera recibir dos parámetros de entrada; index: Int, parsedValues: List[String]. Y luego devuelve un Option[Boolean]. La función puede devolver un Some envuelto Boolean indicando si esta línea particular debe incluirse en la salida. O puede devolver un None que indica que todo el proceso de transmisión se cancela antes de tiempo. Si no se proporciona este parámetro, se proporciona un valor predeterminado de (index, values) => Some(true).Este predeterminado da como resultado que todas las líneas estén incluidas.

El cuarto y último parámetro, retainValues: (Int, List[String]) => Option[List[String]], es opcional. Y si se proporciona, debe ser una función que espera recibir dos parámetros de entrada; index: Int, parsedValues: List[String]. Y luego devuelve un Option[List[String]]. La función puede devolver un Some envuelto List[String] que consiste en algún subconjunto y/o alteración de los valores de columna existentes. O puede devolver un None que indica que todo el proceso de transmisión se cancela antes de tiempo. Si no se proporciona este parámetro, se proporciona un valor predeterminado de (index, values) => Some(values). Este valor predeterminado da como resultado los valores analizados por el segundo parámetro, parseLine.

Considere un archivo con el siguiente contenido (4 líneas):

street,street2,city,state,zip 
100 Main Str,,Irving,TX,75039 
231 Park Ave,,Irving,TX,75039 
1400 Beltline Rd,Apt 312,Dallas,Tx,75240 

El siguiente perfil de llamar ...

val tryLinesDefaults = 
    tryProcessSource(new File("path/to/file.csv")) 

... resultados en esta salida para tryLinesDefaults (el contenido inalterado del archivo):

Success(
    List(
    List("street,street2,city,state,zip"), 
    List("100 Main Str,,Irving,TX,75039"), 
    List("231 Park Ave,,Irving,TX,75039"), 
    List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240") 
) 
) 

el siguiente perfil de llamar ...

val tryLinesParseOnly = 
    tryProcessSource(
     new File("path/to/file.csv") 
    , parseLine = 
     (index, unparsedLine) => Some(unparsedLine.split(",").toList) 
) 

... resultados en esta salida para tryLinesParseOnly (cada línea analizado en los valores de columna individuales):

Success(
    List(
    List("street","street2","city","state","zip"), 
    List("100 Main Str","","Irving,TX","75039"), 
    List("231 Park Ave","","Irving","TX","75039"), 
    List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240") 
) 
) 

El siguiente perfil de llamada ...

val tryLinesIrvingTxNoHeader = 
    tryProcessSource(
     new File("C:/Users/Jim/Desktop/test.csv") 
    , parseLine = 
     (index, unparsedLine) => Some(unparsedLine.split(",").toList) 
    , filterLine = 
     (index, parsedValues) => 
      Some(
      (index != 0) && //skip header line 
      (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving 
      (parsedValues(3).toLowerCase == "Tx".toLowerCase) 
     ) 
) 

... resultados en esta salida para tryLinesIrvingTxNoHeader (cada línea analizados en los valores de las columnas individuales, sin cabecera y solo las dos filas en Irving, TX):

Success(
    List(
    List("100 Main Str","","Irving,TX","75039"), 
    List("231 Park Ave","","Irving","TX","75039"), 
) 
) 

Aquí está toda la implementación tryProcessSource función :

import scala.io.Source 
import scala.util.Try 

import java.io.File 

def tryProcessSource(
    file: File, 
    parseLine: (Int, String) => Option[List[String]] = 
    (index, unparsedLine) => Some(List(unparsedLine)), 
    filterLine: (Int, List[String]) => Option[Boolean] = 
    (index, parsedValues) => Some(true), 
    retainValues: (Int, List[String]) => Option[List[String]] = 
    (index, parsedValues) => Some(parsedValues) 
): Try[List[List[String]]] = { 
    def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] = 
    try {Try(transfer(source))} finally {source.close()} 
    def recursive(
    remaining: Iterator[(String, Int)], 
    accumulator: List[List[String]], 
    isEarlyAbort: Boolean = 
     false 
): List[List[String]] = { 
    if (isEarlyAbort || !remaining.hasNext) 
     accumulator 
    else { 
     val (line, index) = 
     remaining.next 
     parseLine(index, line) match { 
     case Some(values) => 
      filterLine(index, values) match { 
      case Some(keep) => 
       if (keep) 
       retainValues(index, values) match { 
        case Some(valuesNew) => 
        recursive(remaining, valuesNew :: accumulator) //capture values 
        case None => 
        recursive(remaining, accumulator, isEarlyAbort = true) //early abort 
       } 
       else 
       recursive(remaining, accumulator) //discard row 
      case None => 
       recursive(remaining, accumulator, isEarlyAbort = true) //early abort 
      } 
     case None => 
      recursive(remaining, accumulator, isEarlyAbort = true) //early abort 
     } 
    } 
    } 
    Try(Source.fromFile(file)).flatMap(
    bufferedSource => 
     usingSource(bufferedSource) { 
     source => 
      recursive(source.getLines().buffered.zipWithIndex, Nil).reverse 
     } 
) 
} 

Si bien esta solución es relativamente breve, que me llevó un tiempo considerable y muchos refactorización debe transcurrir antes de que finalmente fue capaz de llegar aquí. Por favor, avíseme si ve alguna forma en que podría mejorarse.


ACTUALIZACIÓN: Sólo he pedido a la cuestión más adelante como it's own StackOverflow question. Y ahora es has an answer fixing the error mencionado a continuación.

Tuve la idea de tratar de hacer esto aún más genérico cambiando el parámetro retainValues a transformLine con la nueva definición de función generic-ified a continuación. Sin embargo, sigo obteniendo el error de resaltado en IntelliJ "Expresión de tipo Algunos [Lista [Cadena]] no se ajusta a la opción de tipo esperado [A]" y no fue capaz de descubrir cómo cambiar el valor predeterminado para que el error se va.

def tryProcessSource2[A <: AnyRef](
    file: File, 
    parseLine: (Int, String) => Option[List[String]] = 
    (index, unparsedLine) => Some(List(unparsedLine)), 
    filterLine: (Int, List[String]) => Option[Boolean] = 
    (index, parsedValues) => Some(true), 
    transformLine: (Int, List[String]) => Option[A] = 
    (index, parsedValues) => Some(parsedValues) 
): Try[List[A]] = { 
    ??? 
} 

Cualquier ayuda sobre cómo hacer este trabajo sería muy apreciada.

Cuestiones relacionadas