no creo que realmente necesita para conseguir abajo y sucio con TPL directa Tasks
para esto. Para empezar, configuraría un BlockingCollection
alrededor de un ConcurrentQueue
(el valor predeterminado) sin BoundedCapacity
establecido en el BlockingCollection
para almacenar los ID que deben procesarse.
// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>();
A partir de ahí me acaba de utilizar Parallel::ForEach
en la enumeración de regresar de la BlockingCollection::GetConsumingEnumerable
. En la llamada ForEach
configurará su ParallelOptions::MaxDegreeOfParallelism
Dentro del cuerpo del ForEach
, ejecutará su procedimiento almacenado.
Ahora, una vez que se completa la ejecución del procedimiento almacenado, está diciendo que no desea volver a programar la ejecución para por lo menos dos segundos. No hay problema, programe un System.Threading.Timer
con una devolución de llamada que simplemente agregará la ID al BlockingCollection
en la devolución de llamada proporcionada.
Parallel.ForEach(
idsToProcess.GetConsumingEnumerable(),
new ParallelOptions
{
MaxDegreeOfParallelism = 4 // read this from config
},
(id) =>
{
// ... execute sproc ...
// Need to declare/assign this before the delegate so that we can dispose of it inside
Timer timer = null;
timer = new Timer(
_ =>
{
// Add the id back to the collection so it will be processed again
idsToProcess.Add(id);
// Cleanup the timer
timer.Dispose();
},
null, // no state, id wee need is "captured" in the anonymous delegate
2000, // probably should read this from config
Timeout.Infinite);
}
Por último, cuando el proceso se está cerrando que llamarían BlockingCollection::CompleteAdding
para que el enumerable que se procesa con el bloqueo de parada y completa y el paralelo :: ParaCada se cerrará. Si esto fuera un servicio de Windows, por ejemplo, haría esto en OnStop
.
// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();
actualización
usted ha planteado una preocupación válida en su comentario de que es posible que se procesa una gran cantidad de identificadores en un momento dado y temen que no habría demasiada sobrecarga en un contador de tiempo por ID .Estoy absolutamente de acuerdo con eso. Entonces, en el caso de que esté lidiando con una gran lista de ID al mismo tiempo, cambiaría de utilizar un temporizador por ID a usar otra cola para mantener los ID "dormidos" que son monitoreados por un único temporizador de intervalo corto. En primer lugar se necesita un ConcurrentQueue
en el que colocar los identificadores que están dormidos:
ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();
Ahora, estoy usando una de dos partes Tuple
aquí con fines ilustrativos, pero es posible que desee crear un mundo más fuertemente tipado struct for it (o al menos alias con una instrucción using
) para una mejor legibilidad. La tupla tiene la identificación y un DateTime que representa cuando se colocó en la cola.
Ahora usted también querrá configurar el temporizador que supervisará esta cola:
Timer wakeSleepingIdsTimer = new Timer(
_ =>
{
DateTime utcNow = DateTime.UtcNow;
// Pull all items from the sleeping queue that have been there for at least 2 seconds
foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2))
{
// Add this id back to the processing queue
idsToProcess.Enqueue(id);
}
},
null, // no state
Timeout.Infinite, // no due time
100 // wake up every 100ms, probably should read this from config
);
Entonces sólo tendría que cambiar el Parallel::ForEach
hacer lo siguiente en lugar de establecer un temporizador para cada uno:
(id) =>
{
// ... execute sproc ...
sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow));
}
reactiva Programación –