2014-12-11 2 views
0

У меня есть сценарий с 2 ​​нитями:Python threading with queue: как избежать использования соединения?

  1. поток ждет сообщений из сокета (встроенный в библиотеке C - блокирующий вызов «Barra.ricevi»), а затем положить элемент в очередь

  2. поток, ждущий, чтобы получить элемент из очереди и сделать что-то

Пример кода

import Barra 
import Queue  
import threading 

posQu = Queue.Queue(maxsize=0) 

def threadCAN(): 
    while True: 
     canMsg = Barra.ricevi("can0") 
     if canMsg[0] == 'ERR': 
      print (canMsg) 
     else: 
      print ("Enqueued message"), canMsg 
      posQu.put(canMsg) 

thCan = threading.Thread(target = threadCAN) 
thCan.daemon = True 
thCan.start() 

while True: 
    posMsg = posQu.get() 
    print ("Messagge from the queue"), posMsg 

В результате каждый раз, когда новое сообщение поступает из сокета, в очередь добавляется новый элемент, НО основной поток, который должен получать элементы из очереди, никогда не проснулся.

Выход следующим образом:

сообщения в очереди

сообщения в очереди

сообщения в очереди

сообщения в очереди

Я ожидал иметь:

сообщения в очереди

Messagge из очереди

сообщения в очереди

Messagge из очереди

Единственный способ решить эту проблему швы, чтобы добавить строку:

posQu.join() 

на конце нити, ожидая сообщения от розетки, а линия:

posQu.task_done() 

в конце главного потока.

В этом случае после этого новое сообщение принимается из сокета, поток блокируется, ожидая, пока основной поток обработает выделенный объект.

К сожалению, это неправильное поведение, так как я хочу, чтобы поток всегда был готов получать сообщения из сокета и не дожидался завершения задания из другого потока.

Что я делаю неправильно? Благодаря

Эндрю (Италия)

+1

Можете ли вы дать нам самодостаточный пример, который не требует библиотеки 'Barra'? Потому что, когда я заменяю код кодом, который просто дает ему случайное значение, он работает так, как вам бы хотелось. Поэтому я подозреваю, что есть проблема в другом коде, а не в этом коде. – abarnert

+0

Поскольку Pastebin, похоже, сейчас работает, вот что изменилось: замените 'import Barra'' import random' и 'canMsg = barra.cicevi (« can0 »)' с 'canMsg = ['ERR'], если он случайный. random() <.25 else [0, 1, 2] ', а затем запустите свой код, и вы увидите чередующиеся очереди и сообщения (часто в одной строке). – abarnert

+0

Кроме того, действительно ли CAN-bus имеет какое-либо отношение к этой программе? (Возможно, вы используете встроенную систему linux, у которой нет pthreads, поэтому она использует фиктивные потоки? Возможно, это может вызвать эту проблему ...) – abarnert

ответ

0

Это, вероятно, потому, что ваш Barra не освобождает глобальную блокировку интерпретатора (GIL) при Barra.ricevi. Однако вы можете проверить это.

GIL гарантирует, что только один поток может работать в любой момент времени (ограничивая полезность потоков в многопроцессорной системе). GIL переключает потоки каждые 100 «тиков» - тик свободно привязывается к инструкциям байткода. См. here для более подробной информации.

В вашей нити производителя не так много происходит за пределами вызова C-библиотеки. Это означает, что продюсерский поток будет звонить по телефону Barra.ricevi много раз, прежде чем GIL переключится на другой поток.

Решения этого являются, с точки зрения возрастающей сложности:

  • Вызов time.sleep(0) после добавления элемента в очередь. Это дает поток, так что другой поток может работать.
  • Используйте sys.setcheckinterval() для уменьшения количества «тиков», выполняемых перед переключением потоков. Это будет зависеть от того, что программа будет намного дороже вычислить.
  • Используйте multiprocessing, а не threading. Это включает использование multiprocessing.Queue вместо Queue.Queue.
  • Измените Barra так, чтобы он освобождал GIL при вызове его функций.

Пример использования multiprocessing. Имейте в виду, что при использовании многопроцессорности ваши процессы больше не имеют подразумеваемого общего состояния. Вам нужно будет взглянуть на многопроцессорность, чтобы узнать, как передавать информацию между процессами.

import Barra 
import multiprocessing 

def threadCAN(posQu): 
    while True: 
     canMsg = Barra.ricevi("can0") 
     if canMsg[0] == 'ERR': 
      print(canMsg) 
     else: 
      print("Enqueued message", canMsg) 
      posQu.put(canMsg) 

if __name__ == "__main__": 
    posQu = multiprocessing.Queue(maxsize=0) 
    procCan = multiprocessing.Process(target=threadCAN, args=(posQu,)) 
    procCan.daemon = True 
    procCan.start() 

    while True: 
     posMsg = posQu.get() 
     print("Messagge from the queue", posMsg) 
+0

Отлично! Большое спасибо. Это был ГИЛ! Я изменил свою библиотеку Barra, добавляя макросы: Py_BEGIN_ALLOW_THREADS и Py_END_ALLOW_THREADS, до и после блокирующего вызова ввода-вывода. – Atessadri

Смежные вопросы