2012-08-13 10 views
11

Quiero usar Play2 para cargar archivos CSV muy grandes (millones de líneas) en elasticsearch. He escrito el siguiente código que funciona bien.Play 2 Scala - La mejor manera de cargar un gran archivo CSV con Iteratee para procesar cada línea de manera reactiva

No estoy satisfecho con la forma en que omito el encabezado de respuesta http en el primer fragmento. Debería haber una manera de encadenar esta iteración con una primera iteración que saltee el encabezado http y cambie directamente al estado Listo, pero no he encontrado cómo.

Si alguien puede ayudar

object ReactiveFileUpload extends Controller { 
    def upload = Action(BodyParser(rh => new CsvIteratee(isFirst = true))) { 
    request => 
     Ok("File Processed") 
    } 
} 

case class CsvIteratee(state: Symbol = 'Cont, input: Input[Array[Byte]] = Empty, lastChunk: String = "", isFirst: Boolean = false) extends Iteratee[Array[Byte], Either[Result, String]] { 
    def fold[B](
       done: (Either[Result, String], Input[Array[Byte]]) => Promise[B], 
       cont: (Input[Array[Byte]] => Iteratee[Array[Byte], Either[Result, String]]) => Promise[B], 
       error: (String, Input[Array[Byte]]) => Promise[B] 
       ): Promise[B] = state match { 
    case 'Done => 
     done(Right(lastChunk), Input.Empty) 

    case 'Cont => cont(in => in match { 
     case in: El[Array[Byte]] => { 
     // Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk 
     val content = lastChunk + new String(in.e) 
     val csvBody = 
      if (isFirst) 
      // Skip http header if it is the first chunk 
      content.drop(content.indexOf("\r\n\r\n") + 4) 
      else content 
     val csv = new CSVReader(new StringReader(csvBody), ';') 
     val lines = csv.readAll 
     // Process all lines excepted the last one since it is cut by the chunk 
     for (line <- lines.init) 
      processLine(line) 
     // Put forward the part that has not been processed 
     val last = lines.last.toList.mkString(";") 
     copy(input = in, lastChunk = last, isFirst = false) 
     } 
     case Empty => copy(input = in, isFirst = false) 
     case EOF => copy(state = 'Done, input = in, isFirst = false) 
     case _ => copy(state = 'Error, input = in, isFirst = false) 
    }) 

    case _ => 
     error("Unexpected state", input) 

    } 

    def processLine(line: Array[String]) = WS.url("http://localhost:9200/affa/na/").post(
    toJson(
     Map(
     "date" -> toJson(line(0)), 
     "trig" -> toJson(line(1)), 
     "code" -> toJson(line(2)), 
     "nbjours" -> toJson(line(3).toDouble) 
    ) 
    ) 
) 
} 
+1

Es posible que por el interesado en el juego no oficial-iteratee-extras, que tiene una [analizador CSV] (https://github.com/jroper/play -iteratees-extras/blob/master/src/main/scala/play/extras/iteratees/Csv.scala). –

Respuesta

1

Para encadenar dos iteratees juntos, utilizar flatMap.

val combinedIteratee = firstIteratee.flatMap(firstResult => secondIteratee) 

O, utilizando una para la comprensión:

val combinedIteratee = for { 
    firstResult <- firstIteratee 
    secondResult <- secondIteratee 
} yield secondResult 

Usted puede utilizar flatMap para secuenciar el mayor número iteratees juntos como desee.

Es posible que desee hacer algo como:

val headerAndCsvIteratee = for { 
    headerResult <- headerIteratee 
    csvResult <- csvIteratee 
} yield csvResult 
Cuestiones relacionadas