2012-05-04 28 views
15

Tengo varios hilos de consumo esperando en un CountDownLatch del tamaño 1 usando await(). Tengo un solo hilo productor que llama al countDown() cuando termina con éxito.¿Cómo puedo "cancelar" un CountDownLatch?

Esto funciona muy bien cuando no hay errores.

Sin embargo, si el productor detecta un error, me gustaría que pueda señalar el error a los hilos del consumidor. Idealmente, podría hacer que el productor llame al abortCountDown() y que todos los consumidores reciban una excepción interrumpida u otra excepción. No deseo llamar al countDown(), porque esto requiere que todos mis hilos de consumo realicen una verificación manual adicional para verificar el éxito después de llamar al await(). Prefiero que reciban una excepción, que ya saben cómo manejar.

Sé que una instalación de aborto no está disponible en CountDownLatch. ¿Existe otra primitiva de sincronización que pueda adaptar fácilmente para crear efectivamente un CountDownLatch que admita abortar la cuenta atrás?

Respuesta

11

JB Nizet tenía una gran respuesta. Tomé la suya y la pulí un poco. El resultado es una subclase de CountDownLatch llamada AbortableCountDownLatch, que agrega un método "abort()" a la clase que hará que todos los subprocesos que esperan en el enganche reciban una AbortException (una subclase de InterruptedException).

Además, a diferencia de la clase de JB, AbortableCountDownLatch abortará todos los hilos de bloqueo inmediatamente en un aborto, en lugar de esperar a que la cuenta regresiva llegue a cero (para situaciones en las que usa un recuento> 1).

import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 

public class AbortableCountDownLatch extends CountDownLatch { 
    protected boolean aborted = false; 

    public AbortableCountDownLatch(int count) { 
     super(count); 
    } 


    /** 
    * Unblocks all threads waiting on this latch and cause them to receive an 
    * AbortedException. If the latch has already counted all the way down, 
    * this method does nothing. 
    */ 
    public void abort() { 
     if(getCount()==0) 
      return; 

     this.aborted = true; 
     while(getCount()>0) 
      countDown(); 
    } 


    @Override 
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 
     final boolean rtrn = super.await(timeout,unit); 
     if (aborted) 
      throw new AbortedException(); 
     return rtrn; 
    } 

    @Override 
    public void await() throws InterruptedException { 
     super.await(); 
     if (aborted) 
      throw new AbortedException(); 
    } 


    public static class AbortedException extends InterruptedException { 
     public AbortedException() { 
     } 

     public AbortedException(String detailMessage) { 
      super(detailMessage); 
     } 
    } 
} 
+0

Cómo utilizar esta clase, Mi situación es Tengo una lista y la lista se actualiza dinámicamente en tiempo real. Hay una lista de eventos automáticos en la que tengo que esperar 2 minutos, pero si entre el tiempo de espera el evento del Manual del Usuario entra en la lista, tengo que interrumpir la espera y proceder de inmediato. –

12

Encapsulate este comportamiento dentro de una clase específica, de nivel superior, mediante el CountDownLatch internamente:

public class MyLatch { 
    private CountDownLatch latch; 
    private boolean aborted; 
    ... 

    // called by consumers 
    public void await() throws AbortedException { 
     latch.await(); 
     if (aborted) { 
      throw new AbortedException(); 
     } 
    } 

    // called by producer 
    public void abort() { 
     this.aborted = true; 
     latch.countDown(); 
    } 

    // called by producer 
    public void succeed() { 
     latch.countDown(); 
    } 
} 
+1

Para que este thread-safe, no debe 'abortar' ser' volátil'? –

+5

No, porque la seguridad de subprocesos está asegurada por CountDownLatch: el método countDown asegura que haya una barrera de memoria entre los subprocesos. El javadoc dice: "Las acciones en un hilo antes de llamar a CountDown() ocurren -antes de las acciones después de un retorno exitoso de un await correspondiente()" –

+0

Sí, ahora recuerdo haber leído eso (hace un tiempo). Gracias. –

3

Puede crear una envoltura alrededor de CountDownLatch que proporciona la capacidad de cancelar los camareros. Tendrá que rastrear los hilos de espera y soltarlos cuando se agote el tiempo de espera, así como recordar que el enganche se canceló, por lo que las llamadas futuras al await se interrumpirán inmediatamente.

public class CancellableCountDownLatch 
{ 
    final CountDownLatch latch; 
    final List<Thread> waiters; 
    boolean cancelled = false; 

    public CancellableCountDownLatch(int count) { 
     latch = new CountDownLatch(count); 
     waiters = new ArrayList<Thread>(); 
    } 

    public void await() throws InterruptedException { 
     try { 
      addWaiter(); 
      latch.await(); 
     } 
     finally { 
      removeWaiter(); 
     } 
    } 

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 
     try { 
      addWaiter(); 
      return latch.await(timeout, unit); 
     } 
     finally { 
      removeWaiter(); 
     } 
    } 

    private synchronized void addWaiter() throws InterruptedException { 
     if (cancelled) { 
      Thread.currentThread().interrupt(); 
      throw new InterruptedException("Latch has already been cancelled"); 
     } 
     waiters.add(Thread.currentThread()); 
    } 

    private synchronized void removeWaiter() { 
     waiters.remove(Thread.currentThread()); 
    } 

    public void countDown() { 
     latch.countDown(); 
    } 

    public synchronized void cancel() { 
     if (!cancelled) { 
      cancelled = true; 
      for (Thread waiter : waiters) { 
       waiter.interrupt(); 
      } 
      waiters.clear(); 
     } 
    } 

    public long getCount() { 
     return latch.getCount(); 
    } 

    @Override 
    public String toString() { 
     return latch.toString(); 
    } 
} 
+0

¿Puede explicar con ejemplos cómo usar el mismo? Mi situación es que tengo una lista y la lista se actualiza dinámicamente en tiempo real. Hay una lista de eventos automáticos en la que tengo que esperar 2 minutos, pero si entre el tiempo de espera el evento del Manual del Usuario entra en la lista, tengo que interrumpir la espera y proceder de inmediato. –

0

Se podría rodar su propia CountDownLatch a cabo utilizando un ReentrantLock que permite el acceso a su getWaitingThreads método protegido.

Ejemplo:

public class FailableCountDownLatch { 
    private static class ConditionReentrantLock extends ReentrantLock { 
     private static final long serialVersionUID = 2974195457854549498L; 

     @Override 
     public Collection<Thread> getWaitingThreads(Condition c) { 
      return super.getWaitingThreads(c); 
     } 
    } 

    private final ConditionReentrantLock lock = new ConditionReentrantLock(); 
    private final Condition countIsZero = lock.newCondition(); 
    private long count; 

    public FailableCountDownLatch(long count) { 
     this.count = count; 
    } 

    public void await() throws InterruptedException { 
     lock.lock(); 
     try { 
      if (getCount() > 0) { 
       countIsZero.await(); 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public boolean await(long time, TimeUnit unit) throws InterruptedException { 
     lock.lock(); 
     try { 
      if (getCount() > 0) { 
       return countIsZero.await(time, unit); 
      } 
     } finally { 
      lock.unlock(); 
     } 
     return true; 
    } 

    public long getCount() { 
     lock.lock(); 
     try { 
      return count; 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public void countDown() { 
     lock.lock(); 
     try { 
      if (count > 0) { 
       count--; 

       if (count == 0) { 
        countIsZero.signalAll(); 
       } 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public void abortCountDown() { 
     lock.lock(); 
     try { 
      for (Thread t : lock.getWaitingThreads(countIsZero)) { 
       t.interrupt(); 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 
} 

Es posible que desee cambiar esta clase para lanzar una InterruptedException de nuevas llamadas a await después de que haya sido cancelado. Incluso podría tener esta clase ampliar CountDownLatch si necesitara esa funcionalidad.

Cuestiones relacionadas