2016-12-02 3 views
0

Что такое эффективный способ, с помощью которого я могу вызвать некоторую функцию после изменения длины списка на определенную сумму?Как вызвать функцию после выполнения условия

У меня есть вложенный список, в который я добавляю данные 100 раз в секунду, и я хочу вызвать функцию после того, как длина списка увеличится на некоторое значение. Я попытался сделать это с помощью инструкции if внутри цикла while (см. Ниже my_loop()). Это работает, но эта, казалось бы, простая операция занимает 100% от одного из моих ядер процессора. Мне кажется, что постоянный запрос размера списка является лимитирующим фактором скрипта (добавление данных в список в цикле while не является ресурсоемким).

Вот что я пытался до сих пор:

from threading import Event, Thread 
import time 

def add_indefinitely(list_, kill_signal): 
    """ 
    list_ : list 
     List to which data is added. 
    kill_signal : threading.Event 
    """ 
    while not kill_signal.is_set(): 
     list_.append([1] * 32) 
     time.sleep(0.01) # Equivalent to 100 Hz. 


def my_loop(buffer_len, kill_signal): 
    """ 
    buffer_len: int, float 
     Size of the data buffer in seconds. Gets converted to n_samples 
     by multiplying by the sampling frequency (i.e., 100). 
    kill_signal : threading.Event 
    """ 
    buffer_len *= 100 
    b0 = len(list_) 
    while not kill_signal.is_set(): 
     if len(list_) - b0 > buffer_len: 
      b0 = len(list_) 
      print("Len of list_ is {}".format(b0)) 


list_ = [] 
kill_signal = Event() 
buffer_len = 2 # Print something every 2 seconds. 


data_thread = Thread(target=add_indefinitely, args=(list_, kill_signal)) 
data_thread.start() 

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal)) 
loop_thread.start() 


def stop_all(): 
    """Stop appending to and querying the list. 
    SO users, call this function to clean up! 
    """ 
    kill_signal.set() 
    data_thread.join() 
    loop_thread.join() 

Пример вывода:

Len of list_ is 202 
Len of list_ is 403 
Len of list_ is 604 
Len of list_ is 805 
Len of list_ is 1006 
+1

Python многопоточность не не переключает активных потоков до ввода-вывода принимает/место или 'time.sleep()' называется тем, который в данный момент запущен. Это означает, что ваш код тратит большую часть времени на выполнение одного потока или другого. – martineau

+0

Спасибо. Это работало даже тогда, когда я использовал продолжительность сна 1 мс ('time.sleep (0.001)'). – Jakub

ответ

1

Это not very safe, чтобы получить доступ к списку из двух потоков, поэтому я предлагаю безопасный способ обмена данных между потоками. В CPython ваш код не повредит содержимое списка, но вы можете не получить ровно 200 элементов каждый раз, когда обрабатываете пакет. Если вы начали удалять элементы из списка в my_loop(), вы можете столкнуться с проблемами. Если вы используете другие версии Python без GIL, у вас может быть больше проблем.

До этого, однако, вот самое маленькое изменение, которое я могу решить, чтобы решить проблему, о которой вы спрашивали: использование ЦП. Я просто добавил сон my_loop() и очищены расчеты буфера, так что теперь сообщает в основном стабильный 201, 401, 601. Иногда я вижу 1002.

from threading import Event, Thread 
import time 

def add_indefinitely(list_, kill_signal): 
    """ 
    list_ : list 
     List to which data is added. 
    kill_signal : threading.Event 
    """ 
    while not kill_signal.is_set(): 
     list_.append([1] * 32) 
     time.sleep(0.01) # Equivalent to 100 Hz. 


def my_loop(buffer_len, kill_signal): 
    """ 
    buffer_len: int, float 
     Size of the data buffer in seconds. Gets converted to n_samples 
     by multiplying by the sampling frequency (i.e., 100). 
    kill_signal : threading.Event 
    """ 
    buffer_len *= 100 
    b0 = len(list_) 
    while not kill_signal.is_set(): 
     time.sleep(0.01) 
     if len(list_) - b0 >= buffer_len: 
      b0 += buffer_len 
      print("Len of list_ is {}".format(len(list_))) 


list_ = [] 
kill_signal = Event() 
buffer_len = 2 # Print something every 2 seconds. 


data_thread = Thread(target=add_indefinitely, args=(list_, kill_signal)) 
data_thread.start() 

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal)) 
loop_thread.start() 


def stop_all(): 
    """Stop appending to and querying the list. 
    SO users, call this function to clean up! 
    """ 
    kill_signal.set() 
    data_thread.join() 
    loop_thread.join() 

time.sleep(30) 
stop_all() 

Теперь, чтобы сделать это безопасно, я предлагаю вам используйте queue. Это позволит многим потокам читать или записывать в очередь и обрабатывать сообщение. Если поток пытается прочитать из пустой очереди, он просто блокируется, пока какой-либо другой поток не добавит элемент в очередь.

Я не был уверен точно, что вы хотели делать с товарами, поэтому я просто поместил их в список и оставил их там. Однако теперь, когда список доступен только одним потоком, безопасно его очищать после обработки каждой партии из 100 элементов.

Поскольку блокировка my_loop() блокируется, при установке сигнала уничтожения это не обязательно будет замечаться. Вместо этого в очереди запросов я использовал часовое значение «Нет», чтобы сообщить ему, чтобы он отключился. Если это не сработает для вас, вы можете использовать таймаут при получении элемента из очереди, проверить сигнал об уничтожении, а затем попытаться получить элемент снова.

from threading import Event, Thread 
from queue import Queue 
import time 

def add_indefinitely(request_queue, kill_signal): 
    """ 
    list_ : list 
     List to which data is added. 
    kill_signal : threading.Event 
    """ 
    while not kill_signal.is_set(): 
     request_queue.put([1] * 32) 
     time.sleep(0.01) # Equivalent to 100 Hz. 
    request_queue.put(None) # Signal to shut down 


def my_loop(buffer_len, kill_signal): 
    """ 
    buffer_len: int, float 
     Size of the data buffer in seconds. Gets converted to n_samples 
     by multiplying by the sampling frequency (i.e., 100). 
    kill_signal : threading.Event 
    """ 
    received_items = [] # replaces list_ 
    buffer_len *= 100 
    while True: 
     item = request_queue.get() 
     if item is None: 
      break 
     received_items.append(item) 
     if len(received_items) % buffer_len == 0: 
      print("Len of received_items is {}".format(len(received_items))) 


request_queue = Queue() 
kill_signal = Event() 
buffer_len = 2 # Print something every 2 seconds. 


data_thread = Thread(target=add_indefinitely, args=(request_queue, kill_signal)) 
data_thread.start() 

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal)) 
loop_thread.start() 


def stop_all(): 
    """Stop appending to and querying the list. 
    SO users, call this function to clean up! 
    """ 
    kill_signal.set() 
    data_thread.join() 
    loop_thread.join() 

time.sleep(30) 
stop_all() 
0

Я не уверен, если я полностью получить то, что вы хотите сделать, но вот моя идея:

add_indefinitely() реализован в классе MyThread, который принимает экземпляр класса Manager как аргумент parent. Когда встречается increment, MyThread экземпляр вызовов parent's on_increment() способ делать вещи.

from threading import Event, Thread 
import time 


class Manager(object): 

    def on_increment(self, thread): 
     print thread.get_data_len() 


class MyThread(Thread): 

    def __init__(self, parent, increment, kill_signal): 
     super(MyThread, self).__init__() 
     self.parent = parent 
     self.data = [] 
     self.increment = increment 
     self.kill_signal = kill_signal 

    def run(self): 
     while not self.kill_signal.is_set(): 
      self.data.append([1] * 32) 
      time.sleep(0.01) 
      if len(self.data) % self.increment == 0: 
       self.parent.on_increment(self) 

    def get_data_len(self): 
     return len(self.data) 


kill_signal = Event() 
manager = Manager() 
thread = MyThread(manager, 200, kill_signal) 
thread.deamon = True 


try: 
    thread.start() 
    while True: 
     time.sleep(1) 
except (KeyboardInterrupt, SystemExit): 
    kill_signal.set() 
    thread.join() 
0

Было бы намного эффективнее не иметь отдельное зацикливание нити и проверить длину списка, но вместо того, чтобы только один из них, которые будут заблокированы и ждали это событие произойдет. Вот пример, иллюстрирующий, как сделать что-то подобное с дополнительными threading.Lock объект, добавленный для управления параллельным доступом к глобальным переменным или ресурсам (как печать на stdout):

from threading import Event, Lock, Thread 
import time 

THRESHOLD = 100 # minimum number of items in list_ before event triggered 
list_ = [] 
list_lock = Lock() # to control access to global list_ 
length_signal = Event() 
print_lock = Lock() # to control concurrent printing 

def add_indefinitely(): 
    while True: 
     with list_lock: 
      list_.append([1] * 32) 
      if len(list_) >= THRESHOLD: 
       with print_lock: 
        print('setting length_signal') 
       length_signal.set() 
     time.sleep(.01) # give other threads a change to run 

def length_reached(): 
    """ 
    Waits until list_ has reached a certain length, and then print message 
    """ 
    with print_lock: 
     print('waiting for list_ to reach threshold length') 
    length_signal.wait() # blocks until event is set 
    with print_lock: 
     with list_lock: 
      print('list_ now contains at least {} items'.format(len(list_))) 

# first start thread that will wait for the length to be reached 
length_reached_thread = Thread(target=length_reached) 
length_reached_thread.start() 

data_thread = Thread(target=add_indefinitely) 
data_thread.daemon = True 
data_thread.start() 

length_reached_thread.join() 
with print_lock: 
    print('finished') 
Смежные вопросы