He hecho casi exactamente lo mismo que usted describió con el siguiente código. Su hilo es seguro y tiene un método de enjuague al que puede llamar para nivelar y escribir pendientes. Una vez que alcanza un número de umbral de objetos para escribir, envía la cola (Lista en mi caso) a un hilo diferente para guardar. Tenga en cuenta que utiliza un manualResetEvent para manejar el vaciado de los datos al final (hay un límite de 64 eventos de reinicio que puede esperar, por eso es que espera manualmente si tenemos más de 64 subprocesos de fondo pendientes de escritura, pero eso debería casi nunca sucede a menos que su base de datos sea realmente lenta). Este código se usó para procesar decenas de millones de registros que se transmitieron (desde la memoria tardaron unos 5 minutos en escribir 20m filas, pero se ejecutaba en el servidor de guardar como base de datos, por lo que no hay salto de red ... SQL puede manejar miles de filas por segundo utilizando el objeto BulkSqlCopy e IDataReader), por lo que debe manejar su carga de solicitud (pero, por supuesto, eso dependerá de lo que escriba y de su base de datos, pero creo que el código está a la altura de la tarea).
Además, para redactar a granel las instalaciones, creo una implementación mínima de un IDataReader para transmitir mis datos. Tendrá que hacer eso para sus necesidades de usar el código a continuación.
public class DataImporter<T>
{
public DataImporter(string tableName, string readerName)
{
_tableName = tableName;
_readerName = readerName;
}
/// <summary>
/// This is the size of our bulk staging list.
/// </summary>
/// <remarks>
/// Note that the SqlBulkCopy object has a batch size property, which may not be the same as this value,
/// so records may not be going into the database in sizes of this staging value.
/// </remarks>
private int _bulkStagingListSize = 20000;
private List<ManualResetEvent> _tasksWaiting = new List<ManualResetEvent>();
private string _tableName = String.Empty;
private string _readerName = String.Empty;
public void QueueForImport(T record)
{
lock (_listLock)
{
_items.Add(record);
if (_items.Count > _bulkStagingListSize)
{
SaveItems(_items);
_items = new List<T>();
}
}
}
/// <summary>
/// This method should be called at the end of the queueing work to ensure to clear down our list
/// </summary>
public void Flush()
{
lock (_listLock)
{
SaveItems(_items);
_items = new List<T>();
while (_tasksWaiting.Count > 64)
{
Thread.Sleep(2000);
}
WaitHandle.WaitAll(_tasksWaiting.ToArray());
}
}
private void SaveItems(List<T> items)
{
ManualResetEvent evt = new ManualResetEvent(false);
_tasksWaiting.Add(evt);
IDataReader reader = DataReaderFactory.GetReader<T>(_readerName,_items);
Tuple<ManualResetEvent, IDataReader> stateInfo = new Tuple<ManualResetEvent, IDataReader>(evt, reader);
ThreadPool.QueueUserWorkItem(new WaitCallback(saveData), stateInfo);
}
private void saveData(object info)
{
using (new ActivityTimer("Saving bulk data to " + _tableName))
{
Tuple<ManualResetEvent, IDataReader> stateInfo = info as Tuple<ManualResetEvent, IDataReader>;
IDataReader r = stateInfo.Item2;
try
{
Database.DataImportStagingDatabase.BulkLoadData(r, _tableName);
}
catch (Exception ex)
{
//Do something
}
finally
{
_tasksWaiting.Remove(stateInfo.Item1);
stateInfo.Item1.Set();
}
}
}
private object _listLock = new object();
private List<T> _items = new List<T>();
}
El DataReaderFactory referido a continuación simplemente selecciona el implmentation IDataReader derecho a utilizar para la transmisión y se ve de la siguiente manera:
internal static class DataReaderFactory
{
internal static IDataReader GetReader<T>(string typeName, List<T> items)
{
IDataReader reader = null;
switch(typeName)
{
case "ProductRecordDataReader":
reader = new ProductRecordDataReader(items as List<ProductRecord>) as IDataReader;
break;
case "RetailerPriceRecordDataReader":
reader = new RetailerPriceRecordDataReader(items as List<RetailerPriceRecord>) as IDataReader;
break;
default:
break;
}
return reader;
}
}
La aplicación de lector de datos que utilicé en este caso (althoght este código va a funcionar con cualquier lector de datos) es la siguiente:
/// <summary>
/// This class creates a data reader for ProductRecord data. This is used to stream the records
/// to the SqlBulkCopy object.
/// </summary>
public class ProductRecordDataReader:IDataReader
{
public ProductRecordDataReader(List<ProductRecord> products)
{
_products = products.ToList();
}
List<ProductRecord> _products;
int currentRow;
int rowCounter = 0;
public int FieldCount
{
get
{
return 14;
}
}
#region IDataReader Members
public void Close()
{
//Do nothing.
}
public bool Read()
{
if (rowCounter < _products.Count)
{
currentRow = rowCounter;
rowCounter++;
return true;
}
else
{
return false;
}
}
public int RecordsAffected
{
get { throw new NotImplementedException(); }
}
public string GetName(int i)
{
switch (i)
{
case 0:
return "ProductSku";
case 1:
return "UPC";
case 2:
return "EAN";
case 3:
return "ISBN";
case 4:
return "ProductName";
case 5:
return "ShortDescription";
case 6:
return "LongDescription";
case 7:
return "DFFCategoryNumber";
case 8:
return "DFFManufacturerNumber";
case 9:
return "ManufacturerPartNumber";
case 10:
return "ManufacturerModelNumber";
case 11:
return "ProductImageUrl";
case 12:
return "LowestPrice";
case 13:
return "HighestPrice";
default:
return null;
}
}
public int GetOrdinal(string name)
{
switch (name)
{
case "ProductSku":
return 0;
case "UPC":
return 1;
case "EAN":
return 2;
case "ISBN":
return 3;
case "ProductName":
return 4;
case "ShortDescription":
return 5;
case "LongDescription":
return 6;
case "DFFCategoryNumber":
return 7;
case "DFFManufacturerNumber":
return 8;
case "ManufacturerPartNumber":
return 9;
case "ManufacturerModelNumber":
return 10;
case "ProductImageUrl":
return 11;
case "LowestPrice":
return 12;
case "HighestPrice":
return 13;
default:
return -1;
}
}
public object GetValue(int i)
{
switch (i)
{
case 0:
return _products[currentRow].ProductSku;
case 1:
return _products[currentRow].UPC;
case 2:
return _products[currentRow].EAN;
case 3:
return _products[currentRow].ISBN;
case 4:
return _products[currentRow].ProductName;
case 5:
return _products[currentRow].ShortDescription;
case 6:
return _products[currentRow].LongDescription;
case 7:
return _products[currentRow].DFFCategoryNumber;
case 8:
return _products[currentRow].DFFManufacturerNumber;
case 9:
return _products[currentRow].ManufacturerPartNumber;
case 10:
return _products[currentRow].ManufacturerModelNumber;
case 11:
return _products[currentRow].ProductImageUrl;
case 12:
return _products[currentRow].LowestPrice;
case 13:
return _products[currentRow].HighestPrice;
default:
return null;
}
}
#endregion
#region IDisposable Members
public void Dispose()
{
//Do nothing;
}
#endregion
#region IDataRecord Members
public bool NextResult()
{
throw new NotImplementedException();
}
public int Depth
{
get { throw new NotImplementedException(); }
}
public DataTable GetSchemaTable()
{
throw new NotImplementedException();
}
public bool IsClosed
{
get { throw new NotImplementedException(); }
}
public bool GetBoolean(int i)
{
throw new NotImplementedException();
}
public byte GetByte(int i)
{
throw new NotImplementedException();
}
public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
{
throw new NotImplementedException();
}
public char GetChar(int i)
{
throw new NotImplementedException();
}
public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
{
throw new NotImplementedException();
}
public IDataReader GetData(int i)
{
throw new NotImplementedException();
}
public string GetDataTypeName(int i)
{
throw new NotImplementedException();
}
public DateTime GetDateTime(int i)
{
throw new NotImplementedException();
}
public decimal GetDecimal(int i)
{
throw new NotImplementedException();
}
public double GetDouble(int i)
{
throw new NotImplementedException();
}
public Type GetFieldType(int i)
{
throw new NotImplementedException();
}
public float GetFloat(int i)
{
throw new NotImplementedException();
}
public Guid GetGuid(int i)
{
throw new NotImplementedException();
}
public short GetInt16(int i)
{
throw new NotImplementedException();
}
public int GetInt32(int i)
{
throw new NotImplementedException();
}
public long GetInt64(int i)
{
throw new NotImplementedException();
}
public string GetString(int i)
{
throw new NotImplementedException();
}
public int GetValues(object[] values)
{
throw new NotImplementedException();
}
public bool IsDBNull(int i)
{
throw new NotImplementedException();
}
public object this[string name]
{
get { throw new NotImplementedException(); }
}
public object this[int i]
{
get { throw new NotImplementedException(); }
}
#endregion
}
Finalmente el método datos de carga a granel se ve de la siguiente manera:
Sin embargo, una vez dicho todo esto, le recomiendo que NO utilice este código en asp.net por las razones que alguien señaló en otra respuesta (en particular el reciclaje de procesos de trabajo en IIS). Le sugiero que use una cola muy liviana para enviar primero los datos de solicitud a otro servicio que no se reiniciará (utilizamos ZeroMQ para transmitir la solicitud y registrar los datos de una aplicación ASP.NET que estoy escribiendo ... es muy eficiente).
Mike.
¿Por qué quieres hacer esto? – Marc
porque el acceso a los datos es lento, si hago una inserción o transacción a la vez. – DarthVader
Para el problema de subprocesamiento que enfrenta, ¿por qué no bloquea la colección, copia la colección original en una colección diferente (digamos nuevaColección), borre y elimine el bloqueo de la colección original y use nuevaColección para insertarla en la base de datos. Con este enfoque, las nuevas solicitudes no se bloquearían durante mucho tiempo. –