¿Cómo leo un archivo CSV grande (> 1 Gb) con un Scala Stream? ¿Tienes un ejemplo de código? ¿O utilizarías una forma diferente de leer un gran archivo CSV sin cargarlo primero en la memoria?¿Cómo leo un archivo CSV grande con la clase Scala Stream?
Respuesta
Simplemente use Source.fromFile(...).getLines
como ya indicó.
que devuelve un iterador, que ya es perezoso (que tendría que utilizar la corriente como una colección perezosa en la que quería valores previamente recuperado para ser memoized, para que pueda leer de nuevo)
Si usted está recibiendo la memoria problemas, entonces el problema radicará en lo que está haciendo después de getLines. Cualquier operación como toList
, que obliga a una recopilación estricta, causará el problema.
Supongo que la excepción OutOfMemoryException es causada por las operaciones posteriores. ¡Gracias! –
Quizás no sea bueno tratar con el iterador cuando la lógica de su empresa necesita recorrer el iterador varias veces para calcular algo. Puedes usar el iterador una vez. Parece que sería mejor lidiar con la transmisión. como en esta pregunta: http://stackoverflow.com/questions/17004455/scala-iterator-and-stream-example-stream-fails-on-reuse – ses
Espero que no signifique Scala's collection.immutable.Stream
con Stream. Esto es no lo que quiere. Stream es flojo, pero realiza memoizaciones.
No sé lo que planeas hacer, pero solo leer el archivo línea por línea debería funcionar muy bien sin usar grandes cantidades de memoria.
getLines
debe evaluar lazily y no debe bloquearse (siempre y cuando su archivo no tenga más de 2³² líneas, afaik). Si lo hace, pregunte en #scala o presente un ticket de error (o haga ambas).
Si está tratando de procesar el archivo grande línea por línea evitando requerir que el contenido del archivo completo se cargue en la memoria de una sola vez, entonces puede usar el Iterator
devuelto por scala.io.Source
.
Tengo una pequeña función, tryProcessSource
, (que contiene dos subfunciones) que utilizo exactamente para estos tipos de casos de uso. La función toma hasta cuatro parámetros, de los cuales solo el primero es requerido. Los otros parámetros tienen valores predeterminados correctos proporcionados.
Aquí está el perfil de función (función completa aplicación es en la parte inferior):
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues),
): Try[List[List[String]]] = {
???
}
El primer parámetro, file: File
, se requiere. Y es cualquier instancia válida de java.io.File
que apunta a un archivo de texto orientado a línea, como un archivo CSV.
El segundo parámetro, parseLine: (Int, String) => Option[List[String]]
, es opcional. Y si se proporciona, debe ser una función que espera recibir dos parámetros de entrada; index: Int
, unparsedLine: String
. Y luego devuelve un Option[List[String]]
. La función puede devolver un Some
envuelto List[String]
que consta de los valores de columna válidos. O puede devolver un None
que indica que todo el proceso de transmisión se cancela antes de tiempo. Si no se proporciona este parámetro, se proporciona un valor predeterminado de (index, line) => Some(List(line))
. Este valor predeterminado da como resultado que la línea completa se devuelva como un único valor String
.
El tercer parámetro, filterLine: (Int, List[String]) => Option[Boolean]
, es opcional. Y si se proporciona, debe ser una función que espera recibir dos parámetros de entrada; index: Int
, parsedValues: List[String]
. Y luego devuelve un Option[Boolean]
. La función puede devolver un Some
envuelto Boolean
indicando si esta línea particular debe incluirse en la salida. O puede devolver un None
que indica que todo el proceso de transmisión se cancela antes de tiempo. Si no se proporciona este parámetro, se proporciona un valor predeterminado de (index, values) => Some(true)
.Este predeterminado da como resultado que todas las líneas estén incluidas.
El cuarto y último parámetro, retainValues: (Int, List[String]) => Option[List[String]]
, es opcional. Y si se proporciona, debe ser una función que espera recibir dos parámetros de entrada; index: Int
, parsedValues: List[String]
. Y luego devuelve un Option[List[String]]
. La función puede devolver un Some
envuelto List[String]
que consiste en algún subconjunto y/o alteración de los valores de columna existentes. O puede devolver un None
que indica que todo el proceso de transmisión se cancela antes de tiempo. Si no se proporciona este parámetro, se proporciona un valor predeterminado de (index, values) => Some(values)
. Este valor predeterminado da como resultado los valores analizados por el segundo parámetro, parseLine
.
Considere un archivo con el siguiente contenido (4 líneas):
street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240
El siguiente perfil de llamar ...
val tryLinesDefaults =
tryProcessSource(new File("path/to/file.csv"))
... resultados en esta salida para tryLinesDefaults
(el contenido inalterado del archivo):
Success(
List(
List("street,street2,city,state,zip"),
List("100 Main Str,,Irving,TX,75039"),
List("231 Park Ave,,Irving,TX,75039"),
List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
)
)
el siguiente perfil de llamar ...
val tryLinesParseOnly =
tryProcessSource(
new File("path/to/file.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
)
... resultados en esta salida para tryLinesParseOnly
(cada línea analizado en los valores de columna individuales):
Success(
List(
List("street","street2","city","state","zip"),
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
)
)
El siguiente perfil de llamada ...
val tryLinesIrvingTxNoHeader =
tryProcessSource(
new File("C:/Users/Jim/Desktop/test.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
, filterLine =
(index, parsedValues) =>
Some(
(index != 0) && //skip header line
(parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
(parsedValues(3).toLowerCase == "Tx".toLowerCase)
)
)
... resultados en esta salida para tryLinesIrvingTxNoHeader
(cada línea analizados en los valores de las columnas individuales, sin cabecera y solo las dos filas en Irving, TX):
Success(
List(
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
)
)
Aquí está toda la implementación tryProcessSource
función :
import scala.io.Source
import scala.util.Try
import java.io.File
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
try {Try(transfer(source))} finally {source.close()}
def recursive(
remaining: Iterator[(String, Int)],
accumulator: List[List[String]],
isEarlyAbort: Boolean =
false
): List[List[String]] = {
if (isEarlyAbort || !remaining.hasNext)
accumulator
else {
val (line, index) =
remaining.next
parseLine(index, line) match {
case Some(values) =>
filterLine(index, values) match {
case Some(keep) =>
if (keep)
retainValues(index, values) match {
case Some(valuesNew) =>
recursive(remaining, valuesNew :: accumulator) //capture values
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
else
recursive(remaining, accumulator) //discard row
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
}
}
Try(Source.fromFile(file)).flatMap(
bufferedSource =>
usingSource(bufferedSource) {
source =>
recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
}
)
}
Si bien esta solución es relativamente breve, que me llevó un tiempo considerable y muchos refactorización debe transcurrir antes de que finalmente fue capaz de llegar aquí. Por favor, avíseme si ve alguna forma en que podría mejorarse.
ACTUALIZACIÓN: Sólo he pedido a la cuestión más adelante como it's own StackOverflow question. Y ahora es has an answer fixing the error mencionado a continuación.
Tuve la idea de tratar de hacer esto aún más genérico cambiando el parámetro retainValues
a transformLine
con la nueva definición de función generic-ified a continuación. Sin embargo, sigo obteniendo el error de resaltado en IntelliJ "Expresión de tipo Algunos [Lista [Cadena]] no se ajusta a la opción de tipo esperado [A]" y no fue capaz de descubrir cómo cambiar el valor predeterminado para que el error se va.
def tryProcessSource2[A <: AnyRef](
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
transformLine: (Int, List[String]) => Option[A] =
(index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
???
}
Cualquier ayuda sobre cómo hacer este trabajo sería muy apreciada.
- 1. ¿Cómo puedo omitir la fila del encabezado cuando leo un archivo CSV en Ruby? clase
- 2. Convertir DataTable a CSV stream
- 3. Modificar un archivo grande en Scala
- 4. Importar archivo csv grande usando phpMyAdmin
- 5. Pitón sin memoria en un archivo CSV grande (numpy)
- 6. ¿Cómo leo un archivo en App Engine?
- 7. En Scala, ¿cómo leer un archivo CSV simple con un encabezado en su primera línea?
- 8. Cómo cargar un archivo csv grande con columnas de tipo mixto usando el paquete bigmemory
- 9. ¿Cómo se traduce Stream-cons # :: en Scala?
- 10. Stream StringBuilder al archivo
- 11. Tratamiento de un SQL ResultSet como un Scala Stream
- 12. ¿Lectura de un archivo CSV en .NET?
- 13. Cómo abrir un archivo desde Memory Stream
- 14. ¿Cómo crear instancias y poblar un Scala Stream en Java?
- 15. ¿Subir archivo REST con HttpRequestMessage o Stream?
- 16. ¿Cómo lidiar con un archivo de texto muy grande?
- 17. Cómo codificar un hipervínculo en un archivo con formato CSV?
- 18. UNIX Importar CSV grande en SQLite
- 19. Cree dinámicamente un archivo CSV con FileHelpers
- 20. ¿Cómo leo un disco directamente con .Net?
- 21. procesamiento de un archivo CSV con headen con gnu paralelo
- 22. Matrix a CSV en Scala
- 23. ¿Cómo uso Scala para analizar datos CSV con columnas vacías?
- 24. Escribiendo un archivo desde StreamReader stream
- 25. Python: HTTP Publique un archivo grande con la transmisión
- 26. Generar archivo CSV con Django (contenido dinámico)
- 27. Consumo de memoria de un Scala Stream paralelo
- 28. ¿Cómo leo un archivo que se actualiza constantemente?
- 29. ¿Cómo leo un archivo gzip línea por línea?
- 30. ¿Diferencia entre Iterator y Stream en Scala?
¿Quiere decir transmitir como en la función de evaluación lenta? Es presumiblemente posible, pero no obligatorio. - Leer un archivo línea por línea ya es en esencia. Todavía no estoy muy actualizado con Scala, pero getLines (de una búsqueda rápida de la fuente) también se implementa de forma perezosa: ¿lee todo el archivo en la memoria? –
Creo que se lee en la memoria, ya que obtengo una OutOfMemoryException cuando uso scala.Source.fromFile() y luego getLines(). Entonces, usar una clase Stream suena como una alternativa válida, ¿verdad? –