2010-11-08 9 views
5

Esto está utilizando Scala 2.8 Actors. Tengo un trabajo de larga duración que se puede paralelizar. Consiste en alrededor de 650,000 unidades de trabajo. Divido en 2600 diferentes subtareas independientes, y para cada uno de ellos se crea un nuevo actor:¿Cómo prevenir la inanición de los actores en presencia de otros actores veteranos?

actor { 
    val range = (0L to total by limit) 
    val latch = new CountDownLatch(range.length) 
    range.foreach { offset => 
    actor { 
     doExpensiveStuff(offset,limit) 
     latch.countDown 
    } 
    } 
    latch.await 
} 

Esto funciona bastante bien, pero en general tarda de 2 + h para completar. El problema es que, mientras tanto, cualquier otro actor que cree para realizar tareas normales parece estar muerto de hambre por los 2600 actores iniciales que también esperan pacientemente su tiempo para ejecutar un hilo, pero han esperado más que cualquier nuevo actor que venir también.

¿Cómo puedo evitar esta inanición?

pensamientos iniciales:

  • En lugar de utilizar 2600 actores, un actor que secuencialmente arados a través de la gran pila de trabajo. No me gusta esto porque me gustaría que este trabajo termine antes dividiéndolo.
  • En lugar de 2600 actores, usa dos actores, cada uno procesando una mitad diferente del conjunto de trabajo total. Esto podría funcionar mejor, pero ¿y si mi máquina tiene 8 núcleos? Probablemente quiera utilizar más que eso.

ACTUALIZACIÓN

Algunas personas han cuestionado el uso de agentes a todos, sobre todo porque la capacidad de paso de mensajes no estaba siendo utilizada dentro de los trabajadores. Había asumido que el Actor era una abstracción muy liviana alrededor de un ThreadPool en el mismo nivel de rendimiento o cerca del mismo, simplemente codificando manualmente la ejecución basada en ThreadPool. Así que escribí un poco de referencia:

import testing._ 
import java.util.concurrent._ 
import actors.Futures._ 

val count = 100000 
val poolSize = 4 
val numRuns = 100 

val ActorTest = new Benchmark { 
    def run = { 
    (1 to count).map(i => future { 
     i * i 
    }).foreach(_()) 
    } 
} 

val ThreadPoolTest = new Benchmark { 
    def run = { 
    val queue = new LinkedBlockingQueue[Runnable] 
    val pool = new ThreadPoolExecutor(
      poolSize, poolSize, 1, TimeUnit.SECONDS, queue) 
    val latch = new CountDownLatch(count) 
    (1 to count).map(i => pool.execute(new Runnable { 
     override def run = { 
     i * i 
     latch.countDown 
     } 
    })) 
    latch.await 
    } 
} 

List(ActorTest,ThreadPoolTest).map { b => 
    b.runBenchmark(numRuns).sum.toDouble/numRuns 
} 

// List[Double] = List(545.45, 44.35) 

he utilizado la abstracción en el futuro ActorTest para evitar pasar un mensaje a otro actor para señalar el trabajo que se hizo. Me sorprendió descubrir que mi código Actor era 10 veces más lento. Tenga en cuenta que también creé mi ThreadPoolExecutor con un tamaño de grupo inicial con el que se crea el grupo de actores predeterminado.

Mirando hacia atrás, parece que he abusado de la abstracción Actor. Voy a considerar el uso de ThreadPools por separado para estas tareas distintas, costosas y de larga ejecución.

+0

Nada sobre el problema como se describe necesita actores en absoluto. Como solo está dividiendo el trabajo en una cantidad de fragmentos idénticos, puede usar futuros: vea mi respuesta debajo de –

Respuesta

6

No importa cuántos actores que tenga, si usted no está de configurar su programación de forma explícita, todos ellos están respaldados con una sola tenedor /unirse programador (que se ejecuta contra un grupo de subprocesos con una capacidad 4, si no estoy equivocado). De ahí viene el hambre.

  1. Usted debe tratar diferentes programadores para su piscina de actores, para encontrar el que muestra el mejor rendimiento (tratar ResizableThreadPoolScheduler, si se desea maximizar el paralelismo usando tanto hilos como sea posible)
  2. Tienes que un planificador separado para el gran grupo de actores (otros actores en su sistema no deberían usarlo)
  3. Como lo sugirió @DaGGeRRz, puede probar el framework Akka que ofrece despachadores configurables (por ejemplo, despachador de equilibrio de carga, despachador de movimientos, eventos de los buzones de los actores ocupados a los actores ociosos)

De los comentarios a los valores de Actor aplicación:

El sistema de tiempo de ejecución se puede configurar usar un tamaño más grande grupo de subprocesos (por ejemplo , estableciendo la propiedad actors.corePoolSize JVM). El método de la característica Actor scheduler puede ser anulado para devolver un ResizableThreadPoolScheduler, que cambia el tamaño de su grupo de subprocesos para evitar inanición causada por actores que invocar métodos de bloqueo arbitrarias. La actors.enableForkJoin propiedad JVM se puede establecer en false, en cuyo caso se utiliza ResizableThreadPoolScheduler de forma predeterminada para ejecutar actores.

Además: un interesante thread on schedulers en scala-lang.

+2

Vasil tiene razón sobre el uso de subprocesos. Intuí incorrectamente que los actores creados con la forma abreviada de bloque de hilos generaban un hilo por actor, pero como él dice, todos se ejecutan desde el grupo de subprocesos de Scala Actor. Eliminar mi respuesta, ya que Vasil lo cubre mejor. – DaGGeRRz

+0

Gracias Vasil. Decidí ir con un hilo de trabajo (ver editar a OP) para el rendimiento a la luz del hecho de que realmente no necesitaba usar actores en este caso. – Collin

3

No he usado actores con esa sintaxis, pero de manera predeterminada, creo que todos los actores en scala usan un grupo de hilos.

Ver How to designate a thread pool for actors

+0

Sí, quiere evitar que los 2600 actores trabajadores pasen hambre con los demás que realmente necesita para ponerlos en grupos de hilos separados. –

4

Por su ejemplo, parece que en realidad no necesita usar actores en absoluto, ya que no está pasando mensajes a sus unidades de trabajo, ni respondiendo, ni siquiera haciendo bucles.

¿Por qué no crear una carga de Future sy esperar a que terminen? De esta manera, el Tenedor subyacente Ingreso piscina está completamente libre de decidir sobre el nivel adecuado de paralelismo (es decir, # de hilos) para su sistema:

import actors.Futures._ 
def mkFuture(i : Int) = future { 
    doExpensiveStuff(i, limit) 
} 
val fs = (1 to range by limit).map(mkFuture) 
awaitAll(timeout, fs) //wait on the work all finishing 

Tenga en cuenta que sólo se va a beneficio del paralelismo de procesamiento más tareas al mismo tiempo que su sistema tiene núcleos si el trabajo costoso no está vinculado a la CPU (tal vez es IO obligado).

+0

Futuros en scala.actors.Futures son solo abstracciones sobre los actores, por lo que al final se obtiene el mismo problema. El conjunto inicial consume todos los hilos en el grupo y los otros se mueren de hambre. Si tiene tareas con características de comportamiento dramáticamente diferentes (por ejemplo, correr muy largo contra correr muy poco) es una buena idea separarlas.Puedo imaginar un grupo de hilos más inteligente que se dividiría automáticamente, pero no conozco ninguno. –

+1

¿No es un actor algo tan pesado para crear solo para ejecutar algo en el grupo de subprocesos subyacente? –

+0

Gracias por la respuesta. De hecho, había probado futuros como una alternativa antes de publicar, pero encontré el mismo comportamiento de inanición debido a la relación con los actores, como señaló Erik. – Collin

Cuestiones relacionadas