2011-08-20 6 views
5

Tengo un controlador http y estoy almacenando cada solicitud en una colección de colas simultáneas en la memoria. después de un cierto tiempo, estoy a granel insertando la colección en una base de datos.Conservando datos en la memoria

¿es esta una mala idea? Debido a que hay un gran volumen, este parece ser un mejor enfoque de OMI.

Veo algunas discrepancias (número de visitas frente a la cantidad de elementos almacenados en la base de datos), debido al enhebrado, mientras estoy descargando la colección simultánea, la bloqueo y a granel inserto su contenido y luego vacío la colección. luego quite el bloqueo de la colección.

¿hay una mejor práctica? o has hecho algo similar?

+0

¿Por qué quieres hacer esto? – Marc

+0

porque el acceso a los datos es lento, si hago una inserción o transacción a la vez. – DarthVader

+0

Para el problema de subprocesamiento que enfrenta, ¿por qué no bloquea la colección, copia la colección original en una colección diferente (digamos nuevaColección), borre y elimine el bloqueo de la colección original y use nuevaColección para insertarla en la base de datos. Con este enfoque, las nuevas solicitudes no se bloquearían durante mucho tiempo. –

Respuesta

1

He hecho casi exactamente lo mismo que usted describió con el siguiente código. Su hilo es seguro y tiene un método de enjuague al que puede llamar para nivelar y escribir pendientes. Una vez que alcanza un número de umbral de objetos para escribir, envía la cola (Lista en mi caso) a un hilo diferente para guardar. Tenga en cuenta que utiliza un manualResetEvent para manejar el vaciado de los datos al final (hay un límite de 64 eventos de reinicio que puede esperar, por eso es que espera manualmente si tenemos más de 64 subprocesos de fondo pendientes de escritura, pero eso debería casi nunca sucede a menos que su base de datos sea realmente lenta). Este código se usó para procesar decenas de millones de registros que se transmitieron (desde la memoria tardaron unos 5 minutos en escribir 20m filas, pero se ejecutaba en el servidor de guardar como base de datos, por lo que no hay salto de red ... SQL puede manejar miles de filas por segundo utilizando el objeto BulkSqlCopy e IDataReader), por lo que debe manejar su carga de solicitud (pero, por supuesto, eso dependerá de lo que escriba y de su base de datos, pero creo que el código está a la altura de la tarea).

Además, para redactar a granel las instalaciones, creo una implementación mínima de un IDataReader para transmitir mis datos. Tendrá que hacer eso para sus necesidades de usar el código a continuación.

public class DataImporter<T> 
{ 

    public DataImporter(string tableName, string readerName) 
    { 
     _tableName = tableName; 
     _readerName = readerName; 
    } 

    /// <summary> 
    /// This is the size of our bulk staging list. 
    /// </summary> 
    /// <remarks> 
    /// Note that the SqlBulkCopy object has a batch size property, which may not be the same as this value, 
    /// so records may not be going into the database in sizes of this staging value. 
    /// </remarks> 
    private int _bulkStagingListSize = 20000; 
    private List<ManualResetEvent> _tasksWaiting = new List<ManualResetEvent>(); 
    private string _tableName = String.Empty; 
    private string _readerName = String.Empty; 

    public void QueueForImport(T record) 
    { 
     lock (_listLock) 
     { 
      _items.Add(record); 
      if (_items.Count > _bulkStagingListSize) 
      { 
       SaveItems(_items); 
       _items = new List<T>(); 
      } 
     } 
    } 

    /// <summary> 
    /// This method should be called at the end of the queueing work to ensure to clear down our list 
    /// </summary> 
    public void Flush() 
    { 
     lock (_listLock) 
     { 
      SaveItems(_items); 
      _items = new List<T>(); 
      while (_tasksWaiting.Count > 64) 
      { 
       Thread.Sleep(2000); 
      } 
      WaitHandle.WaitAll(_tasksWaiting.ToArray()); 
     } 
    } 

    private void SaveItems(List<T> items) 
    { 
     ManualResetEvent evt = new ManualResetEvent(false); 
     _tasksWaiting.Add(evt); 
     IDataReader reader = DataReaderFactory.GetReader<T>(_readerName,_items); 
     Tuple<ManualResetEvent, IDataReader> stateInfo = new Tuple<ManualResetEvent, IDataReader>(evt, reader); 
     ThreadPool.QueueUserWorkItem(new WaitCallback(saveData), stateInfo); 

    } 

    private void saveData(object info) 
    { 
     using (new ActivityTimer("Saving bulk data to " + _tableName)) 
     { 
      Tuple<ManualResetEvent, IDataReader> stateInfo = info as Tuple<ManualResetEvent, IDataReader>; 
      IDataReader r = stateInfo.Item2; 
      try 
      { 
       Database.DataImportStagingDatabase.BulkLoadData(r, _tableName); 
      } 
      catch (Exception ex) 
      { 
       //Do something 
      } 
      finally 
      { 
       _tasksWaiting.Remove(stateInfo.Item1); 
       stateInfo.Item1.Set(); 
      } 
     } 
    } 

    private object _listLock = new object(); 

    private List<T> _items = new List<T>(); 
} 

El DataReaderFactory referido a continuación simplemente selecciona el implmentation IDataReader derecho a utilizar para la transmisión y se ve de la siguiente manera:

internal static class DataReaderFactory 
{ 
    internal static IDataReader GetReader<T>(string typeName, List<T> items) 
    { 
     IDataReader reader = null; 
     switch(typeName) 
     { 
      case "ProductRecordDataReader": 
       reader = new ProductRecordDataReader(items as List<ProductRecord>) as IDataReader; 
       break; 
      case "RetailerPriceRecordDataReader": 
       reader = new RetailerPriceRecordDataReader(items as List<RetailerPriceRecord>) as IDataReader; 
       break; 
      default: 
       break; 
     } 
     return reader; 
    } 
} 

La aplicación de lector de datos que utilicé en este caso (althoght este código va a funcionar con cualquier lector de datos) es la siguiente:

/// <summary> 
/// This class creates a data reader for ProductRecord data. This is used to stream the records 
/// to the SqlBulkCopy object. 
/// </summary> 
public class ProductRecordDataReader:IDataReader 
{ 
    public ProductRecordDataReader(List<ProductRecord> products) 
    { 
     _products = products.ToList(); 
    } 

    List<ProductRecord> _products; 

    int currentRow; 
    int rowCounter = 0; 
    public int FieldCount 
    { 
     get 
     { 
      return 14; 
     } 
    } 


    #region IDataReader Members 

    public void Close() 
    { 
     //Do nothing. 
    } 

    public bool Read() 
    { 
     if (rowCounter < _products.Count) 
     { 
      currentRow = rowCounter; 
      rowCounter++; 
      return true; 
     } 
     else 
     { 
      return false; 
     } 

    } 

    public int RecordsAffected 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public string GetName(int i) 
    { 
     switch (i) 
     { 
      case 0: 
       return "ProductSku"; 
      case 1: 
       return "UPC"; 
      case 2: 
       return "EAN"; 
      case 3: 
       return "ISBN"; 
      case 4: 
       return "ProductName"; 
      case 5: 
       return "ShortDescription"; 
      case 6: 
       return "LongDescription"; 
      case 7: 
       return "DFFCategoryNumber"; 
      case 8: 
       return "DFFManufacturerNumber"; 
      case 9: 
       return "ManufacturerPartNumber"; 
      case 10: 
       return "ManufacturerModelNumber"; 
      case 11: 
       return "ProductImageUrl"; 
      case 12: 
       return "LowestPrice"; 
      case 13: 
       return "HighestPrice"; 
      default: 
       return null; 
     } 

    } 

    public int GetOrdinal(string name) 
    { 
     switch (name) 
     { 
      case "ProductSku": 
       return 0; 
      case "UPC": 
       return 1; 
      case "EAN": 
       return 2; 
      case "ISBN": 
       return 3; 
      case "ProductName": 
       return 4; 
      case "ShortDescription": 
       return 5; 
      case "LongDescription": 
       return 6; 
      case "DFFCategoryNumber": 
       return 7; 
      case "DFFManufacturerNumber": 
       return 8; 
      case "ManufacturerPartNumber": 
       return 9; 
      case "ManufacturerModelNumber": 
       return 10; 
      case "ProductImageUrl": 
       return 11; 
      case "LowestPrice": 
       return 12; 
      case "HighestPrice": 
       return 13; 
      default: 
       return -1; 
     } 

    } 

    public object GetValue(int i) 
    { 
     switch (i) 
     { 
      case 0: 
       return _products[currentRow].ProductSku; 
      case 1: 
       return _products[currentRow].UPC; 
      case 2: 
       return _products[currentRow].EAN; 
      case 3: 
       return _products[currentRow].ISBN; 
      case 4: 
       return _products[currentRow].ProductName; 
      case 5: 
       return _products[currentRow].ShortDescription; 
      case 6: 
       return _products[currentRow].LongDescription; 
      case 7: 
       return _products[currentRow].DFFCategoryNumber; 
      case 8: 
       return _products[currentRow].DFFManufacturerNumber; 
      case 9: 
       return _products[currentRow].ManufacturerPartNumber; 
      case 10: 
       return _products[currentRow].ManufacturerModelNumber; 
      case 11: 
       return _products[currentRow].ProductImageUrl; 
      case 12: 
       return _products[currentRow].LowestPrice; 
      case 13: 
       return _products[currentRow].HighestPrice; 
      default: 
       return null; 
     } 

    } 

    #endregion 

    #region IDisposable Members 

    public void Dispose() 
    { 
     //Do nothing; 
    } 

    #endregion 

    #region IDataRecord Members 

    public bool NextResult() 
    { 
     throw new NotImplementedException(); 
    } 

    public int Depth 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public DataTable GetSchemaTable() 
    { 
     throw new NotImplementedException(); 
    } 

    public bool IsClosed 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public bool GetBoolean(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public byte GetByte(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) 
    { 
     throw new NotImplementedException(); 
    } 

    public char GetChar(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) 
    { 
     throw new NotImplementedException(); 
    } 

    public IDataReader GetData(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public string GetDataTypeName(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public DateTime GetDateTime(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public decimal GetDecimal(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public double GetDouble(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public Type GetFieldType(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public float GetFloat(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public Guid GetGuid(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public short GetInt16(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public int GetInt32(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public long GetInt64(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public string GetString(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public int GetValues(object[] values) 
    { 
     throw new NotImplementedException(); 
    } 

    public bool IsDBNull(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public object this[string name] 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public object this[int i] 
    { 
     get { throw new NotImplementedException(); } 
    } 

    #endregion 
} 

Finalmente el método datos de carga a granel se ve de la siguiente manera:

Sin embargo, una vez dicho todo esto, le recomiendo que NO utilice este código en asp.net por las razones que alguien señaló en otra respuesta (en particular el reciclaje de procesos de trabajo en IIS). Le sugiero que use una cola muy liviana para enviar primero los datos de solicitud a otro servicio que no se reiniciará (utilizamos ZeroMQ para transmitir la solicitud y registrar los datos de una aplicación ASP.NET que estoy escribiendo ... es muy eficiente).

Mike.

1

veo algunas discrepancias [...] debido a enhebrar

Lo fundamental aquí es el uso de 2 colas y ciclo de ellas. 1 para recibir y 1 para insertar. Solo necesita bloquear la recepción, con muy poca contención.

+0

Uso una cola para recibirla y la vuelvo a una lista con la que puedo insertar a granel. y utilizo la cola simultánea, así que no necesito bloquearla. Solo bloqueo la cola mientras la dejo de contenido para listarla y luego insertarla a granel y eliminar el bloqueo. ¿qué piensas? puedo hacerlo mejor? – DarthVader

+0

Bueno, el copy-to-list es más rápido que un insert db, pero aún así 2 colas podrían ser mejores. Depende. La cola concurrente hará que el cambio sea un poco más complicado. –

+0

Y hay un problema con ConcurrentQueue, ¿está dejando que sea realmente grande? Consulte http://connect.microsoft.com/VisualStudio/feedback/details/552868/memory-leak-in-concurrentqueue-t-class-dequeued-enteries-are-still-rooted –

2

Lo siento pero yo diría que es una mala idea. Existen los siguientes problemas:

  • Si el grupo de aplicaciones se recicla antes de datos se escriben en la base de datos que va a perder los datos
  • Mantener todos los datos en la misma colección lleva a la necesidad de bloquear esa colección cuando se insertan datos y cuando los datos se escriben en el disco y se borra la colección. Esto podría hacer que todo el sitio se detenga durante la inserción masiva.
  • Su código será más complicado con el paso adicional. La reparación de problemas de subprocesos es difícil

Hemos escrito aplicaciones web que escriben 1000 filas por segundo en una base de datos de SQL Server con una carga máxima.

Primero intente escribir su aplicación lo más simple posible y luego realice una prueba de rendimiento.

La velocidad a la que se pueden insertar en la base de datos depende mucho de su hardware, pero también hay cosas que usted puede hacer en su programa:

  • sólo tenemos un índice (agrupado) sobre la mesa. Autonúmero clave
  • asegúrese de liberar la conexión a la base de datos lo antes posible.
+0

Conozco sus viñetas y bloqueo mencionado. ¿Cómo escribiste 1000 filas por segundo al servidor sql? ¿Puedes hacer 10000 filas por segundo en el servidor sql? – DarthVader

0

Otra cosa que puedes hacer es enviarlo al disco en una base de datos como sqlite (para evitar el problema de recirculación de la agrupación) y enviarlo a tu base de datos del servidor sql.

Utilicé una extensión reactiva para crear colas de inserción y trabajar con una buena velocidad.

Cuestiones relacionadas