2011-11-02 14 views
7

Tengo un grupo de subprocesos fijos ExecutorService de ancho 10 y una lista de 100 Callable, cada uno esperando 20 segundos y registrando sus interrupciones.Java ExecutorService invokeAll() interrupción

Estoy llamando a invokeAll en esa lista en un hilo separado, e interrumpiendo casi inmediatamente este hilo. ExecutorService ejecución se interrumpe como se esperaba, pero el número real de interrupciones registradas por Callable s es mucho más de lo esperado 10 - alrededor de 20-40. ¿Por qué es así, si ExecutorService puede ejecutar no más de 10 hilos simultáneamente?

fuente completo: (Puede que tenga que ejecutar más de una vez debido a la concurrencia)

@Test 
public void interrupt3() throws Exception{ 
    int callableNum = 100; 
    int executorThreadNum = 10; 
    final AtomicInteger interruptCounter = new AtomicInteger(0); 
    final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum); 
    final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>(); 
    for (int i = 0; i < callableNum; ++i) { 
     executeds.add(new Waiter(interruptCounter)); 
    } 
    Thread watcher = new Thread(new Runnable() { 

     @Override 
     public void run(){ 
      try { 
       executorService.invokeAll(executeds); 
      } catch(InterruptedException ex) { 
       // NOOP 
      } 
     } 
    }); 
    watcher.start(); 
    Thread.sleep(200); 
    watcher.interrupt(); 
    Thread.sleep(200); 
    assertEquals(10, interruptCounter.get()); 
} 

// This class just waits for 20 seconds, recording it's interrupts 
private class Waiter implements Callable <Object> { 
    private AtomicInteger interruptCounter; 

    public Waiter(AtomicInteger interruptCounter){ 
     this.interruptCounter = interruptCounter; 
    } 

    @Override 
    public Object call() throws Exception{ 
     try { 
      Thread.sleep(20000); 
     } catch(InterruptedException ex) { 
      interruptCounter.getAndIncrement(); 
     } 
     return null; 
    } 
} 

Usando WinXP 32 bits, Oracle JRE 1.6.0_27 y junit4

+0

Hmm ... convirtiéndolo en un programa con un método principal, siempre recibo 10 ... (Java 7 en Windows) –

+0

Hecho lo mismo, tengo 37 (1.6.0_27, Windows XP). No tiene que probar Java 7, ¿alguien puede confirmarlo? –

+0

Lo intentaré en el trabajo. Tal vez sea un error de Java 6 ... –

Respuesta

4

estoy de acuerdo con la hipótesis que solo deberías recibir 10 interrupciones.

Assume the CPU has 1 core. 
1. Main thread starts Watcher and sleeps 
2. Watcher starts and adds 100 Waiters then blocks 
3. Waiter 1-10 start and sleep in sequence 
4. Main wakes and interrupts Watcher then sleeps 
5. Watcher cancels Waiter 1-5 then is yielded by the OS (now we have 5 interrupts) 
6. Waiter 11-13 start and sleep 
7. Watcher cancels Waiter 6-20 then is yielded by the OS (now we have 13 interrupts) 
8. Waiter 14-20 are "started" resulting in a no-op 
9. Waiter 21-24 start and sleep 
.... 

En esencia, mi argumento es que no hay ninguna garantía de que el hilo Vigía se pueda cancelar todos los 100 casos RunnableFuture "Camarero" antes de que tenga que ceder el segmento de tiempo y permitir que los subprocesos de trabajo de la ExecutorService para iniciar más Tareas de camarero

Actualización: Mostrando código de AbstractExecutorService

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
    throws InterruptedException { 
    if (tasks == null) 
     throw new NullPointerException(); 
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
    boolean done = false; 
    try { 
     for (Callable<T> t : tasks) { 
      RunnableFuture<T> f = newTaskFor(t); 
      futures.add(f); 
      execute(f); 
     } 
     for (Future<T> f : futures) { 
      if (!f.isDone()) { 
       try { 
        f.get(); //If interrupted, this is where the InterruptedException will be thrown from 
       } catch (CancellationException ignore) { 
       } catch (ExecutionException ignore) { 
       } 
      } 
     } 
     done = true; 
     return futures; 
    } finally { 
     if (!done) 
      for (Future<T> f : futures) 
       f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads 
    } 
} 

El bloque finally que contiene f.cancel(true) es cuando la interrupción se propaga a la tarea que se está ejecutando actualmente. Como puede ver, este es un ciclo cerrado, pero no hay garantía de que el hilo que ejecuta el ciclo pueda iterar a través de todas las instancias de Future en un segmento de tiempo.

+0

Entonces, ¿dices que la interrupción de 'invokeAll()' no implica la cancelación inmediata de todas las tareas en cola antes de la interrupción de las que se están ejecutando? Para mí, parece un principio de ruptura directa de menos asombro. –

+1

Correcto. Los hilos del trabajador que procesan las tareas son distintos del hilo que ejecuta 'invokeAll()'. Llamar a la interrupción de un hilo no implica ningún otro hilo debe ser interrumpido, por lo que no me sorprende en absoluto que pueda recibir más de 10 interrupciones de hilos de trabajo en ocasiones.Como mencioné en el código anotado que publiqué, una interrupción solo se envía al hilo de trabajo que procesa la tarea como una virtud del argumento booleano pasado al método 'Future.cancel'. –

0
PowerMock.mockStatic (Executors.class); 
EasyMock.expect (Executors.newFixedThreadPool (9)).andReturn (executorService); 

Future<MyObject> callableMock = (Future<MyObject>) 
EasyMock.createMock (Future.class); 
EasyMock.expect (callableMock.get (EasyMock.anyLong(), EasyMock.isA (TimeUnit.class))).andReturn (ccs).anyTimes(); 

List<Future<MyObject>> futures = new ArrayList<Future<MyObject>>(); 
futures.add (callableMock); 
EasyMock.expect (executorService.invokeAll (EasyMock.isA (List.class))).andReturn (futures).anyTimes(); 

executorService.shutdown(); 
EasyMock.expectLastCall().anyTimes(); 

EasyMock.expect (mock.getMethodCall ()).andReturn (result).anyTimes(); 

PowerMock.replayAll(); 
EasyMock.replay (callableMock, executorService, mock); 

Assert.assertEquals (" ", answer.get (0)); 
PowerMock.verifyAll(); 
1

Si usted quiere lograr mismo comportamiento

ArrayList<Runnable> runnables = new ArrayList<Runnable>(); 
    executorService.getQueue().drainTo(runnables); 

La adición de este bloque antes de interrumpir el subprocesos.

Vaciará toda la cola de espera en una nueva lista.

Por lo tanto, solo interrumpirá la ejecución de subprocesos.

Cuestiones relacionadas