2010-02-25 12 views
6

Busco un diseño libre de bloqueo conforme a estos requisitos:Buscando una estructura de un solo lector de un solo escritor sin bloqueo RT-safe

  • un solo escritor escribe en una estructura y un solo lector lee de esta estructura (existe esta estructura ya y es seguro para su uso simultáneo de lectura/escritura)
  • pero en algún momento, la estructura tiene que ser cambiado por el escritor, que luego Inicializa, interruptores y escribe en una nueva estructura (del mismo tipo pero con contenido nuevo)
  • y en la próxima vez que el lector lea, cambia a esta nueva estructura (si el escritor cambia a una nueva estructura libre de bloqueos, el lector descarta estas estructuras, ignorando sus datos).
  • Las estructuras se deben reutilizar, es decir, no se permite la asignación de memoria de montón/libre durante la operación de escritura/lectura/conmutación, para RT fines.

Actualmente he implementado un ringbuffer que contiene varias instancias de estas estructuras; pero esta implementación adolece del hecho de que cuando el escritor ha utilizado todas las estructuras presentes en el ringbuffer, ya no hay lugar para cambiar de estructura ... Pero el resto del ringbuffer contiene algunos datos que no necesitan ser leídos por el lector, pero no puede ser reutilizado por el escritor. Como consecuencia, el Ringbuffer no se ajusta a este propósito.

¿Alguna idea (nombre o pseudoimplementación) de un diseño sin bloqueo? Gracias por haber considerado este problema.

+3

Demasiado énfasis no hace énfasis. – kennytm

+0

@KennyTM: tienes razón. Editado – moala

+1

Buena pregunta. Este es un problema común en los sistemas en tiempo real. Tengo curiosidad de saber si hay una forma disponible de hacerlo. – thebretness

Respuesta

0

Estás en el camino correcto.

Lock libre comunicación de mensajes fijos entre hilos/procesos/procesadores alt text http://img59.imageshack.us/img59/5273/basicq.gif

fijo tamaño búferes de anillo se pueden utilizar en comunicaciones libres de bloqueo entre los hilos, procesos o procesadores si hay un productor y un consumidor. Algunas comprobaciones a realizar:

cabeza variable se escribe sólo por el productor (como una acción atómica después de la escritura)

cola variable se escribe sólo por el consumidor (como una acción atómica después de la lectura)

Obstáculo: introducción de una variable de tamaño o indicador de llenado/vacío completo; estos son típicamente escritos tanto por el productor como por el consumidor y, por lo tanto, le darán un problema.

Generalmente utilizo amortiguadores de anillo para este propósito. La lección más importante que aprendí es que un buffer de anillos nunca puede contener más que elementos. De esta manera, cabeza y cola variable son escritos por el productor, respectivamente, el consumidor.

extensión para grandes bloques/variable de tamaño Para utilizar tampones en un entorno de tiempo real, puede utilizar agrupaciones de memoria (a menudo disponibles en forma optimizada en tiempo real los sistemas operativos) o desacoplar asignación de uso. Esto último se ajusta a la pregunta, creo.

extended queue http://img16.imageshack.us/img16/8725/extq.gif

Si necesita intercambiar grandes bloques, Es mejor utilizar una piscina con bloques de amortiguamiento y comunicar punteros a buffers utilizando una cola. Entonces usa una tercera cola con punteros de buffer. De esta forma, las asignaciones se pueden realizar en la aplicación (fondo) y la parte de tiempo real tiene acceso a una cantidad variable de memoria.

Aplicación

while (blockQueue.full != true) 
{ 
    buf = allocate block of memory from heap or buffer pool 
    msg = { .... , buf }; 
    blockQueue.Put(msg) 
} 

Producer: 
    pBuf = blockQueue.Get() 
    pQueue.Put() 

Consumer 
    if (pQueue.Empty == false) 
    { 
     msg=pQueue.Get() 
     // use info in msg, with buf pointer 
     // optionally indicate that buf is no longer used 
    } 
+0

No estoy siguiendo la explicación aquí. Necesita alguna corrección. – thebretness

+0

Gracias por haber pasado tanto tiempo en esto. Su diseño adolece del hecho de que si el consumidor no se ejecuta, el productor y los hilos de alimentación sufrirán inanición, como en el caso de un buffer de anillo, pero consumirán toda la memoria del montón. – moala

+0

Acabo de explicar la parte de tiempo real; el diseño en tiempo real es una cuestión compleja con la que estoy trabajando a diario. Depende de usted cómo maneja su manejo de excepciones (el cliente no se ejecuta, etc.). No puedo decidir por usted (ya sea descartar datos, realizar manejo de excepciones o bloquear). Como estos son buffers de anillo, puede controlar el número de elementos para que sea suficiente para la latencia esperada de la tarea de fondo. – Adriaan

0

Aquí hay uno. Las claves son que hay tres almacenamientos intermedios y el lector reserva el búfer del que está leyendo. El escritor escribe en uno de los otros dos buffers. El riesgo de colisión es mínimo. Además, esto se expande. Simplemente haga que sus matrices miembros sean un elemento más largo que el número de lectores, más el número de escritores.

class RingBuffer 
{ 
    RingBuffer():lastFullWrite(0) 
    { 
    //Initialize the elements of dataBeingRead to false 
    for(unsigned int i=0; i<DATA_COUNT; i++) 
    { 
     dataBeingRead[i] = false; 
    } 
    } 

    Data read() 
    { 
    // You may want to check to make sure write has been called once here 
    // to prevent read from grabbing junk data. Else, initialize the elements 
    // of dataArray to something valid. 
    unsigned int indexToRead = lastFullWriteIndex; 
    Data dataCopy; 
    dataBeingRead[indexToRead] = true; 
    dataCopy = dataArray[indexToRead]; 
    dataBeingRead[indexToRead] = false; 
    return dataCopy; 
    } 

    void write(const Data& dataArg) 
    { 
    unsigned int writeIndex(0); 

    //Search for an unused piece of data. 
    // It's O(n), but plenty fast enough for small arrays. 
    while(true == dataBeingRead[writeIndex] && writeIndex < DATA_COUNT) 
    { 
     writeIndex++; 
    } 

    dataArray[writeIndex] = dataArg; 

    lastFullWrite = &dataArray[writeIndex]; 
    } 

private: 
    static const unsigned int DATA_COUNT; 
    unsigned int lastFullWrite; 
    Data dataArray[DATA_COUNT]; 
    bool dataBeingRead[DATA_COUNT]; 
}; 

Nota: La forma en que está escrito aquí, hay dos copias para leer sus datos. Si transfiere sus datos de la función de lectura a través de un argumento de referencia, puede reducirlos a una copia.

+0

¿Qué pasa si * lastFullWriteIndex * in * read * se cambiará por * write * si * write * se llamará en medio de * read *? Luego tendrá dos instancias de * dataBeingRead * establecido en * true *. – werewindle

+0

Buena captura. Fijo. – thebretness

Cuestiones relacionadas