7

Así que la Play2.0 Enumeratee page muestra un ejemplo del uso de un método de la &> o through para cambiar una Enumerator[String] en un Enumerator[Int]:Cómo escribir un trozo enumeratee a un empadronador a lo largo de diferentes límites

val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt } 
val ints: Enumerator[Int] = strings &> toInt 

Hay también un Enumeratee.grouped enumeradoe para crear un enumerador de fragmentos de elementos individuales. Eso pareció funcionar bien.

Pero lo que veo es que la entrada habitual sería en forma de Array[Byte] (que se devuelve por Enumerator.fromFile y Enumerator.fromStream). Con eso en mente, me gustaría tomar esas entradas Array[Byte] y las convierte en Enumerator[String], por ejemplo, donde cada cadena es una línea (terminada por '\n'). Los límites para las líneas y los elementos Array[Byte] generalmente no coincidirán. ¿Cómo escribo un enumerador que puede convertir las matrices fragmentadas en cadenas fragmentadas?

El propósito es dividir esas líneas en el navegador cuando cada Array[Byte] esté disponible y conservar los bytes sobrantes que no formaban parte de una línea completa hasta que aparezca el siguiente fragmento de entrada.

Idealmente me gustaría tener un método que da una iter: Iteratee[Array[Byte], T] y un Enumerator[Array[Byte]] me dará una vuelta Enumerator[T], donde mis elementos T fueron analizadas por iter.

Información adicional: Tengo un poco de tiempo para limpiar mi código y aquí hay un ejemplo específico de lo que estoy tratando de hacer. Tengo las siguientes iteratees que detectan la siguiente línea:

import play.api.libs.iteratee._ 
type AB = Array[Byte] 

def takeWhile(pred: Byte => Boolean): Iteratee[AB, AB] = { 
    def step(e: Input[AB], acc: AB): Iteratee[AB, AB] = e match { 
    case Input.EOF => Done(acc, Input.EOF) 
    case Input.Empty => Cont(step(_, acc)) 
    case Input.El(arr) => 
     val (taking, rest) = arr.span(pred) 
     if (rest.length > 0) Done(acC++ taking, Input.El(rest)) 
     else Cont(step(_, acC++ taking)) 
    } 
    Cont(step(_, Array())) 
} 

val line = for { 
    bytes <- takeWhile(b => !(b == '\n' || b == '\r')) 
    _  <- takeWhile(b => b == '\n' || b == '\r') 
} yield bytes 

Y lo que me gustaría hacer es algo así:

Ok.stream(Enumerator.fromFile(filename) &> chunkBy(line)).as("text/plain") 

Respuesta

5

https://github.com/playframework/Play20/commit/f979006a7e2c1c08ca56ee0bae67b5463ee099c1#L3R131 Hace algo similar a lo que está haciendo. Lo arreglé agrupado para ocuparme de la entrada restante. El código básicamente se parece a:

val upToNewLine = 
    Traversable.splitOnceAt[String,Char](_ != '\n') &>> 
    Iteratee.consume() 

Enumeratee.grouped(upToNewLine) 

También he de fijar la repetición de la misma manera

+0

fresca. Sentí que 'agrupado' debería haber hecho lo que yo quería. – huynhjl

2

Aquí es lo que tengo después de algunas horas de experimentación. Espero que alguien pueda llegar a una implementación más elegante, ya que apenas puedo seguir la mía.

def chunkBy(chunker: Iteratee[AB, AB]) = new Enumeratee[AB, AB] { 
    def applyOn[A](inner: Iteratee[AB, A]): Iteratee[AB, Iteratee[AB, A]] = { 
    def step(e: Input[AB], in: Iteratee[AB, A], leftover: Input[AB]): 
      Iteratee[AB, Iteratee[AB, A]] = { 
     e match { 
     case Input.EOF => 
      // if we have a leftover and it's a chunk, then output it 
      leftover match { 
      case Input.EOF | Input.Empty => Done(in, leftover) 
      case Input.El(_) => 
       val lastChunk = Iteratee.flatten(Enumerator.enumInput(leftover) 
       >>> Enumerator.eof |>> chunker) 
       lastChunk.pureFlatFold(
       done = { (chunk, rest) => 
        val nextIn = Iteratee.flatten(Enumerator(chunk) |>> in) 
        nextIn.pureFlatFold(
        done = (a, e2) => Done(nextIn, e2), 
        // nothing more will come 
        cont = k => Done(nextIn, Input.EOF), 
        error = (msg, e2) => Error(msg, e2)) 
       }, 
       // not enough content to get a chunk, so drop content 
       cont = k => Done(in, Input.EOF), 
       error = (msg, e2) => Error(msg, e2)) 
      } 
     case Input.Empty => Cont(step(_, in, leftover)) 
     case Input.El(arr) => 
      // feed through chunker 
      val iChunks = Iteratee.flatten(
      Enumerator.enumInput(leftover) 
       >>> Enumerator(arr) 
       >>> Enumerator.eof // to extract the leftover 
       |>> repeat(chunker)) 
      iChunks.pureFlatFold(
      done = { (chunks, rest) => 
       // we have our chunks, feed them to the inner iteratee 
       val nextIn = Iteratee.flatten(Enumerator(chunks: _*) |>> in) 
       nextIn.pureFlatFold(
       done = (a, e2) => Done(nextIn, e2), 
       // inner iteratee needs more data 
       cont = k => Cont(step(_: Input[AB], nextIn, 
        // we have to ignore the EOF we fed to repeat 
        if (rest == Input.EOF) Input.Empty else rest)), 
       error = (msg, e2) => Error(msg, e2)) 
      }, 
      // not enough content to get a chunk, continue 
      cont = k => Cont(step(_: Input[AB], in, leftover)), 
      error = (msg, e2) => Error(msg, e2)) 
     } 
    } 
    Cont(step(_, inner, Input.Empty)) 
    } 
} 

Aquí es la definición de mi costumbre repeat:

// withhold the last chunk so that it may be concatenated with the next one 
def repeat(chunker: Iteratee[AB, AB]) = { 
    def loop(e: Input[AB], ch: Iteratee[AB, AB], acc: Vector[AB], 
     leftover: Input[AB]): Iteratee[AB, Vector[AB]] = e match { 
    case Input.EOF => ch.pureFlatFold(
     done = (a, e) => Done(acc, leftover), 
     cont = k => k(Input.EOF).pureFlatFold(
     done = (a, e) => Done(acc, Input.El(a)), 
     cont = k => sys.error("divergent iter"), 
     error = (msg, e) => Error(msg, e)), 
     error = (msg, e) => Error(msg, e)) 
    case Input.Empty => Cont(loop(_, ch, acc, leftover)) 
    case Input.El(_) => 
     val i = Iteratee.flatten(Enumerator.enumInput(leftover) 
      >>> Enumerator.enumInput(e) |>> ch) 
     i.pureFlatFold(
     done = (a, e) => loop(e, chunker, acc :+ a, Input.Empty), 
     cont = k => Cont(loop(_, i, acc, Input.Empty)), 
     error = (msg, e) => Error(msg, e)) 
    } 
    Cont(loop(_: Input[AB], chunker, Vector(), Input.Empty)) 
} 

Esto funciona en algunas muestras incluido éste:

val source = Enumerator(
    "bippy".getBytes, 
    "foo\n\rbar\n\r\n\rbaz\nb".getBytes, 
    "azam\ntoto\n\n".getBytes) 
Ok.stream(source 
    &> chunkBy(line) 
    &> Enumeratee.map(l => l ++ ".\n".getBytes) 
).as("text/plain") 

que imprime:

bippyfoo. 
bar. 
baz. 
bazam. 
toto. 
Cuestiones relacionadas