2012-07-26 7 views
5

Aquí está mi escenario:¿El método Akka onReceive se ejecuta al mismo tiempo?

Tengo un actor principal, que recibe mensajes de múltiples actores secundarios. Estos mensajes contienen datos que se agregarán. En esta lógica de agregación, ¿debo ocuparme de los problemas de sincronización si utilizo una estructura de datos compartida para recopilar la agregación?

else if(arg0 instanceof ReducedMsg){ 

          ReducedMsg reduced = (ReducedMsg)arg0; 
     counter.decrementAndGet(); 

     synchronized(finalResult){ 

      finalResult.add((KeyValue<K, V>) reduced.getReduced()); 

      if(counter.get() == 0){ 
            if(checkAndReduce(finalResult)){ 

        finalResult.clear(); 
       } 
       else{ 
        stop(); 
        latch.countDown(); 
       } 

      } 

     } 



    } 

Así como se puede ver que tengo un finalResult, a la que se agregan cada mensaje, y después de una lógica de procesamiento de la colección debe ser limpiado también.

En realidad, lo que intento implementar es un mapreduce de reducción recursivo (asociativo). Entonces, ¿debo mantener el bloque sincronizado que supongo? ¿O es por casualidad que Akka ejecuta el subproceso onReceive a la vez?

Esta lógica produce resultados precisos y predecibles en pequeños conjuntos de datos. Mi problema es cuando mi conjunto de datos de entrada es un poco grande, el código se cuelga. Quiero estar seguro de que esto se debe a los interruptores de contexto para mi bloque de sincronización, de modo que pueda adoptar un diseño diferente.

Respuesta

14

onReceive() es nunca llamado concurrentemente. Esta es la garantía más fundamental que Akka te está dando.

Esto significa que si la variable de counter es un campo en un actor y ninguna otra pieza de código puede acceder a que el campo directamente, se puede utilizar con seguridad normales int/long en lugar de AtomicInteger/AtomicLong. Además, la sincronización en finalResult no es necesaria si se trata de un campo encapsulado y oculto en un actor.

Finalmente, el uso de CountDownLatch es sospechoso. En las aplicaciones Akka, no debe usar primitivas de sincronización. Los actores son esencialmente de un solo hilo y todas las comunicaciones (incluidos los datos de activación y transferencia) deben implementarse mediante el envío de mensajes.

esto es todo lo explicado en la documentación: http://doc.akka.io/docs/akka/2.0.2/general/jmm.html#Actors_and_the_Java_Memory_Model

+0

Gracias Tomasz. ¡Tu primera línea aclara muchas de mis dudas! Registre el uso del bloqueo, tengo que hacer eso para proporcionar una interfaz de cliente que debe esperar hasta que se complete el procesamiento del actor. Mi objetivo es desarrollar un framework java, que internamente usaría Akka/Scala para procesar. –

+0

@sutanudalui: puede llamar al actor * sincrónicamente *, lo que básicamente significa que Akka esperará la respuesta en alguna cola temporal. No es necesario hacer esto manualmente. Consulte los documentos de Akka sobre el patrón de mensaje 'ask' (en lugar de' tell'). –

+0

Bien. Voy a ser un poco más profundo. Tengo un enrutador round robin con N esclavos actores. Lo que pretendo es hacer un procesamiento paralelo y luego acumular el resultado. Entonces, el actor principal, al recibir cada entrada, se dirige a uno de los esclavos. Los esclavos, al procesar el mensaje, lo envían nuevamente al maestro que tiene que agregarse. Esta fase de agregación en la que estaba pensando en problemas de sincronización. Del enlace de doc provisto, veo que Akka no puede garantizar (¡bueno, no puedo adivinar!) Que la memoria compartida, en mi caso, el "resultado final" estará protegido. ¿Estoy entendiendo esto correctamente? –

Cuestiones relacionadas