2012-10-10 53 views
19

Tengo una lista de ID, y necesito ejecutar varios procedimientos almacenados en cada ID.Paralelo no funciona con Entity Framework

Cuando estoy usando un bucle Foreach estándar, funciona bien, pero cuando tengo muchos registros, funciona bastante lento.

Quería convertir el código para trabajar con EF, pero recibo una excepción: "El proveedor subyacente falló en Abrir".

estoy utilizando este código, dentro de la Parallel.ForEach:

using (XmlEntities osContext = new XmlEntities()) 
{ 
    //The code 
} 

Pero todavía produce la excepción.

¿Alguna idea de cómo puedo usar Parallel with EF? ¿Necesito crear un nuevo contexto para cada procedimiento que estoy ejecutando? Tengo alrededor de 10 procedimientos, así que creo que es muy malo crear 10 contextos, uno para cada uno.

+1

No soy un gurú de subprocesos múltiples, pero si está utilizando transacciones o su lectura/escritura se bloquea, es posible que no obtenga un mejor rendimiento que simplemente hacerlo en serie. – Matthew

+0

Dudo martillar el servidor sql estresado con más hilos no ayudará al rendimiento ... – rene

+2

El sql no está estresado en absoluto, por eso quiero usar un paralelo. y seguramente correrá más rápido. –

Respuesta

31

The underlying database connections that the Entity Framework are using are not thread-safe. Usted tendrá necesidad de crear un nuevo contexto para cada operación en otro hilo que va a realizar.

Su preocupación sobre cómo paralelizar la operación es válida; que muchos contextos van a ser costosos de abrir y cerrar.

En su lugar, es posible que desee invertir cómo piensa sobre la paralelización del código. Parece que está pasando por encima de una serie de elementos y luego llama a los procedimientos almacenados en serie para cada elemento.

Si es posible, crear un nuevo Task<TResult> (o Task, si usted no necesita un resultado) para cada procedimiento y entonces en ese Task<TResult>, abre un solo contexto, recorrer todos los elementos, y luego ejecuta el procedimiento almacenado. De esta forma, solo tiene un número de contextos igual al número de procedimientos almacenados que está ejecutando en paralelo.

Supongamos que usted tiene una MyDbContext con dos procedimientos almacenados, DoSomething1 y DoSomething2, los cuales tienen una instancia de una clase, MyItem.

ejecución de lo anterior sería algo como:

// You'd probably want to materialize this into an IList<T> to avoid 
// warnings about multiple iterations of an IEnumerable<T>. 
// You definitely *don't* want this to be an IQueryable<T> 
// returned from a context. 
IEnumerable<MyItem> items = ...; 

// The first stored procedure is called here. 
Task t1 = Task.Run(() => { 
    // Create the context. 
    using (var ctx = new MyDbContext()) 
    // Cycle through each item. 
    foreach (MyItem item in items) 
    { 
     // Call the first stored procedure. 
     // You'd of course, have to do something with item here. 
     ctx.DoSomething1(item); 
    } 
}); 

// The second stored procedure is called here. 
Task t2 = Task.Run(() => { 
    // Create the context. 
    using (var ctx = new MyDbContext()) 
    // Cycle through each item. 
    foreach (MyItem item in items) 
    { 
     // Call the first stored procedure. 
     // You'd of course, have to do something with item here. 
     ctx.DoSomething2(item); 
    } 
}); 

// Do something when both of the tasks are done. 

Si no puede ejecutar los procedimientos almacenados en paralelo (cada uno es dependiente de que se ejecute en un cierto orden), entonces usted puede todavía paraleliza tus operaciones, es solo un poco más complejo.

Miraría creating custom partitions en sus artículos (usando el Create method estático en el Partitioner class). Esto le dará los medios para obtener implementaciones de IEnumerator<T> (tenga en cuenta que esto es noIEnumerable<T> por lo que no puede foreach sobre él).

Para cada instancia IEnumerator<T> vuelvas, que crearía un nuevo Task<TResult> (si necesita un resultado), y en el cuerpo Task<TResult>, debe crear el contexto y luego desplazarse a través de los artículos devueltos por el IEnumerator<T>, llamando los procedimientos almacenados en orden

Ese sería el siguiente:

// Get the partitioner. 
OrdinalPartitioner<MyItem> partitioner = Partitioner.Create(items); 

// Get the partitions. 
// You'll have to set the parameter for the number of partitions here. 
// See the link for creating custom partitions for more 
// creation strategies. 
IList<IEnumerator<MyItem>> paritions = partitioner.GetPartitions(
    Environment.ProcessorCount); 

// Create a task for each partition. 
Task[] tasks = partitions.Select(p => Task.Run(() => { 
     // Create the context. 
     using (var ctx = new MyDbContext()) 
     // Remember, the IEnumerator<T> implementation 
     // might implement IDisposable. 
     using (p) 
     // While there are items in p. 
     while (p.MoveNext()) 
     { 
      // Get the current item. 
      MyItem current = p.Current; 

      // Call the stored procedures. Process the item 
      ctx.DoSomething1(current); 
      ctx.DoSomething2(current); 
     } 
    })). 
    // ToArray is needed (or something to materialize the list) to 
    // avoid deferred execution. 
    ToArray(); 
0

Es un poco difícil de solucionar éste sin saber lo que el resultado es la excepción interna, en su caso. Esto podría ser simplemente un problema con la forma en que se configura la cadena de conexión o la configuración del proveedor.

En general, debe tener cuidado con el código paralelo y EF. Sin embargo, lo que estás haciendo debe funcionar. Una pregunta en mi mente; ¿Se está realizando algún trabajo en otra instancia de ese contexto antes de el paralelo? De acuerdo con su publicación, está haciendo un contexto separado en cada hilo. Eso es bueno. Parte de mí se pregunta, sin embargo, si hay una cierta contención constructora interesante entre los contextos múltiples. Si no está utilizando ese contexto en cualquier lugar antes de esa llamada paralela, sugeriría tratar de ejecutar incluso una consulta simple contra el contexto para abrirlo y asegurarse de que todos los bits EF se activen antes de ejecutar el método paralelo. Lo admitiré, no he intentado exactamente qué hiciste aquí, pero lo hice cerca y funcionó.

+1

, usted sabe que esta pregunta se publicó en 2012, ¿verdad? – Claies

2

Esto es lo que uso y funciona muy bien. Es, además, es compatible con el manejo de las excepciones de error y tiene un modo de depuración que hace que sea mucho más fácil de realizar un seguimiento de las cosas

public static ConcurrentQueue<Exception> Parallel<T>(this IEnumerable<T> items, Action<T> action, int? parallelCount = null, bool debugMode = false) 
{ 
    var exceptions = new ConcurrentQueue<Exception>(); 
    if (debugMode) 
    { 
     foreach (var item in items) 
     { 
      try 
      { 
       action(item); 
      } 
      // Store the exception and continue with the loop.      
      catch (Exception e) 
      { 
       exceptions.Enqueue(e); 
      } 
     } 
    } 
    else 
    { 
     var partitions = Partitioner.Create(items).GetPartitions(parallelCount ?? Environment.ProcessorCount).Select(partition => Task.Factory.StartNew(() => 
     { 
      while (partition.MoveNext()) 
      { 
       try 
       { 
        action(partition.Current); 
       } 
       // Store the exception and continue with the loop.      
       catch (Exception e) 
       { 
        exceptions.Enqueue(e); 
       } 
      } 
     })); 
     Task.WaitAll(partitions.ToArray()); 
    } 
    return exceptions; 
} 

Se usa como la siguiente donde como db es la DbContext original y db.CreateInstance() crea una nueva instancia utilizando la misma cadena de conexión.

 var batch = db.Set<SomeListToIterate>().ToList(); 
     var exceptions = batch.Parallel((item) => 
     { 
      using (var batchDb = db.CreateInstance()) 
      { 
       var batchTime = batchDb.GetDBTime(); 
       var someData = batchDb.Set<Permission>().Where(x=>x.ID = item.ID).ToList(); 
       //do stuff to someData 
       item.WasMigrated = true; //note that this record is attached to db not batchDb and will only be saved when db.SaveChanges() is called 
       batchDb.SaveChanges();   
      }     
     }); 
     if (exceptions.Count > 0) 
     { 
      logger.Error("ContactRecordMigration : Content: Error processing one or more records", new AggregateException(exceptions)); 
      throw new AggregateException(exceptions); //optionally throw an exception 
     } 
     db.SaveChanges(); //save the item modifications 
Cuestiones relacionadas