2012-09-20 21 views
21

Por lo tanto, tengo un proceso de servicio de Windows que realiza un proceso de flujo de trabajo. El back-end usa Repository y UnitofWork Pattern and Unity en la parte superior de Entity Framework con la clase de entidades generada desde el edmx. No voy a entrar en muchos detalles, ya que no es necesario, pero básicamente hay 5 pasos por los que pasa el flujo de trabajo. Un proceso particular podría ser en cualquier etapa en cualquier momento (en orden, por supuesto). El primer paso solo genera datos para el paso dos, que valida los datos a través de un proceso de ejecución larga a otro servidor. Luego paso a generar un pdf con esa información. Para cada etapa generamos un temporizador, sin embargo, es configurable para permitir que se genere más de un temporizador para cada etapa. Ahí está el problema. Cuando agrego un procesador a una etapa particular, aparece el siguiente error al azar:Entidad de subprocesamiento Entity Framework: la conexión no se cerró. El estado actual de la conexión se conecta al

La conexión no se cerró. El estado actual de la conexión es la conexión.

Al leer esto parece obvio que esto está sucediendo porque el contexto está tratando de acceder a la misma entidad desde dos hilos. Pero aquí es donde me está dando vueltas. Toda la información que puedo encontrar en este afirma que deberíamos usar un contexto de instancia por hilo. Que, por lo que puedo decir, estoy haciendo (ver el código a continuación). No estoy usando patrones únicos o estáticos ni nada, así que no estoy seguro de por qué está sucediendo esto o cómo evitarlo. He publicado los fragmentos relevantes de mi código a continuación para su revisión.

El repositorio de la base:

public class BaseRepository 
{ 
    /// <summary> 
    /// Initializes a repository and registers with a <see cref="IUnitOfWork"/> 
    /// </summary> 
    /// <param name="unitOfWork"></param> 
    public BaseRepository(IUnitOfWork unitOfWork) 
    { 
     if (unitOfWork == null) throw new ArgumentException("unitofWork"); 
     UnitOfWork = unitOfWork; 
    } 


    /// <summary> 
    /// Returns a <see cref="DbSet"/> of entities. 
    /// </summary> 
    /// <typeparam name="TEntity">Entity type the dbset needs to return.</typeparam> 
    /// <returns></returns> 
    protected virtual DbSet<TEntity> GetDbSet<TEntity>() where TEntity : class 
    { 

     return Context.Set<TEntity>(); 
    } 

    /// <summary> 
    /// Sets the state of an entity. 
    /// </summary> 
    /// <param name="entity">object to set state.</param> 
    /// <param name="entityState"><see cref="EntityState"/></param> 
    protected virtual void SetEntityState(object entity, EntityState entityState) 
    { 
     Context.Entry(entity).State = entityState; 
    } 

    /// <summary> 
    /// Unit of work controlling this repository.  
    /// </summary> 
    protected IUnitOfWork UnitOfWork { get; set; } 

    /// <summary> 
    /// 
    /// </summary> 
    /// <param name="entity"></param> 
    protected virtual void Attach(object entity) 
    { 
     if (Context.Entry(entity).State == EntityState.Detached) 
      Context.Entry(entity).State = EntityState.Modified; 
    } 

    protected virtual void Detach(object entity) 
    { 
     Context.Entry(entity).State = EntityState.Detached; 
    } 

    /// <summary> 
    /// Provides access to the ef context we are working with 
    /// </summary> 
    internal StatementAutoEntities Context 
    { 
     get 
     {     
      return (StatementAutoEntities)UnitOfWork; 
     } 
    } 
} 

StatementAutoEntities es la clase EF autogenerado.

La aplicación repositorio:

public class ProcessingQueueRepository : BaseRepository, IProcessingQueueRepository 
{ 

    /// <summary> 
    /// Creates a new repository and associated with a <see cref="IUnitOfWork"/> 
    /// </summary> 
    /// <param name="unitOfWork"></param> 
    public ProcessingQueueRepository(IUnitOfWork unitOfWork) : base(unitOfWork) 
    { 
    } 

    /// <summary> 
    /// Create a new <see cref="ProcessingQueue"/> entry in database 
    /// </summary> 
    /// <param name="Queue"> 
    ///  <see cref="ProcessingQueue"/> 
    /// </param> 
    public void Create(ProcessingQueue Queue) 
    { 
     GetDbSet<ProcessingQueue>().Add(Queue); 
     UnitOfWork.SaveChanges(); 
    } 

    /// <summary> 
    /// Updates a <see cref="ProcessingQueue"/> entry in database 
    /// </summary> 
    /// <param name="queue"> 
    ///  <see cref="ProcessingQueue"/> 
    /// </param> 
    public void Update(ProcessingQueue queue) 
    { 
     //Attach(queue); 
     UnitOfWork.SaveChanges(); 
    } 

    /// <summary> 
    /// Delete a <see cref="ProcessingQueue"/> entry in database 
    /// </summary> 
    /// <param name="Queue"> 
    ///  <see cref="ProcessingQueue"/> 
    /// </param> 
    public void Delete(ProcessingQueue Queue) 
    { 
     GetDbSet<ProcessingQueue>().Remove(Queue); 
     UnitOfWork.SaveChanges(); 
    } 

    /// <summary> 
    /// Gets a <see cref="ProcessingQueue"/> by its unique Id 
    /// </summary> 
    /// <param name="id"></param> 
    /// <returns></returns> 
    public ProcessingQueue GetById(int id) 
    { 
     return (from e in Context.ProcessingQueue_SelectById(id) select e).FirstOrDefault(); 
    } 

    /// <summary> 
    /// Gets a list of <see cref="ProcessingQueue"/> entries by status 
    /// </summary> 
    /// <param name="status"></param> 
    /// <returns></returns> 
    public IList<ProcessingQueue> GetByStatus(int status) 
    { 
     return (from e in Context.ProcessingQueue_SelectByStatus(status) select e).ToList(); 
    } 

    /// <summary> 
    /// Gets a list of all <see cref="ProcessingQueue"/> entries 
    /// </summary> 
    /// <returns></returns> 
    public IList<ProcessingQueue> GetAll() 
    { 
     return (from e in Context.ProcessingQueue_Select() select e).ToList(); 
    } 

    /// <summary> 
    /// Gets the next pending item id in the queue for a specific work   
    /// </summary> 
    /// <param name="serverId">Unique id of the server that will process the item in the queue</param> 
    /// <param name="workTypeId">type of <see cref="WorkType"/> we are looking for</param> 
    /// <param name="operationId">if defined only operations of the type indicated are considered.</param> 
    /// <returns>Next pending item in the queue for the work type or null if no pending work is found</returns> 
    public int GetNextPendingItemId(int serverId, int workTypeId, int? operationId) 
    { 
     var id = Context.ProcessingQueue_GetNextPending(serverId, workTypeId, operationId).SingleOrDefault(); 
     return id.HasValue ? id.Value : -1; 
    } 

    /// <summary> 
    /// Returns a list of <see cref="ProcessingQueueStatus_dto"/>s objects with all 
    /// active entries in the queue 
    /// </summary> 
    /// <returns></returns> 
    public IList<ProcessingQueueStatus_dto> GetActiveStatusEntries() 
    { 
     return (from e in Context.ProcessingQueueStatus_Select() select e).ToList(); 
    } 
    /// <summary> 
    /// Bumps an entry to the front of the queue 
    /// </summary> 
    /// <param name="processingQueueId"></param> 
    public void Bump(int processingQueueId) 
    { 
     Context.ProcessingQueue_Bump(processingQueueId); 
    } 
} 

Utilizamos la Unidad para la inyección de dependencias, algunas código de llamada, por ejemplo:

#region Members 
    private readonly IProcessingQueueRepository _queueRepository;  
    #endregion 

    #region Constructors 
    /// <summary>Initializes ProcessingQueue services with repositories</summary> 
    /// <param name="queueRepository"><see cref="IProcessingQueueRepository"/></param>   
    public ProcessingQueueService(IProcessingQueueRepository queueRepository) 
    { 
     Check.Require(queueRepository != null, "processingQueueRepository is required"); 
     _queueRepository = queueRepository; 

    } 
    #endregion 

El código en el servicio de Windows que arranca los temporizadores es el siguiente:

  _staWorkTypeConfigLock.EnterReadLock(); 
     foreach (var timer in from operation in (from o in _staWorkTypeConfig.WorkOperations where o.UseQueueForExecution && o.AssignedProcessors > 0 select o) 
           let interval = operation.SpawnInternval < 30 ? 30 : operation.SpawnInternval 
           select new StaTimer 
          { 
           Interval = _runImmediate ? 5000 : interval*1000, 
           Operation = (ProcessingQueue.RequestedOperation) operation.OperationId 
          }) 
     { 
      timer.Elapsed += ApxQueueProcessingOnElapsedInterval; 
      timer.Enabled = true; 
      Logger.DebugFormat("Queue processing for operations of type {0} will execute every {1} seconds", timer.Operation, timer.Interval/1000);     
     } 
     _staWorkTypeConfigLock.ExitReadLock(); 

StaTimer es solo una envoltura en el temporizador que agrega el tipo de operación. ApxQueueProcessingOnElapsedInterval básicamente solo asigna trabajo al proceso en función de la operación.

También agregaré un poco del código ApxQueueProcessingOnElapsedInterval donde estamos generando tareas.

  _staTasksLock.EnterWriteLock(); 
     for (var x = 0; x < tasksNeeded; x++) 
     { 
      var t = new Task(obj => ProcessStaQueue((QueueProcessConfig) obj), 
          CreateQueueProcessConfig(true, operation), _cancellationToken); 


      _staTasks.Add(new Tuple<ProcessingQueue.RequestedOperation, DateTime, Task>(operation, DateTime.Now,t)); 

      t.Start(); 
      Thread.Sleep(300); //so there are less conflicts fighting for jobs in the queue table 
     } 
     _staTasksLock.ExitWriteLock(); 
+0

¿Tiene el contenedor de IoC 'Dispose' las instancias de contexto? –

Respuesta

26

Parece que su servicio, repositorio y contexto se supone que deben vivir durante toda la vida útil de su aplicación, pero eso es incorrecto. Puede tener varios temporizadores activados al mismo tiempo. Esto significa que varios hilos utilizarán su servicio en paralelo y ejecutarán el código de su servicio en su thread = contexto que se comparte entre varios threads => excepción porque el contexto no es seguro para subprocesos.

La única opción es utilizar una nueva instancia de contexto para cada operación que desee ejecutar. Por ejemplo, puede cambiar sus clases para aceptar context factory en lugar de contexto y obtener un nuevo contexto para cada operación.

+0

Agregué un poco del código para mostrar cómo se generan los servicios. Estoy usando una tarea que genera ProcessStaQueue que luego determina qué operación debe ejecutar. Sé que el contexto no es seguro para subprocesos, pero dado que no estoy usando statics o un patrón singleton o algo así, ¿no debería instanciarse un nuevo contexto de forma independiente para cada tarea realizada? – Brandon

+0

El servicio finalmente se está instanciando así: ServiceLocator.Current.GetInstance (); ¿Esto tiene algo que ver? Creo que tal vez es que ServiceLocator tiene algo que ver con esto. – Brandon

+0

Pero depende de la implementación del localizador de servicios si crea una nueva instancia del servicio cada vez que la solicita o si crea una instancia solo para la primera solicitud y luego reutiliza la instancia. –

3

En caso de que esto ayude a:

En mi caso, se aseguró de que la no-thread-safe DbContext tenían un TransientLifetime (usando Ninject), pero todavía estaba causando problemas de concurrencia! Resulta que en algunos de mis ActionFilters personalizados usé Dependency Injection para obtener acceso al DbContext en el constructor, pero ActionFilters tienen una duración que los mantiene instanciados en varias solicitudes, por lo que el contexto no se volvió a crear.

Lo solucioné resolviendo manualmente la dependencia en el método OnActionExecuting en lugar de en el constructor para que sea una instancia nueva cada vez.

-1

En mi caso, estaba obteniendo este problema porque olvidé la palabra clave await antes de una de mis llamadas a la función DAL. Poniendo await allí lo resolvió.

+0

Awaik hace una rosca no multihilo – Merta

Cuestiones relacionadas