2011-11-22 7 views
6

Tengo Iterators muy grandes que quiero dividir en pedazos. Tengo un predicado que mira un elemento y devuelve verdadero si es el comienzo de una nueva pieza. Necesito que las piezas sean iteradores, porque incluso las piezas no caben en la memoria. Hay tantas piezas que desconfiaría de una solución recursiva que explote tu pila. La situación es similar a this question, pero necesito Iteradores en lugar de Listas, y los "centinelas" (elementos para los que el predicado es verdadero) ocurren (y deberían incluirse) al comienzo de una pieza. Los iteradores resultantes solo se usarán en orden, aunque algunos no se usarán en absoluto, y solo deberán usar memoria O (1). Me imagino que esto significa que todos deben compartir el mismo iterador subyacente. El rendimiento es importante.Scala: Agrupe un Iterable en un Iterable de Iterables por un predicado

Si tuviera que tomar una puñalada en una firma de función, sería la siguiente:

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] = ... 

Me hubiera gustado utilizar takeWhile, pero pierde el último elemento. Investigué span, pero amortigua los resultados. Mi mejor idea actual involucra BufferedIterator, pero tal vez hay una mejor manera.

Usted sabrá que ha hecho bien, porque algo así no se bloquea la JVM:

groupby((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue/2) == 0).foreach(group => println(group.sum)) 
groupby((1 to Int.MaxValue).iterator)(_ % 10 == 0).foreach(group => println(group.sum)) 
+0

Ver http://stackoverflow.com/questions/5410846/how-do-i-apply-the-pimp-my-library-pattern-to-scala-collections/5411133#5411133 – huynhjl

Respuesta

5

Aquí está mi solución usando BufferedIterator. No le permite saltarse iteradores correctamente, pero es bastante simple y funcional. Los primeros elementos entran en un grupo incluso si es !startsGroup(first).

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] = 
    new Iterator[Iterator[T]] { 
    val base = iter.buffered 
    override def hasNext = base.hasNext 
    override def next() = Iterator(base.next()) ++ new Iterator[T] { 
     override def hasNext = base.hasNext && !startsGroup(base.head) 
     override def next() = if (hasNext) base.next() else Iterator.empty.next() 
    } 
    } 

Actualización: El mantenimiento de un pequeño estado le permite saltar y evitar que la gente iteradores de jugar con los anteriores:

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] = 
new Iterator[Iterator[T]] { 
    val base = iter.buffered 
    var prev: Iterator[T] = Iterator.empty 
    override def hasNext = base.hasNext 
    override def next() = { 
    while (prev.hasNext) prev.next()  // Exhaust previous iterator; take* and drop* do NOT always work!! (Jira SI-5002?) 
    prev = Iterator(base.next()) ++ new Iterator[T] { 
     var hasMore = true 
     override def hasNext = { hasMore = hasMore && base.hasNext && !startsGroup(base.head) ; hasMore } 
     override def next() = if (hasNext) base.next() else Iterator.empty.next() 
    } 
    prev 
    } 
} 
5

tiene un problema inherente. Iterable implica que puede obtener múltiples iteradores. Iterator implica que solo puede pasar una vez. Eso significa que su Iterable[Iterable[T]] debe poder producir Iterator[Iterable[T]] s. Pero cuando esto devuelve un elemento - un Iterable[T] - y eso pide múltiples iteradores, el iterador individual subyacente no puede cumplir sin almacenar en caché los resultados de la lista (demasiado grande) o llamando al original iterable y pasando por absolutamente todo de nuevo (muy ineficiente).

Por lo tanto, mientras que podría hacer esto, creo que debe concebir su problema de una manera diferente.

En cambio, si pudiera comenzar con un Seq, podría tomar subconjuntos como rangos.

Si ya sabe cómo desea utilizar su iterable, se podría escribir un método

def process[T](source: Iterable[T])(starts: T => Boolean)(handlers: T => Unit *) 

que se incrementa a través del conjunto de manipuladores cada vez starts incendios fuera un "verdadero". Si hay alguna manera de que pueda hacer su procesamiento de una vez, algo como esto es el camino a seguir. (Sin embargo, sus manejadores tendrán que guardar el estado a través de variables o estructuras de datos mutables)

Si puede permitir que la iteración en la lista externa rompa la lista interna, podría tener un Iterable[Iterator[T]] con la restricción adicional que una vez que itere a un subsector posterior, todos los sub iteradores anteriores no son válidos.


Aquí hay una solución del último tipo (Iterator[T]-Iterator[Iterator[T]]; uno puede envolver esto para hacer las capas externas Iterable en su lugar).

class GroupedBy[T](source: Iterator[T])(starts: T => Boolean) 
extends Iterator[Iterator[T]] { 
    private val underlying = source 
    private var saved: T = _ 
    private var cached = false 
    private var starting = false 
    private def cacheNext() { 
    saved = underlying.next 
    starting = starts(saved) 
    cached = true 
    } 
    private def oops() { throw new java.util.NoSuchElementException("empty iterator") } 
    // Comment the next line if you do NOT want the first element to always start a group 
    if (underlying.hasNext) { cacheNext(); starting = true } 
    def hasNext = { 
    while (!(cached && starting) && underlying.hasNext) cacheNext() 
    cached && starting 
    } 
    def next = { 
    if (!(cached && starting) && !hasNext) oops() 
    starting = false 
    new Iterator[T] { 
     var presumablyMore = true 
     def hasNext = { 
     if (!cached && !starting && underlying.hasNext && presumablyMore) cacheNext() 
     presumablyMore = cached && !starting 
     presumablyMore 
     } 
     def next = { 
     if (presumablyMore && (cached || hasNext)) { 
      cached = false 
      saved 
     } 
     else oops() 
     } 
    } 
    } 
} 
+1

'Iterator [ Iterator [T]] 'estaría bien; mi iterador subyacente solo puede y debe permitir solo una pasada de todos modos. Quiero omitir los iteradores para invalidar los sub iteradores anteriores. No sé la longitud antes de tiempo, por lo que un 'Seq' no es posible. Sé cómo quiero usar mi iterable, pero pensé que tal función sería útil en general. –

1

Si está buscando restricciones de memoria, entonces lo siguiente funcionará. Solo puede usarlo si su objeto iterable subyacente admite vistas.Esta implementación iterará sobre Iterable y luego generará IterableViews que luego podrá iterarse. A esta implementación no le importa si el primer elemento prueba como un grupo de inicio, ya que será independiente.

def groupby[T](iter: Iterable[T])(startsGroup: T => Boolean): Iterable[Iterable[T]] = new Iterable[Iterable[T]] { 
    def iterator = new Iterator[Iterable[T]] { 
    val i = iter.iterator 
    var index = 0 
    var nextView: IterableView[T, Iterable[T]] = getNextView() 
    private def getNextView() = { 
     val start = index 
     var hitStartGroup = false 
     while (i.hasNext && ! hitStartGroup) { 
     val next = i.next() 
     index += 1 
     hitStartGroup = (index > 1 && startsGroup(next)) 
     } 
     if (hitStartGroup) { 
     if (start == 0) iter.view(start, index - 1) 
     else iter.view(start - 1, index - 1) 
     } else { // hit end 
     if (start == index) null 
     else if (start == 0) iter.view(start, index) 
     else iter.view(start - 1, index) 
     } 
    } 
    def hasNext = nextView != null 
    def next() = { 
     if (nextView != null) { 
     val next = nextView 
     nextView = getNextView() 
     next 
     } else null 
    } 
    } 
} 
+0

Se corrigió el código de respuesta. Faltaba el caso "if (start == index) null" en getNextView –

1

Usted puede mantener la memoria baja huella mediante el uso de corrientes. Use result.toIterator, si es un iterador nuevamente.

Con transmisiones, no hay estado mutable, solo un solo condicional y es casi tan conciso como la solución de Jay Hacker.

def batchBy[A,B](iter: Iterator[A])(f: A => B): Stream[(B, Iterator[A])] = { 
    val base = iter.buffered 
    val empty = Stream.empty[(B, Iterator[A])] 

    def getBatch(key: B) = { 
     Iterator(base.next()) ++ new Iterator[A] { 
     def hasNext: Boolean = base.hasNext && (f(base.head) == key) 
     def next(): A = base.next() 
     } 
    } 

    def next(skipList: Option[Iterator[A]] = None): Stream[(B, Iterator[A])] = { 
     skipList.foreach{_.foreach{_=>}} 

     if (base.isEmpty) empty 
     else { 
     val key = f(base.head) 
     val batch = getBatch(key) 

     Stream.cons((key, batch), next(Some(batch))) 
     } 
    } 

    next() 
    } 

me corrieron las pruebas:

scala> batchBy((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue/2) == 0) 
     .foreach{case(_,group) => println(group.sum)} 
-1610612735 
1073741823 
-536870909 
2147483646 
2147483647 

Las segundas pruebas de impresión demasiado para pegar a desbordamiento de pila.

0
import scala.collection.mutable.ArrayBuffer 

object GroupingIterator { 

    /** 
    * Create a new GroupingIterator with a grouping predicate. 
    * 
    * @param it The original iterator 
    * @param p Predicate controlling the grouping 
    * @tparam A Type of elements iterated 
    * @return A new GroupingIterator 
    */ 
    def apply[A](it: Iterator[A])(p: (A, IndexedSeq[A]) => Boolean): GroupingIterator[A] = 
    new GroupingIterator(it)(p) 
} 

/** 
* Group elements in sequences of contiguous elements that satisfy a predicate. The predicate 
* tests each single potential next element of the group with the help of the elements grouped so far. 
* If it returns true, the potential next element is added to the group, otherwise 
* a new group is started with the potential next element as first element 
* 
* @param self The original iterator 
* @param p Predicate controlling the grouping 
* @tparam A Type of elements iterated 
*/ 
class GroupingIterator[+A](self: Iterator[A])(p: (A, IndexedSeq[A]) => Boolean) extends Iterator[IndexedSeq[A]] { 

    private[this] val source = self.buffered 
    private[this] val buffer: ArrayBuffer[A] = ArrayBuffer() 

    def hasNext: Boolean = source.hasNext 

    def next(): IndexedSeq[A] = { 
    if (hasNext) 
     nextGroup() 
    else 
     Iterator.empty.next() 
    } 

    private[this] def nextGroup(): IndexedSeq[A] = { 
    assert(source.hasNext) 

    buffer.clear() 
    buffer += source.next 

    while (source.hasNext && p(source.head, buffer)) { 
     buffer += source.next 
    } 

    buffer.toIndexedSeq 
    } 
} 
Cuestiones relacionadas