2010-11-11 3 views
72

Я хочу создать эффективный circular buffer в python (с целью получения средних значений целочисленных значений в буфере).эффективный круговой буфер?

Это эффективный способ использования списка для сбора значений?

def add_to_buffer(self, num): 
    self.mylist.pop(0) 
    self.mylist.append(num) 

Что было бы более эффективным (и почему)?

ответ

137

Я хотел бы использовать collections.deque с maxlen агд

>>> import collections 
>>> d = collections.deque(maxlen=10) 
>>> d 
deque([], maxlen=10) 
>>> for i in xrange(20): 
...  d.append(i) 
... 
>>> d 
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10) 

Существует в документации для deque, что похоже на то, что вы хотите recipe. Мое утверждение о том, что это наиболее эффективно, полностью зависит от того, что он реализован на C невероятно опытным экипажем, который привык проворачивать верхний код.

+5

+1 Да, это хорошие батареи, включенные в дорогу. Операции для кругового буфера - O (1), и, как вы говорите, дополнительные накладные расходы находятся на C, поэтому все равно должно быть довольно быстро –

+1

Мне не нравится это решение, потому что документы не гарантируют O (1) случайный доступ, когда ' maxlen'. O (n) понятна, когда 'deque' может вырасти до бесконечности, но если задан' maxlen', индексирование элемента должно быть постоянным. – lvella

+0

Я предполагаю, что он реализован как связанный список, а не массив. –

9

выскакивает из головы списка вызывает весь список, который нужно скопировать, так неэффективен

Вы должны вместо этого использовать список/массив фиксированного размера и индекс, который перемещается через буфер, как вы добавить/удалить пункты

+2

Согласен.Независимо от того, насколько элегантный или неэлегантный он может выглядеть или какой-либо язык используется. В действительности, чем меньше вы беспокоитесь о сборщике мусора (или диспетчер кучи, или механизмы подкачки/картографии или что-то, что делает реальную магию памяти), тем лучше. – 2010-11-11 04:56:58

+0

@RocketSurgeon Это не волшебство, просто это массив, первый элемент которого удален. Таким образом, для массива размером n это означает операции копирования n-1. Здесь не задействован сборщик мусора или подобное устройство. – Christian

+2

Согласен. Делать это также намного проще, чем думают некоторые люди. Просто используйте постоянно увеличивающийся счетчик и используйте оператор modulo (% arraylen) при доступе к элементу. –

5

нормально с использованием класса DEQUE, но для requeriments вопроса (в среднем), это мое решение:

>>> from collections import deque 
>>> class CircularBuffer(deque): 
...  def __init__(self, size=0): 
...    super(CircularBuffer, self).__init__(maxlen=size) 
...  @property 
...  def average(self): # TODO: Make type check for integer or floats 
...    return sum(self)/len(self) 
... 
>>> 
>>> cb = CircularBuffer(size=10) 
>>> for i in range(20): 
...  cb.append(i) 
...  print "@%s, Average: %s" % (cb, cb.average) 
... 
@deque([0], maxlen=10), Average: 0 
@deque([0, 1], maxlen=10), Average: 0 
@deque([0, 1, 2], maxlen=10), Average: 1 
@deque([0, 1, 2, 3], maxlen=10), Average: 1 
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2 
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2 
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3 
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3 
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4 
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4 
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5 
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6 
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7 
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8 
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9 
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10 
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11 
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12 
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13 
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14 
+0

Я получаю 'TypeError: 'numpy.float64' объект не вызываем' при попытке вызвать метод' average' – scls

+4

Это для моего сообщения ?, код не использует numpy в любом месте. ¿? – SmartElectron

+0

Да ... на самом деле я предполагаю, что deque использует numpy массивы внутри себя (после удаления @property это работает нормально) – scls

3

Вы также можете увидеть это довольно старый Python recipe.

Вот мой вариант с NumPy массива:

#!/usr/bin/env python 

import numpy as np 

class RingBuffer(object): 
    def __init__(self, size_max, default_value=0.0, dtype=float): 
     """initialization""" 
     self.size_max = size_max 

     self._data = np.empty(size_max, dtype=dtype) 
     self._data.fill(default_value) 

     self.size = 0 

    def append(self, value): 
     """append an element""" 
     self._data = np.roll(self._data, 1) 
     self._data[0] = value 

     self.size += 1 

     if self.size == self.size_max: 
      self.__class__ = RingBufferFull 

    def get_all(self): 
     """return a list of elements from the oldest to the newest""" 
     return(self._data) 

    def get_partial(self): 
     return(self.get_all()[0:self.size]) 

    def __getitem__(self, key): 
     """get element""" 
     return(self._data[key]) 

    def __repr__(self): 
     """return string representation""" 
     s = self._data.__repr__() 
     s = s + '\t' + str(self.size) 
     s = s + '\t' + self.get_all()[::-1].__repr__() 
     s = s + '\t' + self.get_partial()[::-1].__repr__() 
     return(s) 

class RingBufferFull(RingBuffer): 
    def append(self, value): 
     """append an element when buffer is full""" 
     self._data = np.roll(self._data, 1) 
     self._data[0] = value 
+0

+1 для использования numpy – shx2

+0

+1 для использования numpy, но -1 для реализации циклического буфера. Как вы его реализовали, вы меняете все данные каждый раз, когда добавляете один элемент, это стоит «O (n)» время. Чтобы реализовать правильный [круглый буфер] (https://en.wikipedia.org/wiki/Circular_buffer), вы должны иметь как индекс, так и переменную размера, и вам нужно правильно обрабатывать случай, когда данные «обертываются», конец буфера. При извлечении данных вам может понадобиться конкатенация двух разделов в начале и в конце буфера. –

1

Это одна не требует какой-либо библиотеки. Он увеличивает список, а затем циклически изменяется по индексу.

Отпечаток очень маленький (без библиотеки), и он работает в два раза быстрее, чем по меньшей мере. Это полезно для вычисления скользящих средних, но помните, что элементы не сортируются по возрасту, как указано выше.

class CircularBuffer(object): 
    def __init__(self, size): 
     """initialization""" 
     self.index= 0 
     self.size= size 
     self._data = [] 

    def record(self, value): 
     """append an element""" 
     if len(self._data) == self.size: 
      self._data[self.index]= value 
     else: 
      self._data.append(value) 
     self.index= (self.index + 1) % self.size 

    def __getitem__(self, key): 
     """get element by index like a regular array""" 
     return(self._data[key]) 

    def __repr__(self): 
     """return string representation""" 
     return self._data.__repr__() + ' (' + str(len(self._data))+' items)' 

    def get_all(self): 
     """return a list of all the elements""" 
     return(self._data) 

Чтобы получить среднее значение, например .:

q= CircularBuffer(1000000); 
for i in range(40000): 
    q.record(i); 
print "capacity=", q.size 
print "stored=", len(q.get_all()) 
print "average=", sum(q.get_all())/len(q.get_all()) 

Результаты в:

capacity= 1000000 
stored= 40000 
average= 19999 

real 0m0.024s 
user 0m0.020s 
sys 0m0.000s 

Это примерно 1/3 времени эквивалента с DEQUEUE.

0

Оригинальный вопрос: "эффективность" кольцевой буфер. В соответствии с такой эффективностью, ответ от aaronasterling кажется окончательно правильным. Использование выделенного класса, запрограммированного в Python, и сравнение обработки времени с коллекциями.deque показывает ускорение x5.2 раз с deque! Вот очень простой код, чтобы проверить это:

class cb: 
    def __init__(self, size): 
     self.b = [0]*size 
     self.i = 0 
     self.sz = size 
    def append(self, v): 
     self.b[self.i] = v 
     self.i = (self.i + 1) % self.sz 

b = cb(1000) 
for i in range(10000): 
    b.append(i) 
# called 200 times, this lasts 1.097 second on my laptop 

from collections import deque 
b = deque([], 1000) 
for i in range(10000): 
    b.append(i) 
# called 200 times, this lasts 0.211 second on my laptop 

Чтобы превратить Deque в список, просто использовать:

my_list = [v for v in my_deque] 

Вы тогда получите O (1) случайный доступ к DEQUE пунктов. Конечно, это полезно только в том случае, если вам нужно сделать много произвольных обращений к deque после того, как вы установили его один раз.

4

Основано на MoonCactus's answer, здесь circularlist класс. Разница с его версией заключается в том, что здесь c[0] всегда будет иметь старейший прилагаемый элемент, c[-1] последний добавленный элемент, c[-2] предпоследний ... Это более естественно для приложений.

c = circularlist(4) 
c.append(1); print c, c[0], c[-1] #[1] (1 items)    first 1, last 1 
c.append(2); print c, c[0], c[-1] #[1, 2] (2 items)   first 1, last 2 
c.append(3); print c, c[0], c[-1] #[1, 2, 3] (3 items)  first 1, last 3 
c.append(8); print c, c[0], c[-1] #[1, 2, 3, 8] (4 items)  first 1, last 8 
c.append(10); print c, c[0], c[-1] #[10, 2, 3, 8] (4 items) first 2, last 10 
c.append(11); print c, c[0], c[-1] #[10, 11, 3, 8] (4 items) first 3, last 11 

Класс:

class circularlist(object): 
    def __init__(self, size): 
     """Initialization""" 
     self.index = 0 
     self.size = size 
     self._data = [] 

    def append(self, value): 
     """Append an element""" 
     if len(self._data) == self.size: 
      self._data[self.index] = value 
     else: 
      self._data.append(value) 
     self.index = (self.index + 1) % self.size 

    def __getitem__(self, key): 
     """Get element by index, relative to the current index""" 
     if len(self._data) == self.size: 
      return(self._data[(key + self.index) % self.size]) 
     else: 
      return(self._data[key]) 

    def __repr__(self): 
     """Return string representation""" 
     return self._data.__repr__() + ' (' + str(len(self._data))+' items)' 
1

Я имел эту проблему, прежде чем делать серийные программирования. В то время чуть больше года назад я не мог найти никаких эффективных реализаций, поэтому я закончил писать one as a C extension, а также доступен on pypi по лицензии MIT. Он супер базовый, обрабатывает только буферы 8-разрядных подписанных символов, но имеет гибкую длину, поэтому вы можете использовать Struct или что-то поверх него, если вам нужно что-то другое, кроме символов. Теперь я вижу, что в Google google есть несколько вариантов, поэтому вы можете также взглянуть на них.

0

Вы отвечаете неправильно. Кольцевой буфер Основной имеют две основой работы наноэлектронных (https://en.wikipedia.org/wiki/Circular_buffer)

  1. Lenth буфера выставиться;
  2. Первый в первом;
  3. При добавлении или удалении элемента, остальные элементы не должны переместить их положение

код ниже:

def add_to_buffer(self, num): 
    self.mylist.pop(0) 
    self.mylist.append(num) 

Давайте рассмотрим ситуацию, что список полон, при использовании кода :

self.mylist = [1, 2, 3, 4, 5] 

Теперь мы добавляем 6, список изменяется на

self.mylist = [2, 3, 4, 5, 6] 

элементы ожидают 1 в списке изменилось их положение

ваш код является очереди, а не круг буфера.

Ответ Basj, я думаю, является наиболее эффективным.

К слову, буфер окружности может повысить эффективность операции , чтобы добавить элемент.

0

Это применяется к тому же принципу для некоторых буферов, предназначенных для хранения последних текстовых сообщений.

import time 
import datetime 
import sys, getopt 

class textbffr(object): 
    def __init__(self, size_max): 
     #initialization 
     self.posn_max = size_max-1 
     self._data = [""]*(size_max) 
     self.posn = self.posn_max 

    def append(self, value): 
     #append an element 
     if self.posn == self.posn_max: 
      self.posn = 0 
      self._data[self.posn] = value 
     else: 
      self.posn += 1 
      self._data[self.posn] = value 

    def __getitem__(self, key): 
     #return stored element 
     if (key + self.posn+1) > self.posn_max: 
      return(self._data[key - (self.posn_max-self.posn)]) 
     else: 
      return(self._data[key + self.posn+1]) 


def print_bffr(bffr,bffer_max): 
    for ind in range(0,bffer_max): 
     stored = bffr[ind] 
     if stored != "": 
      print(stored) 
    print ('\n') 

def make_time_text(time_value): 
    return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2) 
     + str(time_value.hour).zfill(2) + str(time_value.minute).zfill(2) 
     + str(time_value.second).zfill(2)) 


def main(argv): 
    #Set things up 
    starttime = datetime.datetime.now() 
    log_max = 5 
    status_max = 7 
    log_bffr = textbffr(log_max) 
    status_bffr = textbffr(status_max) 
    scan_count = 1 

    #Main Loop 
    # every 10 secounds write a line with the time and the scan count. 
    while True: 

     time_text = make_time_text(datetime.datetime.now()) 
     #create next messages and store in buffers 
     status_bffr.append(str(scan_count).zfill(6) + " : Status is just fine at : " + time_text) 
     log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ") 

     #print whole buffers so far 
     print_bffr(log_bffr,log_max) 
     print_bffr(status_bffr,status_max) 

     time.sleep(2) 
     scan_count += 1 

if __name__ == '__main__': 
    main(sys.argv[1:]) 
Смежные вопросы