2012-05-25 5 views
2

У меня есть два потока (производитель и потребитель), и я делюсь данными с Queue. Проблема в том, что, когда я решительно прерываю производителя, потребитель иногда блокирует.Python threading deadlock

В документах я прочитал, что отмена потока с очередью может привести к повреждению очереди и вызвать тупик. Я не получаю никаких блокировок явно, но чтение источника Queue.py говорит, что put и get делают это.

Возможно, кто-нибудь знает, что это может быть так, что когда я прерываю нить, это может быть в середине get/put, то есть с помощью блокировки, а затем не отпускать его? Что я могу сделать с этим? Иногда мне нужно преждевременно прекратить работу продюсера. Используют ли процессы, а не потоки, какие-либо различия?

+3

Как вы прервать поток? Ctrl + C? –

+1

Нравится это: http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python –

+0

можете ли вы опубликовать код, чтобы мы могли знать, что происходит? – betabandido

ответ

0

Скорее всего, ваш тупик из-за не законченных потоков. Если у вас есть Linux вы можете использовать форсунку от pyrasite для печати трассировки (вы знаете, где вы программа повесили)

Если вы используете какую-либо блокировку в обработчике сигнала - то, возможно, это ваш тупиковый (это немного сложнее, пожалуйста, спросите, хотите ли вы пояснить это)

Создание процессов вместо потоков, безусловно, меняет ситуацию, но помните, что любой обмен данными и синхронизация очень сложны.

+0

Как мне «закончить» нить? Я думал, что когда run() дойдет до конца, все будет готово. И я сам не использую обработчик сигналов, может быть, Queue.py? http://hg.python.org/cpython/file/2.7/Lib/Queue.py –

+0

Да, если run() достигнет конца, тогда он закончен (для элегантности основной поток должен делать «join()» на нем) -> вы можете добавить отладочную информацию, чтобы убедиться, что ваш поток (продюсер/консумент) закончился, потому что он может быть помещен в queue.get() или даже queue.put() (!) – ddzialak

+0

Я думаю, что это «повешение» - это именно то, что происходит. Вопрос в том, что я отменяю один поток, поэтому он не достигнет конца - что я должен делать? Я имею в виду, мне нужно прервать его, но тогда очередь, кажется, повесила ..? –

0

Может быть, это поможет:

import threading 

class MyQueue: 
    def __init__(self): 
     self.tasks = [] 
     self.tlock = threading.Semaphore(0) 
     self.dlock = threading.Lock() 
     self.aborted = False 

    def put(self, arg): 
     try: 
      self.dlock.acquire() 
      self.tasks.append(arg) 
     finally: 
      self.dlock.release() 
      self.tlock.release() 

    def get(self): 
     if self.aborted: 
      return None 
     self.tlock.acquire() 
     if self.aborted: 
      self.tlock.release() 
      return None 
     try: 
      self.dlock.acquire() 
      if self.tasks: 
       return self.tasks.pop() 
      else: # executed abort 
       return None 
     finally: 
      self.dlock.release() 

    def abort(self): 
     self.aborted = True 
     self.tlock.release() 

# TESTING 

mq = MyQueue() 
import sys 

def tlog(line): 
    sys.stdout.write("[ %s ] %s\n" % (threading.currentThread().name, line)) 
    sys.stdout.flush() 

def reader(): 
    arg = 1 
    while arg is not None: 
     tlog("start reading") 
     arg = mq.get() 
     tlog("read: %s" % arg) 
    tlog("END") 

import time, random 
def writer(): 
    try: 
     pos = 1 
     while not mq.aborted: 
      x = random.random() * 5 
      tlog("writer sleep (%s)" % x) 
      pending = x 
      while pending > 0: 
       tosleep = min(0.5, pending) 
       if mq.aborted: 
        return 
       time.sleep(tosleep) 
       pending -= tosleep 

      tlog("write: %s" % x) 
      mq.put("POS %s val=%s" % (pos, x)) 
      pos += 1 
    finally: 
     tlog("writer END") 

def testStart(): 
    try: 
     for i in xrange(9): 
      th = threading.Thread(None, reader, "reader %s" % i,(), {}, None) 
      th.start() 
     for i in xrange(3): 
      th = threading.Thread(None, writer, "writer %s" % i,(), {}, None) 
      th.start() 
     time.sleep(30) # seconds for testing 
    finally: 
     print "main thread: abort()" 
     mq.abort() 

if __name__ == "__main__": 
    testStart() 
+0

Большое спасибо, но мне нужно прервать поток как указано выше, потому что задача может занять слишком много времени. –