2011-08-31 16 views
19

Estoy ejecutando un proceso en un hilo separado con un tiempo de espera, usando un ExecutorService y un Future (código de ejemplo here) (el hilo "spawning" tiene lugar en un Aspecto AOP)Propagando ThreadLocal a un nuevo Thread obtenido de un ExecutorService

Ahora, el hilo principal es una solicitud Resteasy. Resteasy usa una o más variables de ThreadLocal para almacenar cierta información de contexto que necesito recuperar en algún momento de mi llamada al método Rest. El problema es que, dado que el hilo Resteasy se está ejecutando en un nuevo hilo, las variables de ThreadLocal se pierden.

¿Cuál sería la mejor manera de "propagar" cualquier variable ThreadLocal utilizada por Resteasy al nuevo hilo? Parece que Resteasy usa más de una variable ThreadLocal para realizar un seguimiento de la información de contexto y me gustaría transferir "ciegamente" toda la información al nuevo hilo.

He examinado la subclase ThreadPoolExecutor y el método beforeExecute para pasar el hilo actual al grupo, pero no pude encontrar una forma de pasar las variables ThreadLocal al grupo.

¿Alguna sugerencia?

Gracias

+0

Podría volver a escribir el segundo párrafo ligeramente? Me confunde Además, ¿qué pasa con beforeExecute? ¿No puede hacer que funcione correctamente o se da cuenta de que no puede satisfacer sus necesidades? – toto2

Respuesta

13

El conjunto de ThreadLocal casos asociados con un hilo se llevan a cabo en los miembros privados de cada Thread. Su única posibilidad de enumerar estos es hacer una reflexión sobre el Thread; de esta manera, puede anular las restricciones de acceso en los campos del hilo.

Una vez que se puede obtener el conjunto de ThreadLocal, puede copiar en los subprocesos de fondo utilizando los beforeExecute() y afterExecute() ganchos de ThreadPoolExecutor, o mediante la creación de un envoltorio Runnable para sus tareas que intercepta la llamada run() para establecer un desarmar el necesario ThreadLocal instancias. En realidad, la última técnica podría funcionar mejor, ya que le daría un lugar conveniente para almacenar los valores ThreadLocal en el momento en que la tarea está en cola.


Actualización: Aquí hay un ejemplo más concreto de la segunda aproximación. A diferencia de mi descripción original, todo lo que se almacena en el contenedor es el hilo de llamada, que se interroga cuando se ejecuta la tarea.

static Runnable wrap(Runnable task) 
{ 
    Thread caller = Thread.currentThread(); 
    return() -> { 
    Iterable<ThreadLocal<?>> vars = copy(caller); 
    try { 
     task.run(); 
    } 
    finally { 
     for (ThreadLocal<?> var : vars) 
     var.remove(); 
    } 
    }; 
} 

/** 
* For each {@code ThreadLocal} in the specified thread, copy the thread's 
* value to the current thread. 
* 
* @param caller the calling thread 
* @return all of the {@code ThreadLocal} instances that are set on current thread 
*/ 
private static Collection<ThreadLocal<?>> copy(Thread caller) 
{ 
    /* Use a nasty bunch of reflection to do this. */ 
    throw new UnsupportedOperationException(); 
} 
+0

¿Podría dar un ejemplo de cómo funcionaría esta última técnica? – Viraj

+0

@Viraj Agregué un código. ¿Eso ayuda? – erickson

+0

Sí. Seguro. – Viraj

2

Como entiendo su problema, puede echar un vistazo a InheritableThreadLocal que está destinado a pasar ThreadLocal las variables de contexto hilo de Padres de hilo hijo Contexto

+15

No funciona, antes que nada, el OP no tiene control sobre la creación de 'ThreadLocal' en la biblioteca de terceros. En segundo lugar, 'ExecutorService' reutiliza hilos, mientras que' InheritableThreadLocal' solo funciona cuando genera un hilo nuevo directamente. –

-1

Si nos fijamos en el código ThreadLocal se puede ver :

public T get() { 
     Thread t = Thread.currentThread(); 
     ... 
    } 

no se puede sobrescribir el hilo actual.

soluciones posibles:

  1. mirada a Java 7 tenedor/unirse mecanismo (pero creo que es una mala manera)

  2. Mira endorsed mecanismo para sobrescribir ThreadLocal clase en su JVM.

  3. Trate de reescribir RESTEasy (se puede utilizar herramientas Refactor en su IDE para reemplazar todo el uso ThreadLocal, se parezca fácil)

1

Basado en @erickson answer Escribí este código. Está funcionando para inheritableThreadLocals. Construye una lista de inheritableThreadLocals usando el mismo método que se usa en Thread contructor. Por supuesto que uso el reflejo para hacer esto. También anulo la clase ejecutor.

public class MyThreadPoolExecutor extends ThreadPoolExecutor 
{ 
    @Override 
    public void execute(Runnable command) 
    { 
     super.execute(new Wrapped(command, Thread.currentThread())); 
    } 
} 

Wrapper:

private class Wrapped implements Runnable 
    { 
     private final Runnable task; 

     private final Thread caller; 

     public Wrapped(Runnable task, Thread caller) 
     { 
     this.task = task; 
     this.caller = caller; 
     } 

     public void run() 
     { 
     Iterable<ThreadLocal<?>> vars = null; 
     try 
     { 
      vars = copy(caller); 
     } 
     catch (Exception e) 
     { 
      throw new RuntimeException("error when coping Threads", e); 
     } 
     try { 
      task.run(); 
     } 
     finally { 
      for (ThreadLocal<?> var : vars) 
       var.remove(); 
     } 
     } 
    } 

método de copia:

public static Iterable<ThreadLocal<?>> copy(Thread caller) throws Exception 
    { 
     List<ThreadLocal<?>> threadLocals = new ArrayList<>(); 
     Field field = Thread.class.getDeclaredField("inheritableThreadLocals"); 
     field.setAccessible(true); 
     Object map = field.get(caller); 
     Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table"); 
     table.setAccessible(true); 

     Method method = ThreadLocal.class 
       .getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap")); 
     method.setAccessible(true); 
     Object o = method.invoke(null, map); 

     Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals"); 
     field2.setAccessible(true); 
     field2.set(Thread.currentThread(), o); 

     Object tbl = table.get(o); 
     int length = Array.getLength(tbl); 
     for (int i = 0; i < length; i++) 
     { 
     Object entry = Array.get(tbl, i); 
     Object value = null; 
     if (entry != null) 
     { 
      Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod(
        "get"); 
      referentField.setAccessible(true); 
      value = referentField.invoke(entry); 
      threadLocals.add((ThreadLocal<?>) value); 
     } 
     } 
     return threadLocals; 
    } 
0

Aquí se muestra un ejemplo para pasar el LocaleContext actual en hilo padre a la rosca niño abarcado por CompletableFuture [Por defecto se utiliza ForkJoinPool].

Simplemente defina todas las cosas que quería hacer en un subproceso secundario dentro de un bloque Runnable. Por lo tanto, cuando el CompletableFuture ejecuta el bloque Runnable, es el hilo hijo el que tiene el control y listo para que tenga el elemento ThreadLocal del padre en Child's ThreadLocal.

El problema aquí no es que se haya copiado todo el ThreadLocal. Solo se copia el LocaleContext. Dado que ThreadLocal tiene acceso privado solo al subproceso al que pertenece también, usar Reflection y tratar de obtener y establecer en Child es demasiada cosa alocada que puede ocasionar fugas de memoria o rendimiento.

Así que si conoce los parámetros que le interesan de ThreadLocal, entonces esta solución funciona de manera más limpia.

public void parentClassMethod(Request request) { 
     LocaleContext currentLocale = LocaleContextHolder.getLocaleContext(); 
     executeInChildThread(() -> { 
       LocaleContextHolder.setLocaleContext(currentLocale); 
       //Do whatever else you wanna do 
      })); 

     //Continue stuff you want to do with parent thread 
} 


private void executeInChildThread(Runnable runnable) { 
    try { 
     CompletableFuture.runAsync(runnable) 
      .get(); 
    } catch (Exception e) { 
     LOGGER.error("something is wrong"); 
    } 
} 
0

No me gusta Reflexión acercamiento. Una solución alternativa sería implementar el wrapper del ejecutor y pasar el objeto directamente como un contexto ThreadLocal a todos los hilos secundarios que propagan un contexto principal.

public class PropagatedObject { 

    private ThreadLocal<ConcurrentHashMap<AbsorbedObjectType, Object>> data = new ThreadLocal<>(); 

    //put, set, merge methods, etc 

} 

==>

public class ObjectAwareExecutor extends AbstractExecutorService { 

    private final ExecutorService delegate; 
    private final PropagatedObject objectAbsorber; 

    public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber){ 
     this.delegate = delegate; 
     this.objectAbsorber = objectAbsorber; 
    } 
    @Override 
    public void execute(final Runnable command) { 

     final ConcurrentHashMap<String, Object> parentContext = objectAbsorber.get(); 
     delegate.execute(() -> { 
      try{ 
       objectAbsorber.set(parentContext); 
       command.run(); 
      }finally { 
       parentContext.putAll(objectAbsorber.get()); 
       objectAbsorber.clean(); 
      } 
     }); 
     objectAbsorber.merge(parentContext); 
    } 
Cuestiones relacionadas