2012-01-18 12 views
6

Estoy desarrollando un cliente que recibirá los datos [EEG] a través de tcp y los escribirá en el buffer de anillo. Pensé que podría ser muy conveniente tener el búfer como una matriz ctypes o numpy porque es posible crear una "vista" numpy en cualquier ubicación de dicho búfer y leer/escribir/procesar los datos sin ninguna operación de copia. ¿O es una mala idea en general?buffer de anillo con numpy/ctypes

Sin embargo, no veo cómo implementar un búfer circular de un tamaño fijo de esta manera. Supongamos que he creado un objeto buffer que está contiguo en la memoria. ¿Cuál es la mejor manera de escribir los datos cuando se llega al final del búfer?

Una posible forma es comenzar a sobrescribir los bytes (ya antiguos) desde el inicio cuando el puntero de escritura llega al final de la matriz de almacenamiento intermedio. Sin embargo, cerca de los límites, la vista numpy de algún fragmento (para el procesamiento) no se puede crear (¿o puede ser así?) En este caso, porque parte de él puede estar ubicado al final de la matriz de almacenamiento intermedio mientras que otro ya está su principio. He leído que es imposible crear tales sectores circulares. ¿Cómo resolver esto?

UPD: Gracias a todos por las respuestas. En caso de que alguien también tenga el mismo problema, here es el código final que tengo.

+1

He escrito el [ 'paquete numpy_ringbuffer'] (https://pypi.python.org/pypi/numpy_ringbuffer) para resolver este problema, lo que proporciona una interfaz' deque' parecido a un búfer subyacente – Eric

Respuesta

4

Si necesita una ventana de N bytes, configure su búfer 2 * N bytes y escriba todas las entradas en dos ubicaciones: i % N y i % N + N, donde i es un contador de bytes. De esta forma, siempre tendrá N bytes consecutivos en el búfer.

data = 'Data to buffer' 
N = 4 
buf = 2*N*['\00'] 

for i,c in enumerate(data): 
    j = i % N 
    buf[j] = c 
    buf[j+N] = c 
    if i >= N-1: 
     print ''.join(buf[j+1:j+N+1]) 

grabados

Data 
ata 
ta t 
a to 
to 
to b 
o bu 
buf 
buff 
uffe 
ffer 
+0

yepp. Esto es lo que intento escribir ahora. En lugar de 2 * N buffer estoy teniendo uno de alguna longitud arbitraria + N y sigo la misma idea. ¡Gracias de cualquier manera! – dmytro

+0

Esto está bien si el rendimiento no es una preocupación en absoluto; pero lo dudo dado tu aplicación. Probablemente sea mejor que uses una solución vectorizada para tu problema –

2

Una posible manera es comenzar a sobrescribir los bytes (ya antiguos) desde el inicio cuando el puntero de escritura llega al final de la matriz de almacenamiento intermedio.

Esa es la única opción en un buffer de anillo de tamaño fijo.

He leído que es imposible crear tales sectores circulares.

Es por eso que no haría esto con una vista de Numpy. Puede crear un contenedor class alrededor de un ndarray en su lugar, manteniendo el búfer/matriz, la capacidad y un puntero (índice) en el punto de inserción. Si desea obtener el contenido como una matriz Numpy, que tendrá que hacer una copia de este modo:

buf = np.array([1,2,3,4]) 
indices = [3,0,1,2] 
contents = buf[indices] # copy 

Aún se pueden establecer los valores de los elementos in situ si se implementa __setitem__ y __setslice__.

+0

gracias. Pero, si ese trozo tiene que ser copiado de todos modos, ¿no será mejor usar collections.deque como un buffer en su lugar y luego hacer 'numpy.array (list (itertools.islice (buf, chstart, chend)))' ? ¿O es mucho más lento? – dmytro

+0

Quería evitar copiar porque al hacer una ventana deslizante FFT sobre esos datos, copiaría casi el mismo fragmento de datos cada vez que llegara un nuevo punto de datos – dmytro

+0

@dmytro: tendrás que medir si 'deque' es más rápido. Me temo que no será fácil evitar la copia si desea obtener datos almacenados en el búfer de anillo en una matriz. –

-1

Una variante de la respuesta de @Janne Karila, para C, pero no numpy:
Si el búfer de anillo es muy amplio, como N x 1G, entonces en vez de duplicar el conjunto cosa, duplicar una matriz de 2 * N punteros a sus filas. P. ej. para N = 3, inicializar

bufp = { buf[0], buf[1], buf[2], buf[0], buf[1], buf[2] }; 

A continuación, escribir datos solamente una vez, y anyfunc(bufp[j:j+3]) ve en las filas buf el fin del tiempo.

2

Creo que es necesario dar un paso atrás en el pensamiento C-estilo aquí. Actualizar un Ringbuffer para cada inserción nunca será eficiente. Un buffer de anillo es fundamentalmente diferente de la interfaz de bloque de memoria contigua que las matrices numpy demandan; incluyendo el nombre que mencionas que quieres hacer.

Una solución natural es sacrificar un poco de memoria por el bien del rendimiento. Por ejemplo, si la cantidad de elementos que necesita contener en su búfer es N, asigne una matriz de N + 1024 (o algún número sensible). Entonces solo necesita mover N elementos alrededor de cada 1024 inserciones, y siempre tiene una vista contigua de N elementos para actuar directamente disponible.

EDITAR: aquí hay un fragmento de código que implementa lo anterior, y debe proporcionar un buen rendimiento. Sin embargo, tenga en cuenta que se recomienda añadir en fragmentos, en lugar de por elemento. De lo contrario, las ventajas de rendimiento de usar numpy se anulan rápidamente, independientemente de cómo implemente su ringbuffer.

import numpy as np 

class RingBuffer(object): 
    def __init__(self, size, padding=None): 
     self.size = size 
     self.padding = size if padding is None else padding 
     self.buffer = np.zeros(self.size+self.padding) 
     self.counter = 0 

    def append(self, data): 
     """this is an O(n) operation""" 
     data = data[-self.padding:] 
     n = len(data) 
     if self.remaining < n: self.compact() 
     self.buffer[self.counter+self.size:][:n] = data 
     self.counter += n 

    @property 
    def remaining(self): 
     return self.padding-self.counter 
    @property 
    def view(self): 
     """this is always an O(1) operation""" 
     return self.buffer[self.counter:][:self.size] 
    def compact(self): 
     """ 
     note: only when this function is called, is an O(size) performance hit incurred, 
     and this cost is amortized over the whole padding space 
     """ 
     print 'compacting' 
     self.buffer[:self.size] = self.view 
     self.counter = 0 

rb = RingBuffer(10) 
for i in range(4): 
    rb.append([1,2,3]) 
    print rb.view 

rb.append(np.arange(15)) 
print rb.view #test overflow 
+0

Gracias Eelco. ¿Entonces una serie de vistas simplemente no funciona? ¿Pero cómo se detecta eso, y todo el Aptr reemplazado por una copia plana? Aptr [0] .flags tiene OWNDATA False, confuso. – denis

+0

No estoy seguro de lo que quiere decir con una variedad de vistas; las vistas son fáciles de construir sobre la marcha, por lo que no es necesario que las guardes en una matriz. El quid de la cuestión es que si intenta utilizar sus datos como una matriz numpy en cualquier punto, debe estar en la memoria contiguamente. O incurre en un golpe de rendimiento O (N) cada vez que necesita acceder a su ringbuffer contiguamente después de un apéndice, o necesita asignar memoria adicional, por lo que puede demorar y amortizar dichas operaciones según sea necesario para mantener su memoria contigua. –