2011-10-11 17 views
34

Tengo un archivo .csv que contiene más de 70 millones de líneas de las cuales cada línea generará un Runnable y luego se ejecutará por threadpool. Este Runnable insertará un registro en Mysql.¿Cómo se puede reutilizar un threadpool después del cierre

Además, quiero registrar una posición del archivo csv para RandomAccessFile para localizar. La posición se escribe en un archivo . Quiero escribir este registro cuando todos los subprocesos en el subproceso se terminan. Se invoca ThreadPoolExecutor.shutdown(). Pero cuando lleguen más líneas, necesito un subproceso de nuevo. ¿Cómo puedo reutilizar este grupo de temas actual en lugar de crear uno nuevo?

El código es el siguiente:

public static boolean processPage() throws Exception { 

    long pos = getPosition(); 
    long start = System.currentTimeMillis(); 
    raf.seek(pos); 
    if(pos==0) 
     raf.readLine(); 
    for (int i = 0; i < PAGESIZE; i++) { 
     String lineStr = raf.readLine(); 
     if (lineStr == null) 
      return false; 
     String[] line = lineStr.split(","); 
     final ExperienceLogDO log = CsvExperienceLog.generateLog(line); 
     //System.out.println("userId: "+log.getUserId()%512); 

     pool.execute(new Runnable(){ 
      public void run(){ 
       try { 
        experienceService.insertExperienceLog(log); 
       } catch (BaseException e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 

     long end = System.currentTimeMillis(); 
    } 

    BufferedWriter resultWriter = new BufferedWriter(
      new OutputStreamWriter(new FileOutputStream(new File(
        RESULT_FILENAME), true))); 
    resultWriter.write("\n"); 
    resultWriter.write(String.valueOf(raf.getFilePointer())); 
    resultWriter.close(); 
    long time = System.currentTimeMillis()-start; 
    System.out.println(time); 
    return true; 
} 

Gracias!

Respuesta

31

Como se indica en la documentation, no se puede volver a utilizar una ExecutorService que ha sido cerrado. Recomiendo contra cualquier soluciones, ya que (a) es posible que no funcionen como se esperaba en todas las situaciones; y (b) puede lograr lo que quiere utilizando clases estándar.

Debe ya sea

  1. instantiate un nuevo ExecutorService; o

  2. no termina el ExecutorService.

La primera solución se implementa fácilmente, por lo que no la detallaré.

Por el segundo, dado que desea ejecutar una acción una vez que todas las tareas enviadas hayan finalizado, puede echar un vistazo al ExecutorCompletionService y usarlo en su lugar. Se coloca una ExecutorService que hará la gestión de hilos, pero los runnables conseguirá envuelto en algo que le dirá al ExecutorCompletionService cuando han terminado, por lo que puede informar a usted:

ExecutorService executor = ...; 
ExecutorCompletionService ecs = new ExecutorCompletionService(executor); 

for (int i = 0; i < totalTasks; i++) { 
    ... ecs.submit(...); ... 
} 

for (int i = 0; i < totalTasks; i++) { 
    ecs.take(); 
} 

El método take() en el ExecutorCompletionService la clase se bloqueará hasta que una tarea haya terminado (ya sea de forma normal o abrupta). Devolverá un Future, por lo que puede verificar los resultados si lo desea.

Espero que esto pueda ayudarlo, ya que no entendí completamente su problema.

+0

¿Cómo fuerzo detener los hilos o cancelar todas las tareas? usando exectuer.shutdownNow()? ¿Funcionará bien incluso con este envoltorio? –

3

crear y agrupar todas las tareas y someterlos a la piscina con invokeAll (que sólo regresa cuando todas las tareas se han completado con éxito)

+0

¿Cuál es el mejor enfoque para usar 'ExecutorCompletionService' o' invokeAll'? –

1

Después de cerrar la llamada en un ExecutorService, no new Task will be accepted. Esto debe crear un nuevo ExecutorService para cada ronda de tareas.

Sin embargo, con Java 8 ForkJoinPool.awaitQuiescence se introdujo. Si puede cambiar de un ExecutorService normal a ForkJoinPool, puede usar este método para esperar hasta que no se ejecuten más tareas en ForkJoinPool sin tener que llamar al apagado. Esto significa que puede llenar un ForkJoinPool con Tareas, esperar hasta que esté vacío (en reposo) y luego comenzar a llenarlo con Tareas, y así sucesivamente.

Cuestiones relacionadas