For the past few years I’ve been teaching few modules for Master students, such as programming or signal processing. In most of them, one of the main and important tasks is to write a data buffer. Usual requirement is to be able to return last N samples or T seconds. However, conceptually most of students understands, implementing this seems to be quite difficult. But it is not. Unfortunately their first line of action, rather than think themselves, is to search for the solution on the Internet. There are many attempts, better or worse, but since they are looking for it, I could also provide with my own solution. Hopefully it helps others as well.

Below is Data Buffer class that saves size of last data, update via update method. First implementation uses strictly Python’s native lists, whereas the other uses NumPy arrays.

Python’s list:

class DataBuffer(object): def __init__(self, size): " Creates 2D list buffer of tuple 'size' size. " self.size = size self.buffer = [ [0]*size[1] for i in range(size[0]) ] def update(self, data): " Updates buffer with input data." self.buffer = [self.buffer[i][1:] + [data[i]] for i in range(self.size[0])] def getData(self, d1=[0,-1], d2=[0,-1]): " Returns buffer in given ranges (default: whole buffer)." # Converts negative indices into absolute value if d1[0]<0: d1[0] = self.size[0] + int(d1[0]) + 1 if d1[1]<0: d1[1] = self.size[0] + int(d1[1]) + 1 if d2[0]<0: d2[0] = self.size[1] + int(d2[0]) + 1 if d2[1]<0: d2[1] = self.size[1] + int(d2[1]) + 1 # Make sure that indices are in increasing order d1 = [min(d1), max(d1)] d2 = [min(d2), max(d2)] _buf = [ self.buffer[i][d2[0]:d2[1]] for i in range(d1[0],d1[1])] return _buf

Numpy’s arrays:

class DataBuffer(object): def __init__(self, size): "Creates buffer of 'size' size." self.size = size self.buffer = np.zeros( size ) def update(self, data): "Updates buffer with data." self.buffer[:, :-1] = self.buffer[:, 1:] self.buffer[:, -1] = data def getData(self, d1=[0,-1], d2=[0,-1]): "Prints on screen content of buffer." d1 = np.array(d1) d1[d1<;0] += self.size[0] + 1 d1.sort() d2 = np.array(d2) d2[d2<0] += self.size[1] + 1 d2.sort() return self.buffer[d1[0]:d1[1], d2[0]:d2[1]]

Usage example:

if __name__ == "__main__" import numpy as np import random r = random.randint size = (5,10) B = DataBuffer( size ) for i in range(20): data = [r( 0, 10) for i in xrange(size[0])] B.update( data) print print B.getData([0,2], [1,-2]) print B.getData([2,-1], [1,6])