3

Как видно из названия, я зашел в тупик и не знаю, почему. У меня есть несколько производителей и только один потребитель. Метод schedule_task будет вызван несколькими процессами после того, как поток называется get метод очередитупик из-за блокировки Queue.get() метод

from logging import getLogger 
from time import sleep 
from threading import Event, Thread 
from multiprocessing import Process 
from Queue import Queue 


class TaskExecutor(object): 
    def __init__(self): 
     print("init taskExecutor") 
     self.event = Event() 
     self.taskInfos = Queue() 
     task_thread = Thread(target=self._run_worker_thread) 
     self._instantEnd = False 
     self._running = True 
     task_thread.daemon = True 
     task_thread.start() 

    def _run_worker_thread(self): 
     print("start running taskExcecutor worker Thread") 
     while self.is_running(): 
      try: 
       print("try to get queued task from %s" % str(self.taskInfos)) 
       msg, task = self.taskInfos.get() 
       print("got task %s for msg: %s" % str(task), str(msg)) 
       task.execute(msg) 
       self.taskInfos.task_done() 
      except Exception, e: 
       logger.error("Error: %s" % e.message) 
     print("shutting down TaskExecutor!") 

    def is_running(self): 
     return True 

    def schedule_task(self, msg, task): 
     try: 
      print("appending task '%s' for msg: %s" % (str(task), str(msg))) 
      self.taskInfos.put((msg, task)) 
      print("into queue: %s " % str(self.taskInfos)) 
     except Exception, e: 
      print("queue is probably full: %s" % str(e)) 


class Task(object): 

    def execute(self, msg): 
     print(msg) 


executor = TaskExecutor() 

def produce(): 
    cnt = 0 
    while True: 
     executor.schedule_task("Message " + str(cnt), Task()) 
     cnt += 1 
     sleep(1) 

for i in range(4): 
    p = Process(target=produce) 
    p.start() 

Из моих журналов я получаю:

init taskExecutor 
start running taskExcecutor worker Thread 
try to get queued task from <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0 
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0 
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0 
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0 
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f35d0>' for msg: Message 1 
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f3490>' for msg: Message 1 
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f3b10>' for msg: Message 1 
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f3b10>' for msg: Message 1 
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 

Может кто-то пожалуйста, объясните, что мне не хватает?

ответ

3

Хотя другие люди не могут запускать этот код (это не самодостаточно), в нет очевидной проблемы, что вы показали. Итак, проблема в том, что вы не указали - возможно, в коде создания и использования экземпляров TaskExecutor.

И когда я подключил отсутствующие части, которые я сочинял из воздуха, этот код работал нормально.

Поэтому вам нужно показать больше, чем просто это. Как насчет замены:

logger.debug("try to get queued task") 

с

logger.debug("try to get queued task from queue %s", self.taskInfos) 

? Тогда, по крайней мере, мы могли видеть, используют ли ваши производители ту же очередь, что и ваш потребитель.

Следующая

Спасибо за добавив, что. Далее: вот вам отдельная программа, которую вы можете попробовать. Это очень похоже на ваш код. Смотрите, работает ли это правильно для вас (это не для меня):

from threading import Thread, Lock 
from Queue import Queue 

class Logger: 
    def __init__(self): 
     self.iolock = Lock() 

    def debug(self, str, *msg): 
     with self.iolock: 
      print str % msg 

    error = debug 

logger = Logger() 

class TaskExecutor(object): 
    def __init__(self): 
     logger.debug("init taskExecutor") 
     self.taskInfos = Queue() 
     task_thread = Thread(target=self._run_worker_thread) 
     task_thread.daemon = True 
     task_thread.start() 

    def is_running(self): 
     return True 

    def _run_worker_thread(self): 
     logger.debug("start running taskExcecutor worker Thread") 
     while self.is_running(): 
      try: 
       logger.debug("try to get queued task from queue %s", self.taskInfos) 
       msg, task = self.taskInfos.get() 
       logger.debug("got task %s for msg: %s", str(task), str(msg)) 
       #task.execute(msg) 
       self.taskInfos.task_done() 
      except Exception, e: 
       logger.error("Error: %s", e.message) 
     logger.debug("shutting down TaskExecutor!") 

    def schedule_task(self, msg, task): 
     try: 
      logger.debug("appending task '%s' for msg: %s", str(task), str(msg)) 
      self.taskInfos.put((msg, task)) 
      logger.debug("into queue: %s ", str(self.taskInfos)) 
     except Exception, e: 
      logger.debug("queue is probably full: %s", str(e)) 

te = TaskExecutor() 

def runit(): 
    for i in range(10): 
     te.schedule_task("some task", i) 

main = Thread(target=runit) 
main.start() 

Следующая

OK, этот код не может работать. На Linux-у системы, точно один экземпляр TaskExecutor создается здесь:

executor = TaskExecutor() 

Это происходит в основной программе. Каждый раз, когда вы делаете:

p = Process(target=produce) 

вашей основной программой fork() «под ред. В то время как разветвленные процессы также видят executor, это адресное пространство копия основной программы executor основной программы и не имеет ничего общего с executor в основной программе (обычная копия-на-запись fork()).

процесс Каждый ребенок также имеет копию членов данных executor «s, включая ее Queue.Все дочерние процессы помещают данные на свою собственную уникальную копию executor, но потребительский поток работает только в основной программе и ничего Рабочие процессы делают с их копией executor могут иметь какое-либо влияние на то, что потребитель главной программы поток видит.

Так что это действительно смущает. Я должен остановиться сейчас, чтобы попытаться выяснить, что вы можете сделать действительно хотите здесь делать ;-) Если вы хотите играть с идеями, исследуйте, используя multiprocessing.Queue. Только способ связи между процессами - использовать объекты, которые построены с нуля для поддержки межпроцессного взаимодействия. Queue.Queue никогда не сработает.

И еще один

Вот один, который хорошо работает в различных процессах, и даже на окнах ;-)

from time import sleep 
from threading import Thread 
from multiprocessing import Process, JoinableQueue 

class TaskExecutor(Thread): 
    def __init__(self): 
     print("init taskExecutor") 
     Thread.__init__(self) 
     self.taskInfos = JoinableQueue() 

    def getq(self): 
     return self.taskInfos 

    def run(self): 
     print("start running taskExcecutor worker Thread") 
     while self.is_running(): 
      try: 
       print("try to get queued task from %s" % self.taskInfos) 
       msg, task = self.taskInfos.get() 
       print("got task %s for msg: %s" % (task, msg)) 
       task.execute(msg) 
       self.taskInfos.task_done() 
      except Exception, e: 
       print("Error: %s" % e.message) 
     print("shutting down TaskExecutor!") 

    def is_running(self): 
     return True 

class Task(object): 
    def execute(self, msg): 
     print(msg) 

def produce(q): 
    cnt = 0 
    while True: 
     q.put(("Message " + str(cnt), Task())) 
     cnt += 1 
     sleep(1) 

if __name__ == "__main__": 
    executor = TaskExecutor() 
    executor.start() 
    for i in range(4): 
     p = Process(target=produce, args=(executor.getq(),)) 
     p.start() 

КРП __name__ == "__main__" часть не только позволяет ему работать на Windows, он имеет большое значение «документации», что делает очевидным с первого взгляда, что executor действительно запускает только в основной программе.

Вопрос в том, является ли это разделение труда, которое вы хотите. Вы действительно хотите, чтобы основной процесс - и только основной процесс - сделать все?

task.execute(msg) 

работа? Никоим образом отсюда не догадаться, хотите ли вы этого. Это то, что делает код.

Пункт стиля: обратите внимание, что это избавляется от метода schedule_task(). Параллельная обработка может быть сложной, и в течение десятилетий я считаю чрезвычайно полезным поддерживать межпоточную/межпроцессную связь как можно более простую и мертвую. Это означает, среди прочего, использование очередей сообщений непосредственно, а не, например, скрытие их в методах. Слои абстракции в этом контексте часто делают правильный код сложнее для создания, расширения, отладки и обслуживания.

+0

Я проверил, ваш, и он работает явно. Поэтому я сделал свой собственный самодостаточный пример. Может быть, вы можете проверить это –

+0

Спасибо! Я действительно не знал, будет ли это работать для вас, поэтому я рад, что вы это пробовали. Например, может быть, у вас есть поврежденная версия 'Queue.py' и т. Д. И т. Д. И т. Д. - просто методично здесь ;-) –

+1

Ах, это бесконечно сложнее, чем вы сказали в начале: вы не просто играете с потоками, вы также пытаетесь общаться через * процессы *. Далее: какую операционную систему вы используете? Ваш код вообще не будет запущен, например, Windows. –

Смежные вопросы