2010-02-22 8 views
17

que tienen un hilo de Java como la siguiente:Valores de retorno de Java Temas

public class MyThread extends Thread { 
     MyService service; 
     String id; 
     public MyThread(String id) { 
      this.id = node; 
     } 
     public void run() { 
      User user = service.getUser(id) 
     } 
    } 

Tengo cerca de 300 ids, y cada par de segundos - yo fuego hasta hilos para hacer un llamado a cada uno de los ID. P.ej.

for(String id: ids) { 
    MyThread thread = new MyThread(id); 
    thread.start(); 
} 

Ahora, me gustaría recoger los resultados de cada hilos, y hacer un inserto de lote a la base de datos, en vez de hacer 300 de base de datos inserta cada 2 segundos.

¿Alguna idea de cómo puedo lograr esto?

Respuesta

17

Si desea recopilar todos los resultados antes de hacer la actualización de la base de datos, puede utilizar el método invokeAll. Esto se ocupa de la contabilidad que se requeriría si envía tareas de una en una, como sugiere daveb.

private static final ExecutorService workers = Executors.newCachedThreadPool(); 

... 

Collection<Callable<User>> tasks = new ArrayList<Callable<User>>(); 
for (final String id : ids) { 
    tasks.add(new Callable<User>() 
    { 

    public User call() 
     throws Exception 
    { 
     return svc.getUser(id); 
    } 

    }); 
} 
/* invokeAll blocks until all service requests complete, 
* or a max of 10 seconds. */ 
List<Future<User>> results = workers.invokeAll(tasks, 10, TimeUnit.SECONDS); 
for (Future<User> f : results) { 
    User user = f.get(); 
    /* Add user to batch update. */ 
    ... 
} 
/* Commit batch. */ 
... 
+0

En mi caso, es posible que algunas de las llamadas de servicio nunca vuelvan o tarden demasiado en regresar. Así que 'invokeAll' junto con 'awaitTermination (long timeout)' parece ser el camino a seguir. Por lo tanto, estoy aceptando esta respuesta. Ojalá pudiera aceptar la respuesta de @ daveb también. – Langali

+0

En ese caso, puede usar una versión sobrecargada de 'invokeAll' que toma un parámetro de tiempo de espera. Actualizaré mi respuesta para mostrar cómo. – erickson

+0

Gracias. De alguna manera lo pasé por alto. – Langali

34

El enfoque canónico es usar un Callable y un ExecutorService. submit ting a Callable a ExecutorService devuelve un (tipo seguro) Future del que puede get el resultado.

class TaskAsCallable implements Callable<Result> { 
    @Override 
    public Result call() { 
     return a new Result() // this is where the work is done. 
    } 
} 

ExecutorService executor = Executors.newFixedThreadPool(300); 
Future<Result> task = executor.submit(new TaskAsCallable()); 
Result result = task.get(); // this blocks until result is ready 

En su caso, es probable que quieren usar invokeAll que devuelve un List de Futures, o crear esa lista a sí mismo a medida que agrega a las tareas que el ejecutor. Para recopilar resultados, simplemente llame al get en cada uno.

+0

Buen post, 1 para el detalle. – fastcodejava

1

Necesita almacenar el resultado en algo así como singleton. Esto tiene que estar correctamente sincronizado.

EDIT: Sé que este no es el mejor consejo, ya que no es una buena idea manejar raw Threads. Pero dada la pregunta, esto funcionará, ¿no? No puedo ser votada, pero ¿por qué votar abajo?

1

Puede crear una cola o lista que pase a los hilos que cree, los hilos añaden su resultado a la lista que se vacía por un consumidor que realiza la inserción del lote.

1

El enfoque más simple es pasar un objeto a cada hilo (un objeto por hilo) que contendrá el resultado más tarde. El hilo principal debe mantener una referencia a cada objeto de resultado. Cuando se unen todos los hilos, puede usar los resultados.

1
public class TopClass { 
    List<User> users = new ArrayList<User>(); 
    void addUser(User user) { 
     synchronized(users) { 
      users.add(user); 
     } 
    } 
    void store() throws SQLException { 
     //storing code goes here 
    } 
    class MyThread extends Thread { 
      MyService service; 
      String id; 
      public MyThread(String id) { 
       this.id = node; 
      } 
      public void run() { 
       User user = service.getUser(id) 
       addUser(user); 
      } 
     } 
} 
1

Puede hacer una clase que amplíe Observable. Entonces su hilo puede llamar a un método en la clase Observable que notificaría a cualquier clase que se registró en ese observador llamando a Observable.notifyObservers (Object).

La clase de observación implementará Observer y se registrará con el Observable. Luego implementaría un método de actualización (observable, objeto) que se llama cuando se llama a Observerable.notifyObservers (Object).

4

Almacene su resultado en su objeto. Cuando se complete, haga que caiga en una colección sincronizada (se le viene a la mente una cola sincronizada).

Cuando desee recoger los resultados para enviar, tome todo de la cola y lea los resultados de los objetos.Incluso puede hacer que cada objeto sepa cómo "publicar" sus propios resultados en la base de datos, de esta forma se pueden enviar diferentes clases y todas se manejan con el mismo bucle pequeño y elegante.

Hay un montón de herramientas en el JDK para ayudar con esto, pero es muy fácil una vez que comience a pensar en el hilo como un objeto real y no sólo un montón de basura en torno a un método de "funcionamiento". Una vez que comienzas a pensar en los objetos, la programación se vuelve mucho más simple y satisfactoria.

+0

+1. Mi idea inicial fue similar, ¡pero Java 5 lo hizo mucho más simple ahora! – Langali

+0

+1. Eso es hermoso. –

+0

Lo hice para un sistema que conectó un rango de direcciones clase B. Fue muy elegante y redujo el tiempo de horas a minutos. (Bueno, también cambió de un Ping a un SNMP get, Java realmente no es compatible con Ping) –

2

En Java8 hay una mejor manera de hacerlo usando CompletableFuture. Digamos que tenemos que llegar clase de Identificación de la base de datos, por razones de simplicidad sólo podemos devolver un número como abajo,

static class GenerateNumber implements Supplier<Integer>{ 

    private final int number; 

    GenerateNumber(int number){ 
     this.number = number; 
    } 
    @Override 
    public Integer get() { 
     try { 
      TimeUnit.SECONDS.sleep(1); 
     }catch (InterruptedException e){ 
      e.printStackTrace(); 
     } 
     return this.number; 
    } 
} 

Ahora podemos añadir el resultado a una colección simultánea una vez que los resultados de cada futuro está listo.

Collection<Integer> results = new ConcurrentLinkedQueue<>(); 
int tasks = 10; 
CompletableFuture<?>[] allFutures = new CompletableFuture[tasks]; 
for (int i = 0; i < tasks; i++) { 
    int temp = i; 
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()-> new GenerateNumber(temp).get(), executor); 
    allFutures[i] = future.thenAccept(results::add); 
} 

Ahora podemos añadir una devolución de llamada cuando todos los futuros están listos,

CompletableFuture.allOf(allFutures).thenAccept(c->{ 
    System.out.println(results); // do something with result 
});