Estaba jugando con .par
y me pregunto si el siguiente cálculo se puede paralelizar para obtener ganancias de rendimiento o si hay otras formas de calcular el resultado más rápido. No creo que el resultado final dependa del orden de la agrupación, así que espero que haya ganancias adicionales posibles.Cómo paralelizar groupBy
object Test {
val data = (1 to 500000) map { i => (i % 100) -> (i % 10000) }
def mutableIndex = {
val map = collection.mutable.Map[Int, Set[Int]]().withDefaultValue(
Set[Int]())
for ((k, v) <- data) { map(k) = map(k) + v }
map
}
def immutableIndex = data.groupBy(_._1).map{ case (k, seq) =>
k -> seq.map(_._2).toSet
}
def immutableParIndex = data.par.groupBy(_._1).map{ case (k, seq) =>
k -> seq.map(_._2).toSet
}
def main(args: Array[String]) {
def bench(id: String)(block: => Unit) {
val times = (new testing.Benchmark { def run() = block }).runBenchmark(10)
println(id + " " + times + " sum: " + times.sum)
}
println("avail procs " + Runtime.getRuntime.availableProcessors)
bench("mutable"){ mutableIndex }
bench("immutable"){ immutableIndex }
bench("immutable par"){ immutableParIndex }
}
}
Correr Imprime este - 2.9.1 usando:
$ scalac -d classes -optimize A.scala
$ scala -cp classes Test
avail procs 4
mutable List(718, 343, 296, 297, 312, 312, 312, 312, 312, 312) sum: 3526
immutable List(312, 266, 266, 265, 265, 265, 265, 265, 249, 265) sum: 2683
immutable par List(546, 234, 234, 202, 187, 172, 188, 172, 187, 171) sum: 2293
Algunas notas:
- aunque el resultado anterior es bastante agradable, la versión paralela también es mucho más inconsistente dependiendo en las constantes que uso en
data
y la cantidad de iteración que configuro enbench
(a veces es menos eficiente que la secuencia). Me pregunto si se espera de colecciones paralelas. - mutable se hace más rápido que el conjunto se hace más pequeño (al disminuir el último módulo de datos)
- si mi referencia es defectuoso, que me haga saber cómo solucionarlo (por ejemplo, utilizo los mismos datos para todas las iteraciones, no estoy seguro si que sesga los resultados)
Editar: aquí hay una versión basada en hashMap concurrente y siguiendo el modelo del código de la biblioteca para groupBy
:
def syncIndex = {
import collection.mutable.Builder
import java.util.concurrent.ConcurrentHashMap
import collection.JavaConverters._
val m = new ConcurrentHashMap[Int, Builder[Int, Set[Int]]]().asScala
for ((k, v) <- data.par) {
val bldr = Set.newBuilder[Int]
m.putIfAbsent(k, bldr) match {
case Some(bldr) => bldr.synchronized(bldr += v)
case None => bldr.synchronized(bldr += v)
}
}
val b = Map.newBuilder[Int, Set[Int]]
for ((k, v) <- m)
b += ((k, v.result))
b.result
}
se seee ms para dar una buena velocidad en 2 núcleos pero no en 4.
interesante, voy a tener que tratar con los tiempos 1.7.0. He agregado una versión usando un hashmap concurrente en la pregunta. Fue más rápido en 2 núcleos pero más lento en 4. Tengo curiosidad por ver qué hará en 1.7.0. También noté que a veces en REPL obtuve resultados más rápidos que cuando ejecutaba el código compilado. – huynhjl
@huynhjl Creo que la máquina virtual que ejecuta su REPL se calentará hasta cierto punto, independientemente de lo que ejecute en él, y por lo tanto será más rápido que comenzar una nueva máquina virtual para hacer el benchmark desde el frío. Puede mostrar esto ejecutando un punto de referencia en el REPL, luego creando un nuevo objeto Test y ejecutándolo: para mí los tiempos fueron más rápidos que para la primera instancia. Además, debe experimentar con aumentar la memoria disponible de la VM. El primer punto de referencia que hice con un REPL "antiguo" fue como 10 veces más lento, antes de fallar con un error de falta de memoria. –
Cambiaremos la implementación 'groupBy' para usar las nuevas pruebas concurrentes de hash, probablemente en la próxima versión. Eso debería aumentar la escalabilidad. – axel22