2010-11-11 14 views
7

Tratando de entender cómo pensar en términos de actores en lugar de hilos. Estoy un poco perplejo en el siguiente caso de uso:Transición del modelo de hilo a los actores

Considere un sistema que tiene un proceso productor que crea trabajo (por ejemplo, leyendo datos de un archivo) y una cantidad de procesos de trabajo que consumen el trabajo (por ejemplo, al analizar los datos y escribirlos en una base de datos). Las velocidades a las que se produce y consume el trabajo pueden variar, y el sistema debe seguir siendo robusto. Por ejemplo, si los trabajadores no pueden mantenerse al día, el productor debe detectar esto y, finalmente, reducir la velocidad o esperar.

Esto es bastante fácil de implementar con hilos:

val producer:Iterator[Work] = createProducer() 
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE) 
val workers = (0 until NUM_WORKERS) map { i => 
    new Thread() { 
    override def run() = { 
     while (true) { 
     try { 
      // take next unit of work, waiting if necessary 
      val work = queue.take() 
      process(work) 
     } 
     catch { 
      case e:InterruptedException => return 
     } 
     } 
    } 
    } 
} 

// start the workers 
workers.foreach(_.start()) 

while (producer.hasNext) { 
    val work = producer.next() 
    // add new unit of work, waiting if necessary 
    queue.put(work) 
} 

while (!queue.isEmpty) { 
    // wait until queue is drained 
    queue.wait() 
} 

// stop the workers 
workers.foreach(_.interrupt()) 

No hay nada realmente mal con este modelo, y yo he utilizado con éxito antes. Este ejemplo es probablemente excesivamente detallado, ya que usar un Ejecutor o un Servidor de Completación encajaría bien con esta tarea. Pero me gusta la abstracción del actor, y creo que es más fácil razonar en muchos casos. ¿Hay alguna manera de reescribir este ejemplo utilizando actores, especialmente asegurándose de que no haya desbordamientos de búfer (por ejemplo, buzones de correo completos, mensajes eliminados, etc.)?

Respuesta

3

Dado que los actores procesan mensajes "sin conexión" (es decir, el consumo de mensajes no está relacionado con su recepción), es difícil ver cómo podría tener un análogo exacto del "productor espera a que los consumidores lo alcancen".

La única cosa que puedo pensar es que los consumidores solicitan el trabajo del actor productor (que utiliza reply):

case object MoreWorkPlease 
class Consumer(prod : Producer) extends Actor { 
    def act = { 
    prod ! MoreWorkPlease 
    loop { 
     react { 
     case Work(payload) => doStuff(payload); reply(MoreWorkPlease) 
     } 
    } 
    } 
} 

class Producer extends Actor { 
    def act = loop { 
    react { 
     case MoreWorkPlease => reply(Work(getNextItem)) 
    } 
    } 
} 

esto no es perfecto, por supuesto, porque el productor no lo hace "leer reenviar "y solo obtiene trabajo cuando un consumidor está listo para ello. El uso sería algo así como:

val prod = new Producer 
(1 to NUM_ACTORS).map(new Consumer(prod)).foreach(_.start()) 
prod.start() 
+0

Hrm, esa es una solución que había pensado. Probablemente sea suficiente, pero mi preocupación con esto es que si los trabajadores superan al productor, entonces la falta de un buffer de trabajo resulta en un rendimiento degradado. – toluju

+0

@toluju - Comience por hacer que todos los consumidores pidan trabajo, y haga que el productor _no_ reaccione a estos mensajes, pero recíbalos y póngalos en una cola si todavía no hay más trabajo por hacer. (Entonces, una vez que hay trabajo, puede dividirlo en elementos en la cola). –

Cuestiones relacionadas