2011-02-02 15 views
5

Supongamos que tengo un ExecutorService (que puede ser un grupo de subprocesos, por lo que hay concurrencia) que ejecuta una tarea varias veces, ya sea periódicamente o en respuesta a alguna otra condición. La tarea que se ejecutará es el siguiente:java: ejecutores + tareas + bloqueos

  • si esta tarea ya está en marcha, no hacer nada (y dejar que el acabado de la tarea a ejecutar previamente).
  • si esta tarea aún no está en curso, ejecute Algorithm X, que puede llevar mucho tiempo.

Estoy tratando de pensar en una forma de implementar esto. Debe ser algo así como:

Runnable task = new Runnable() { 
    final SomeObj inProgress = new SomeObj(); 
    @Override public void run() { 
     if (inProgress.acquire()) 
     { 
      try 
      { 
      algorithmX(); 
      } 
      finally 
      { 
      inProgress.release(); 
      } 
     } 
    } 
} 

// re-use this task object whenever scheduling the task with the executor 

donde SomeObj es o bien un ReentrantLock (= tryLock() adquirir y liberar = unlock()) o un AtomicBoolean o algo, pero no estoy seguro de qué. ¿Necesito un ReentrantLock aquí? (Tal vez quiero un bloqueo no reentrante en caso de que algorithmX() haga que esta tarea se ejecute de forma recursiva). ¿O sería suficiente un AtomicBoolean?


editar: para un bloqueo sin reentrada, ¿es esto apropiado?

Runnable task = new Runnable() { 
    boolean inProgress = false; 
    final private Object lock = new Object(); 
    /** try to acquire lock: set inProgress to true, 
    * return whether it was previously false 
    */ 
    private boolean acquire() { 
     synchronized(this.lock) 
     { 
     boolean result = !this.inProgress; 
     this.inProgress = true; 
     return result; 
     } 
    } 
    /** release lock */ 
    private void release() { 
     synchronized(this.lock) 
     { 
     this.inProgress = false; 
     } 
    } 
    @Override public void run() { 
     if (acquire()) 
     { 
      // nobody else is running! let's do algorithmX() 
      try 
      { 
      algorithmX(); 
      } 
      finally 
      { 
      release(); 
      } 
     } 
     /* otherwise, we are already in the process of 
     * running algorithmX(), in this thread or in another, 
     * so don't do anything, just return control to the caller. 
     */ 
    } 
} 
+0

Una recomendación: en lugar de presentar un código complejo y pedirle a alguien que lo valide, pruébelo usted mismo y luego, si encuentra un comportamiento que no comprende, formule una pregunta específica. –

+0

gracias ... No es que busque validar el código, es que tengo una situación en la que necesito ejecutar una tarea en serie en un sistema multiproceso, y no estoy seguro de cómo implementarlo correctamente de cara a problemas de concurrencia. El código que presenté es simplemente un intento de ejemplo. Si, en su lugar, debería utilizar una clase existente, me gustaría saberlo, porque estoy buscando qué pregunta exacta formular. –

+1

Mi experiencia con los problemas de simultaneidad es que, si bien puede probar el código para ver si hay errores obvios, no puede simplemente intentar verificar si tiene éxito, porque puede haber condiciones extrañas e improbables que mostrarán que su programa es incorrecto. , y puede que no sea posible reproducir esas condiciones bajo demanda. –

Respuesta

2

La implementación de bloqueo que sugiere es débil en el sentido de que sería bastante fácil para alguien usarla incorrectamente.

A continuación se muestra una implementación mucho más eficiente con las mismas debilidades uso inadecuado de la implementación:

AtomicBoolean inProgress = new AtomicBoolean(false) 
    /* Returns true if we acquired the lock */ 
    private boolean acquire() { 
     return inProgress.compareAndSet(false, true); 
    } 
    /** Always release lock without determining if we in fact hold it */ 
    private void release() { 
     inProgress.set(false); 
    } 
+0

"Cuando su método de adquisición devuelve verdadero significa que otra persona ya tiene el bloqueo y de hecho no pudo obtener el bloqueo". no, leí el código nuevamente, invertí el antiguo valor existente. Pero tienes una implementación mucho más simple, ¡gracias! –

+0

@Jason S, sí, lo siento, no vi eso. Aún así, operaciones CAS> sincronizando –

+0

¿Puedes explicar un poco más lo que quieres decir con "sería bastante fácil para alguien usar [el código] incorrectamente"? – user359996

0

ReentrantLock parece bien a mí. La única situación en la que me resultaría interesante crear manualmente un bloqueo usando AtomicInteger será si tienes un algorithmX realmente corto que no es tu caso.

+0

El problema con * ReentrantLock * es que al OP le preocupa que * algorithmX * pueda desencadenar la ejecución del tarea mientras ya está en ejecución. – user359996

2

Su primer bit de código se ve muy bien, pero si usted está preocupado por algorithmX recursiva invocando el tarea, le sugiero que utilice un java.util.concurrent.Semaphore como el objeto de sincronización, en lugar de un ReentrantLock. Por ejemplo:

Runnable task = new Runnable() { 
    final Semaphore lock = new Semaphore(1); 
    @Override public void run() { 
     if (lock.tryAcquire()) 
     { 
      try 
      { 
      algorithmX(); 
      } 
      finally 
      { 
      lock.release(); 
      } 
     } 
    } 
} 

Nota en particular, el uso de intentar adquirir. Si se produce un error en la adquisición, el algoritmo X no se ejecuta.

0

Creo que el secreto para elegir la impl de bloqueo correcta es esta: * si esta tarea ya está en curso, no haga nada (y deje que la tarea que se estaba ejecutando finalice).

¿Qué significa "no hacer nada" en este contexto? El subproceso debería bloquear y reintentar la ejecución después de ejecutar el algoritmo X finalizado ?. Si este es el caso semáforo.acquire en lugar de tryAcquire se debe utilizar y la solución AtomicBoolean no funcionará como se esperaba.

Cuestiones relacionadas