Как видно из названия, я зашел в тупик и не знаю, почему. У меня есть несколько производителей и только один потребитель. Метод schedule_task
будет вызван несколькими процессами после того, как поток называется get
метод очередитупик из-за блокировки Queue.get() метод
from logging import getLogger
from time import sleep
from threading import Event, Thread
from multiprocessing import Process
from Queue import Queue
class TaskExecutor(object):
def __init__(self):
print("init taskExecutor")
self.event = Event()
self.taskInfos = Queue()
task_thread = Thread(target=self._run_worker_thread)
self._instantEnd = False
self._running = True
task_thread.daemon = True
task_thread.start()
def _run_worker_thread(self):
print("start running taskExcecutor worker Thread")
while self.is_running():
try:
print("try to get queued task from %s" % str(self.taskInfos))
msg, task = self.taskInfos.get()
print("got task %s for msg: %s" % str(task), str(msg))
task.execute(msg)
self.taskInfos.task_done()
except Exception, e:
logger.error("Error: %s" % e.message)
print("shutting down TaskExecutor!")
def is_running(self):
return True
def schedule_task(self, msg, task):
try:
print("appending task '%s' for msg: %s" % (str(task), str(msg)))
self.taskInfos.put((msg, task))
print("into queue: %s " % str(self.taskInfos))
except Exception, e:
print("queue is probably full: %s" % str(e))
class Task(object):
def execute(self, msg):
print(msg)
executor = TaskExecutor()
def produce():
cnt = 0
while True:
executor.schedule_task("Message " + str(cnt), Task())
cnt += 1
sleep(1)
for i in range(4):
p = Process(target=produce)
p.start()
Из моих журналов я получаю:
init taskExecutor
start running taskExcecutor worker Thread
try to get queued task from <Queue.Queue instance at 0x7fdd09830cb0>
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0>
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0>
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0>
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0>
appending task '<__main__.Task object at 0x7fdd086f35d0>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0>
appending task '<__main__.Task object at 0x7fdd086f3490>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0>
appending task '<__main__.Task object at 0x7fdd086f3b10>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0>
appending task '<__main__.Task object at 0x7fdd086f3b10>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0>
Может кто-то пожалуйста, объясните, что мне не хватает?
Я проверил, ваш, и он работает явно. Поэтому я сделал свой собственный самодостаточный пример. Может быть, вы можете проверить это –
Спасибо! Я действительно не знал, будет ли это работать для вас, поэтому я рад, что вы это пробовали. Например, может быть, у вас есть поврежденная версия 'Queue.py' и т. Д. И т. Д. И т. Д. - просто методично здесь ;-) –
Ах, это бесконечно сложнее, чем вы сказали в начале: вы не просто играете с потоками, вы также пытаетесь общаться через * процессы *. Далее: какую операционную систему вы используете? Ваш код вообще не будет запущен, например, Windows. –