Sha256: 0e38d85068865af9ab2f75a7da7cf1cfe20d74353509cd0ee34e0b1d1d7fb2ce
Contents?: true
Size: 1.28 KB
Versions: 292
Compression:
Stored size: 1.28 KB
Contents
class BufferFullException(Exception):
    """Raised by ``CircularBuffer.write`` when every slot is occupied."""


class BufferEmptyException(Exception):
    """Raised by ``CircularBuffer.read`` when no unread data is stored."""


class CircularBuffer(object):
    """Fixed-capacity first-in/first-out ring buffer of single byte values.

    Values are stored in a ``bytearray``, so each item must be either an
    integer accepted by ``bytearray`` item assignment or a one-character
    string/bytes object (converted with ``ord``).  ``read`` always hands
    the value back as a one-character string via ``chr``.

    Occupancy is tracked with an explicit counter instead of being
    inferred from the stored bytes, so a zero byte (``0`` / ``'\\x00'``)
    is a perfectly valid item and is never mistaken for an empty slot.
    """

    def __init__(self, capacity):
        """Create an empty buffer able to hold ``capacity`` items."""
        self.buffer = bytearray(capacity)
        self.read_point = 0   # index of the oldest unread item
        self.write_point = 0  # index of the next slot to be written
        self._count = 0       # number of unread items currently stored

    # (protected) helper method to support python 2/3
    def _update_buffer(self, data):
        """Store ``data`` in the slot at ``write_point``.

        Direct assignment is attempted first; when ``bytearray`` rejects
        the value with ``TypeError`` (e.g. a ``str`` on Python 3) the
        value is converted with ``ord`` and stored again.
        """
        try:
            self.buffer[self.write_point] = data
        except TypeError:
            self.buffer[self.write_point] = ord(data)

    def clear(self):
        """Discard all stored items and return to the initial state.

        The indices are reset together with the storage; otherwise a
        later ``read`` could start from a stale slot that no longer holds
        the oldest item.
        """
        self.buffer = bytearray(len(self.buffer))
        self.read_point = 0
        self.write_point = 0
        self._count = 0

    def write(self, data):
        """Append ``data`` to the buffer.

        Raises:
            BufferFullException: if the buffer has no free slot.
        """
        capacity = len(self.buffer)
        if self._count >= capacity:
            raise BufferFullException
        # Store first: if the value is rejected, no bookkeeping has changed.
        self._update_buffer(data)
        self.write_point = (self.write_point + 1) % capacity
        self._count += 1

    def overwrite(self, data):
        """Append ``data``, replacing the oldest item when the buffer is full."""
        # Store first: if the value is rejected, no bookkeeping has changed.
        self._update_buffer(data)
        capacity = len(self.buffer)
        if self._count >= capacity:
            # Full buffer: write_point sat on the oldest item, which has just
            # been replaced, so the next-oldest item becomes the read target.
            self.read_point = (self.read_point + 1) % capacity
        else:
            self._count += 1
        self.write_point = (self.write_point + 1) % capacity

    def read(self):
        """Remove and return the oldest item as a one-character string.

        Raises:
            BufferEmptyException: if the buffer holds no unread item.
        """
        if self._count <= 0:
            raise BufferEmptyException
        data = chr(self.buffer[self.read_point])
        # Keep the raw storage tidy for anyone inspecting ``buffer`` directly.
        self.buffer[self.read_point] = 0
        self.read_point = (self.read_point + 1) % len(self.buffer)
        self._count -= 1
        return data
Version data entries
292 entries across 292 versions & 1 rubygems