2012-03-22 11 views
6

He escrito una aplicación Scala (2.9.1-1) que necesita procesar varios millones de filas de una consulta de base de datos. Estoy convirtiendo el ResultSet a un Stream utilizando la técnica mostrada en la respuesta a una de mis previous questions:Consumo de memoria de un Scala Stream paralelo

class Record(...) 

val resultSet = statement.executeQuery(...) 

new Iterator[Record] { 
    def hasNext = resultSet.next() 
    def next = new Record(resultSet.getString(1), resultSet.getInt(2), ...) 
}.toStream.foreach { record => ... } 

y esto ha funcionado muy bien.

Puesto que el cuerpo del cierre foreach es muy intensivo de la CPU, y como un testimonio de la viabilidad de la programación funcional, si añado un .par antes de la foreach, los cierres de conseguir corrido en paralelo con ningún otro esfuerzo, excepto para hacer Asegúrese de que el cuerpo del cierre sea seguro para hilos (está escrito en un estilo funcional sin datos variables excepto la impresión en un registro seguro para subprocesos).

Sin embargo, me preocupa el consumo de memoria. ¿El .par hace que todo el conjunto de resultados se cargue en la RAM, o la operación en paralelo carga solo tantas filas como hilos activos? He asignado 4G a la JVM (64 bits con -Xmx4g) pero en el futuro lo ejecutaré en incluso más filas y me preocupará que eventualmente me salga la memoria.

¿Existe un mejor patrón para realizar este tipo de procesamiento paralelo de manera funcional? He estado mostrando esta aplicación a mis compañeros de trabajo como un ejemplo del valor de la programación funcional y las máquinas multi-core.

+0

Sólo courious. ¿Qué DBMS estás usando y qué API de Scala DB para consultar? – santiagobasulto

+0

Estoy accediendo a una base de datos de Microsoft SQL Server 2012 que se ejecuta en Windows Server 2008 R2 utilizando el controlador JDBC de Microsoft (http://msdn.microsoft.com/en-us/sqlserver/aa937724). – Ralph

Respuesta

4

Si nos fijamos en la scaladoc of Stream, se dará cuenta de que la clase definición de par es el rasgo Parallelizable ... y, si nos fijamos en la source code of this trait, se dará cuenta de que se necesita cada elemento de la colección original y poner ellos en un combinador, por lo tanto, que se cargarán cada fila en una ParSeq:

def par: ParRepr = { 
    val cb = parCombiner 
    for (x <- seq) cb += x 
    cb.result 
    } 

    /** The default `par` implementation uses the combiner provided by this method 
    * to create a new parallel collection. 
    * 
    * @return a combiner for the parallel collection of type `ParRepr` 
    */ 
    protected[this] def parCombiner: Combiner[A, ParRepr] 

una posible solución es poner en paralelo de forma explícita su cómputo, gracias a los actores, por ejemplo. Puede echar un vistazo a this example de la documentación de akka, por ejemplo, que podría ser útil en su contexto.

+0

Tenía miedo de eso. Pensé en encender un conjunto de hilos y luego hacer que cada uno tire filas del conjunto de resultados (sincronizados), pero eso no suena como una solución muy funcional. – Ralph

+0

Pida a un actor que ajuste la consulta y genere un enrutador con un resizer que indique extraer en trozos. –

-1

La nueva biblioteca akka stream es la solución que está buscando:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Source, Sink} 

def iterFromQuery() : Iterator[Record] = { 
    val resultSet = statement.executeQuery(...) 
    new Iterator[Record] { 
    def hasNext = resultSet.next() 
    def next = new Record(...) 
    } 
} 

def cpuIntensiveFunction(record : Record) = { 
... 
} 

implicit val actorSystem = ActorSystem() 
implicit val materializer = ActorMaterializer() 
implicit val execContext = actorSystem.dispatcher 

val poolSize = 10 //number of Records in memory at once 

val stream = 
    Source(iterFromQuery).runWith(Sink.foreachParallel(poolSize)(cpuIntensiveFunction)) 

stream onComplete {_ => actorSystem.shutdown()} 
Cuestiones relacionadas