2011-09-10 34 views
5

Estaba jugando con .par y me pregunto si el siguiente cálculo se puede paralelizar para obtener ganancias de rendimiento o si hay otras formas de calcular el resultado más rápido. No creo que el resultado final dependa del orden de la agrupación, así que espero que haya ganancias adicionales posibles.Cómo paralelizar groupBy

object Test { 
    val data = (1 to 500000) map { i => (i % 100) -> (i % 10000) } 

    def mutableIndex = { 
    val map = collection.mutable.Map[Int, Set[Int]]().withDefaultValue(
     Set[Int]()) 
    for ((k, v) <- data) { map(k) = map(k) + v } 
    map 
    } 

    def immutableIndex = data.groupBy(_._1).map{ case (k, seq) => 
    k -> seq.map(_._2).toSet 
    } 

    def immutableParIndex = data.par.groupBy(_._1).map{ case (k, seq) => 
    k -> seq.map(_._2).toSet 
    } 

    def main(args: Array[String]) { 
    def bench(id: String)(block: => Unit) { 
     val times = (new testing.Benchmark { def run() = block }).runBenchmark(10) 
     println(id + " " + times + " sum: " + times.sum) 
    } 
    println("avail procs " + Runtime.getRuntime.availableProcessors) 
    bench("mutable"){ mutableIndex } 
    bench("immutable"){ immutableIndex } 
    bench("immutable par"){ immutableParIndex } 
    } 

} 

Correr Imprime este - 2.9.1 usando:

$ scalac -d classes -optimize A.scala 
$ scala -cp classes Test 
avail procs 4 
mutable List(718, 343, 296, 297, 312, 312, 312, 312, 312, 312) sum: 3526 
immutable List(312, 266, 266, 265, 265, 265, 265, 265, 249, 265) sum: 2683 
immutable par List(546, 234, 234, 202, 187, 172, 188, 172, 187, 171) sum: 2293 

Algunas notas:

  • aunque el resultado anterior es bastante agradable, la versión paralela también es mucho más inconsistente dependiendo en las constantes que uso en data y la cantidad de iteración que configuro en bench (a veces es menos eficiente que la secuencia). Me pregunto si se espera de colecciones paralelas.
  • mutable se hace más rápido que el conjunto se hace más pequeño (al disminuir el último módulo de datos)
  • si mi referencia es defectuoso, que me haga saber cómo solucionarlo (por ejemplo, utilizo los mismos datos para todas las iteraciones, no estoy seguro si que sesga los resultados)

Editar: aquí hay una versión basada en hashMap concurrente y siguiendo el modelo del código de la biblioteca para groupBy:

def syncIndex = { 
    import collection.mutable.Builder 
    import java.util.concurrent.ConcurrentHashMap 
    import collection.JavaConverters._ 
    val m = new ConcurrentHashMap[Int, Builder[Int, Set[Int]]]().asScala 
    for ((k, v) <- data.par) { 
    val bldr = Set.newBuilder[Int] 
    m.putIfAbsent(k, bldr) match { 
     case Some(bldr) => bldr.synchronized(bldr += v) 
     case None => bldr.synchronized(bldr += v) 
    } 
    } 
    val b = Map.newBuilder[Int, Set[Int]] 
    for ((k, v) <- m) 
    b += ((k, v.result)) 
    b.result 
} 

se seee ms para dar una buena velocidad en 2 núcleos pero no en 4.

Respuesta

2

No es realmente una respuesta a su pregunta, pero encontré que .par da una aceleración especialmente en el cliente de Hotspot (32-bit?), y no tanto en el servidor de Hotspot. Lo ejecuté en el REPL y el índice de referencia se acelera en las siguientes ejecuciones, dado que ya está calentado.

Observé el uso del procesador en el Administrador de tareas y para cada uno, y va de alrededor del 54% en las tareas no paralelizadas al 75% en paralelo.

Java 7 también proporciona un aumento de velocidad bastante considerable.

Bienvenido a Scala versión 2.9.0.1 (Java HotSpot (TM) Client VM, Java 1.6.0_22).

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(1303, 1086, 1058, 1132, 1071, 1068, 1035, 1037, 1036, 1032) sum: 10858 
immutable List(874, 872, 869, 856, 858, 857, 855, 855, 857, 849) sum: 8602 
immutable par List(688, 502, 482, 479, 480, 465, 473, 473, 471, 472) sum: 4985 

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(1015, 1025, 1090, 1026, 1011, 1021, 1014, 1017, 1011, 1015) sum: 10245 
immutable List(863, 868, 867, 865, 864, 883, 865, 863, 864, 864) sum: 8666 
immutable par List(466, 468, 463, 466, 466, 469, 470, 467, 478, 467) sum: 4680 

Bienvenido a Scala versión 2.9.0.1 (Java HotSpot (TM) de 64 bits del servidor VM, Java 1.6.0_22).

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(841, 360, 348, 338, 337, 338, 338, 342, 336, 336) sum: 3914 
immutable List(320, 303, 302, 300, 304, 302, 305, 299, 305, 299) sum: 3039 
immutable par List(521, 284, 244, 244, 232, 267, 209, 219, 231, 203) sum: 2654 

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(370, 393, 351, 342, 336, 343, 342, 340, 334, 340) sum: 3491 
immutable List(301, 301, 302, 305, 300, 299, 303, 305, 304, 301) sum: 3021 
immutable par List(207, 240, 201, 194, 204, 194, 197, 211, 207, 208) sum: 2063 

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(334, 336, 338, 339, 340, 338, 341, 334, 336, 340) sum: 3376 
immutable List(300, 303, 297, 301, 298, 305, 302, 304, 296, 296) sum: 3002 
immutable par List(194, 200, 190, 201, 192, 191, 195, 196, 202, 189) sum: 1950 

Bienvenido a Scala versión 2.9.0.1 (Java HotSpot (TM) de 64 bits del servidor VM, Java 1.7.0).

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(763, 258, 227, 235, 238, 279, 245, 227, 227, 243) sum: 2942 
immutable List(274, 233, 228, 235, 238, 247, 243, 229, 233, 245) sum: 2405 
immutable par List(635, 303, 261, 258, 217, 291, 204, 248, 219, 184) sum: 2820 

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(229, 229, 229, 230, 234, 226, 227, 227, 227, 232) sum: 2290 
immutable List(228, 247, 231, 234, 210, 210, 209, 211, 210, 210) sum: 2200 
immutable par List(173, 209, 160, 157, 158, 177, 179, 164, 163, 159) sum: 1699 

scala> Test.main(Array[String]()) 
avail procs 2 
mutable List(222, 218, 216, 214, 216, 215, 215, 219, 219, 218) sum: 2172 
immutable List(211, 210, 211, 211, 212, 215, 215, 210, 211, 210) sum: 2116 
immutable par List(161, 158, 168, 158, 156, 161, 150, 156, 163, 175) sum: 1606 
+0

interesante, voy a tener que tratar con los tiempos 1.7.0. He agregado una versión usando un hashmap concurrente en la pregunta. Fue más rápido en 2 núcleos pero más lento en 4. Tengo curiosidad por ver qué hará en 1.7.0. También noté que a veces en REPL obtuve resultados más rápidos que cuando ejecutaba el código compilado. – huynhjl

+0

@huynhjl Creo que la máquina virtual que ejecuta su REPL se calentará hasta cierto punto, independientemente de lo que ejecute en él, y por lo tanto será más rápido que comenzar una nueva máquina virtual para hacer el benchmark desde el frío. Puede mostrar esto ejecutando un punto de referencia en el REPL, luego creando un nuevo objeto Test y ejecutándolo: para mí los tiempos fueron más rápidos que para la primera instancia. Además, debe experimentar con aumentar la memoria disponible de la VM. El primer punto de referencia que hice con un REPL "antiguo" fue como 10 veces más lento, antes de fallar con un error de falta de memoria. –

+0

Cambiaremos la implementación 'groupBy' para usar las nuevas pruebas concurrentes de hash, probablemente en la próxima versión. Eso debería aumentar la escalabilidad. – axel22

0

El consejo general es utilizar la pinza para microbecnhmarking: https://github.com/sirthias/scala-benchmarking-template

Además, tener en cuenta que a veces par realiza la copia de la estructura inicial (al menos en 2.9.1, véase https://issues.scala-lang.org/browse/SI-4984), por ejemplo

`

scala> val data = (1L to 50000000) par (100) 
java.lang.OutOfMemoryError: Java heap space 
     at scala.math.Integral$class.mkNumericOps(Integral.scala:25) 
     at scala.math.Numeric$LongIsIntegral$.mkNumericOps(Numeric.scala:115) 
     at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:75) 
     at scala.collection.Parallelizable$class.par(Parallelizable.scala:41) 
     at scala.collection.immutable.NumericRange.par(NumericRange.scala:42) 

`

+0

El enlace está abajo para mí. – huynhjl

+0

El método 'par' no debe copiar datos para el mapa inmutable predeterminado - el' inmutable.HashMap' - que es el tipo de 'datos'. – axel22