2012-08-30 12 views
5

Necesito publicar mensajes de diferentes tipos para la secuencia de eventos, y esos mensajes deben tener prioridades diferentes, por ejemplo, si se han publicado 10 mensajes de tipo A, y un mensaje de tipo B se publica después de todo, y la prioridad de B es más alta que la prioridad de A - el mensaje B debe recogerse por el próximo actor incluso si hay 10 mensajes de tipo A en cola.Akka :: Uso de mensajes con diferentes prioridades sobre secuencia de eventos en ActorSystem

He leído acerca de los mensajes priorizados here y ha creado mi sencilla aplicación de ese buzón:

class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(

    PriorityGenerator { 
     case ServerPermanentlyDead => println("Priority:0"); 0 
     case ServerDead => println("Priority:1"); 1 
     case _ => println("Default priority"); 10 
    } 

) 

luego de haberla configurado en application.conf

akka { 

    actor { 

     prio-dispatcher { 
      type = "Dispatcher" 
      mailbox-type = "mailbox.PrioritizedMailbox" 
     } 

    } 

} 

y por cable en mi actor:

private val myActor = actors.actorOf(
    Props[MyEventHandler[T]]. 
    withRouter(RoundRobinRouter(HIVE)). 
    withDispatcher("akka.actor.prio-dispatcher"). 
    withCreator(
    new Creator[Actor] { 
     def create() = new MyEventHandler(storage) 
    }), name = "eventHandler") 

Estoy usando ActorSystem.eventStream.publish in para enviar mensajes, y mi actor está suscrito (puedo ver en los registros que se procesan los mensajes, pero en orden FIFO).

Sin embargo, parece que no es suficiente, porque en los registros/consola nunca he visto los mensajes como "Prioridad predeterminada". ¿Me estoy perdiendo de algo? ¿El enfoque descrito funciona con secuencias de eventos o simplemente con invocaciones directas de enviando un mensaje al actor? ¿Y cómo obtengo mensajes priorizados con eventStream?

Respuesta

10

Su problema es que sus actores son increíblemente rápidos para que los mensajes se procesen antes de que tengan tiempo para hacer cola, por lo que no puede haber ninguna priorización hecha por el buzón de correo. El siguiente ejemplo demuestra el punto:

trait Foo 
    case object X extends Foo 
    case object Y extends Foo 
    case object Z extends Foo 

    class PrioritizedMailbox(settings: ActorSystem.Settings, cfg: Config) 
extends UnboundedPriorityMailbox( 
    PriorityGenerator { 
     case X ⇒ 0 
     case Y ⇒ 1 
     case Z ⇒ 2 
     case _ ⇒ 10 
    }) 

val s = ActorSystem("prio", com.typesafe.config.ConfigFactory.parseString( 
     """ prio-dispatcher { 
     type = "Dispatcher" 
      mailbox-type = "%s" 
     }""".format(classOf[PrioritizedMailbox].getName))) 
     val latch = new java.util.concurrent.CountDownLatch(1) 
     val a = s.actorOf(Props(new akka.actor.Actor { 
     latch.await // Just wait here so that the messages are queued up 
inside the mailbox 
     def receive = { 
      case any ⇒ /*println("Processing: " + any);*/ sender ! any 
     } 
     }).withDispatcher("prio-dispatcher")) 
     implicit val sender = testActor 
     a ! "pig" 
     a ! Y 
     a ! Z 
     a ! Y 
     a ! X 
     a ! Z 
     a ! X 
     a ! "dog" 

     latch.countDown() 

     Seq(X, X, Y, Y, Z, Z, "pig", "dog") foreach { x => expectMsg(x) } 
     s.shutdown() 

Esta prueba pasa con gran éxito

Cuestiones relacionadas