Quiero usar Play2 para cargar archivos CSV muy grandes (millones de líneas) en elasticsearch. He escrito el siguiente código que funciona bien.Play 2 Scala - La mejor manera de cargar un gran archivo CSV con Iteratee para procesar cada línea de manera reactiva
No estoy satisfecho con la forma en que omito el encabezado de respuesta http en el primer fragmento. Debería haber una manera de encadenar esta iteración con una primera iteración que saltee el encabezado http y cambie directamente al estado Listo, pero no he encontrado cómo.
Si alguien puede ayudar
object ReactiveFileUpload extends Controller {
def upload = Action(BodyParser(rh => new CsvIteratee(isFirst = true))) {
request =>
Ok("File Processed")
}
}
case class CsvIteratee(state: Symbol = 'Cont, input: Input[Array[Byte]] = Empty, lastChunk: String = "", isFirst: Boolean = false) extends Iteratee[Array[Byte], Either[Result, String]] {
def fold[B](
done: (Either[Result, String], Input[Array[Byte]]) => Promise[B],
cont: (Input[Array[Byte]] => Iteratee[Array[Byte], Either[Result, String]]) => Promise[B],
error: (String, Input[Array[Byte]]) => Promise[B]
): Promise[B] = state match {
case 'Done =>
done(Right(lastChunk), Input.Empty)
case 'Cont => cont(in => in match {
case in: El[Array[Byte]] => {
// Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk
val content = lastChunk + new String(in.e)
val csvBody =
if (isFirst)
// Skip http header if it is the first chunk
content.drop(content.indexOf("\r\n\r\n") + 4)
else content
val csv = new CSVReader(new StringReader(csvBody), ';')
val lines = csv.readAll
// Process all lines excepted the last one since it is cut by the chunk
for (line <- lines.init)
processLine(line)
// Put forward the part that has not been processed
val last = lines.last.toList.mkString(";")
copy(input = in, lastChunk = last, isFirst = false)
}
case Empty => copy(input = in, isFirst = false)
case EOF => copy(state = 'Done, input = in, isFirst = false)
case _ => copy(state = 'Error, input = in, isFirst = false)
})
case _ =>
error("Unexpected state", input)
}
def processLine(line: Array[String]) = WS.url("http://localhost:9200/affa/na/").post(
toJson(
Map(
"date" -> toJson(line(0)),
"trig" -> toJson(line(1)),
"code" -> toJson(line(2)),
"nbjours" -> toJson(line(3).toDouble)
)
)
)
}
Es posible que por el interesado en el juego no oficial-iteratee-extras, que tiene una [analizador CSV] (https://github.com/jroper/play -iteratees-extras/blob/master/src/main/scala/play/extras/iteratees/Csv.scala). –