2012-06-28 5 views
1

Я создаю многопоточное приложение.python не может создать новую тему

У меня есть установка threadPool. [Очередь размера N и N рабочих, которые получают данные из очереди]

Когда все задачи выполняются я использую

tasks.join() 

где задачи является очередью.

приложение, кажется, бежит гладко, пока suddently в какой-то момент (после 20 минут в примере) не завершается с ошибкой

thread.error: can't start new thread 

Любые идеи?

Edit: Нити демон Потоки и код, как:

while True: 
    t0 = time.time() 
    keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(100) 
    if keyword_statuses.count() == 0: 
     DBSession.commit() 
     break 

    for kw_status in keyword_statuses: 
     kw_status.status = 1 
     DBSession.commit() 

    t0 = time.time() 
    w = SWorker(threads_no=32, network_server='http://192.168.1.242:8180/', keywords=keyword_statuses, cities=cities, saver=MySqlRawSave(DBSession), loglevel='debug') 

    w.work() 

print 'finished' 

Когда нити демона убивают? Когда приложение заканчивается или когда работа() заканчивается?

Посмотрите на пул потоков и работник (это по рецепту)

from Queue import Queue 
from threading import Thread, Event, current_thread 
import time 

event = Event() 

class Worker(Thread): 
    """Thread executing tasks from a given tasks queue""" 

    def __init__(self, tasks): 
     Thread.__init__(self) 

     self.tasks = tasks 
     self.daemon = True 
     self.start() 


    def run(self): 
     '''Start processing tasks from the queue''' 
     while True: 
      event.wait() 
      #time.sleep(0.1) 
      try: 
       func, args, callback = self.tasks.get() 
      except Exception, e: 
       print str(e) 
       return 
      else: 
       if callback is None: 
        func(args) 
       else: 
        callback(func(args)) 

       self.tasks.task_done() 

class ThreadPool: 
    """Pool of threads consuming tasks from a queue""" 

    def __init__(self, num_threads): 
     self.tasks = Queue(num_threads) 
     for _ in range(num_threads): Worker(self.tasks) 


    def add_task(self, func, args=None, callback=None): 
     ''''Add a task to the queue''' 
     self.tasks.put((func, args, callback)) 


    def wait_completion(self): 
     '''Wait for completion of all the tasks in the queue''' 
     self.tasks.join() 


    def broadcast_block_event(self): 
     '''blocks running threads''' 
     event.clear() 


    def broadcast_unblock_event(self): 
     '''unblocks running threads''' 
     event.set() 


    def get_event(self): 
     '''returns the event object''' 
     return event 

также может быть, проблема это потому, что я создаю объекты SWorker в цикле? Что происходит со старым SWorker (сборщик мусора?)?

+5

Выглядит так, как будто вы нерестили нити, не собирая их должным образом. Может быть, ваш поток начнет и присоединится к коду и немного его окружения поможет? –

+0

Все еще не хватает кода для локализации проблемы. – seriyPS

ответ

4

Недостаточно кода для локализации проблемы, но я уверен, что это связано с тем, что вы не используете потоки и запускаете слишком много из них. Вы видели канонический пример из документации python Queue http://docs.python.org/library/queue.html (внизу страницы)?

я могу воспроизвести проблему с помощью следующего кода:

import threading 
import Queue 

q = Queue.Queue() 

def worker(): 
    item = q.get(block=True) # sleeps forever for now 
    do_work(item) 
    q.task_done() 

# create infinite number of workers threads and fails 
# after some time with "error: can't start new thread" 
while True: 
    t = threading.Thread(target=worker) 
    t.start() 
q.join() # newer reached this 

Вместо этого вы должны создать опрос потоков с известным числом нитей и поместить данные в очередь как:

q = Queue() 

def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

for i in range(num_worker_threads): 
    t = Thread(target=worker) 
    t.daemon = True 
    t.start() 

for item in source(): 
    q.put(item) 

q.join()  # block until all tasks are done 

UPD : Если вам нужно остановить какой-либо поток, вы можете добавить к нему флаг или отправить специальную метку «останов» для разрыва while петля:

class Worker(Thread): 
    break_msg = object() # just uniq mark sign 

    def __init__(self): 
     self.continue = True 

    def run(): 
     while self.continue: # can stop and destroy thread, (var 1) 
      msg = queue.get(block=True) 
      if msg == self.break_msg: 
       return # will stop and destroy thread (var 2) 
      do_work() 
      queue.task_done() 

workers = [Worker() for _ in xrange(num_workers)] 
for w in workers: 
    w.start() 
for task in tasks: 
    queue.put(task) 

for _ in xrange(num_workers): 
    queue.put(Worker.break_msg) # stop thread after all tasks done. Need as many messages as many threads you have 
OR 
queue.join() # wait until all tasks done 
for w in workers: 
    w.continue = False 
    w.put(None) 
+0

Я уже использую 32 потока. Я не думаю, что их слишком много –

+0

Итак, я не знаю, что делает 'SWorker' и как он взаимодействует с' Worker' и 'WorkerPool', но если ваш' while True: t0 = time.time() .. .' цикл делает больше одного цикла - у вас есть утечка потоков, потому что у вас нет «break» в методе run(). Я буду обновлять мой ответ, пожалуйста, подождите – seriyPS

+0

'queue.join() # ждать, пока все задачи не сделали для ш в рабочих: w.continue = False w.put (None)' сделал трюк. –

Смежные вопросы