2012-06-22 15 views
8

¿Alguien sabe cómo escribir una función de extensión devolviendo una ParallelQuery en PLINQ?¿Cómo escribo una función de extensión con hilos para PLINQ?

Más específicamente, tengo el siguiente problema: quiero realizar una transformación dentro de una consulta PLINQ que necesita un motor, cuya creación es costosa y a la que no se puede acceder simultáneamente.

que podía hacer lo siguiente:

var result = source.AsParallel().Select ((i) => { var e = new Engine(); return e.Process(i); }) 

En este caso, el motor se crea una vez por artículo, que es demasiado caro.

Quiero que el motor se cree una vez por hilo.

con agregado, que puedo llegar cerca de lo que quiero con algo así como

// helper class: engine to use plus list of results obtained in thread so far 
class EngineAndResults { 
    public Engine engine = null; 
    public IEnumerable<ResultType> results; 
} 

var result = source.AsParallel().Aggregate (

    // done once per block of items (=thread), 
    // returning an empty list, but a new engine 
    () => new EngineAndList() { 
     engine = new Engine(), 
     results = Enumerable.Empty<ResultType>() 
    }, 

    // we process a new item and put it to the thread-local list, 
    // preserving the engine for further use 
    (engineAndResults, item) => new EngineAndResults() { 
     engine = engineAndResults.engine, 
     results = Enumerable.Concat (
      engineAndResults.results, 
      new ResultType [] { engineAndResults.engine.Process (item) } 
     ) 
    }, 

    // tell linq how to aggregate across threads 
    (engineAndResults1, engineAndResults2) => new EngineAndResults() { 
     engine = engineAndResults1.engine, 
     results = Enumerable.Concat (engineAndResults1.results, engineAndResults2.results) 
    }, 

    // after all aggregations, how do we come to the result? 
    engineAndResults => engineAndResults.results 
); 

Como se puede ver, hago mal uso del acumulador de llevar un motor por hilo. El problema aquí es que PLINQ al final agrega los resultados en un solo IEnumerable, lo que hace que los hilos se sincronicen. Esto no es muy bueno si quiero añadir otra extensión PLINQ después.

apreciaría algo así como

var result = source.AsParallel() 
        .SelectWithThreadwiseInitWhichIAmLookingFor (
         () => new Engine(), 
         (engine, item) => engine.Process (item) 
      ) 

¿Alguien tiene alguna idea de cómo lograr esto?

Respuesta

5

Puede usar ThreadLocal<T> para hacerlo. Algo como:

var engine = new ThreadLocal<Engine>(() => new Engine()); 
var result = source.AsParallel() 
        .Select(item => engine.Value.Process(item)); 
+0

Gracias. Esta es una buena solución. Hice una prueba breve, y parece funcionar bien. Traté de encontrar una manera de poner la inicialización en la función de extensión, pero no tuvo éxito; obviamente, ThreadLocal debe crearse antes de que se llame a AsParallel. No veo la razón para eso, pero de todos modos, este no es un gran problema. – JohnB

+0

Creo que eso no funcionó, porque estaba creando un nuevo 'ThreadLocal' para cada iteración, por lo que no podría haber ningún intercambio para las iteraciones que se ejecutan en el mismo subproceso. Todas las iteraciones que se ejecutan en el mismo subproceso necesitan la misma instancia de 'ThreadLocal'. – svick

Cuestiones relacionadas