Я создаю многопоточное приложение.python не может создать новую тему
У меня есть установка threadPool. [Очередь размера N и N рабочих, которые получают данные из очереди]
Когда все задачи выполняются я использую
tasks.join()
где задачи является очередью.
приложение, кажется, бежит гладко, пока suddently в какой-то момент (после 20 минут в примере) не завершается с ошибкой
thread.error: can't start new thread
Любые идеи?
Edit: Нити демон Потоки и код, как:
while True:
t0 = time.time()
keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(100)
if keyword_statuses.count() == 0:
DBSession.commit()
break
for kw_status in keyword_statuses:
kw_status.status = 1
DBSession.commit()
t0 = time.time()
w = SWorker(threads_no=32, network_server='http://192.168.1.242:8180/', keywords=keyword_statuses, cities=cities, saver=MySqlRawSave(DBSession), loglevel='debug')
w.work()
print 'finished'
Когда нити демона убивают? Когда приложение заканчивается или когда работа() заканчивается?
Посмотрите на пул потоков и работник (это по рецепту)
from Queue import Queue
from threading import Thread, Event, current_thread
import time
event = Event()
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
'''Start processing tasks from the queue'''
while True:
event.wait()
#time.sleep(0.1)
try:
func, args, callback = self.tasks.get()
except Exception, e:
print str(e)
return
else:
if callback is None:
func(args)
else:
callback(func(args))
self.tasks.task_done()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
self.tasks = Queue(num_threads)
for _ in range(num_threads): Worker(self.tasks)
def add_task(self, func, args=None, callback=None):
''''Add a task to the queue'''
self.tasks.put((func, args, callback))
def wait_completion(self):
'''Wait for completion of all the tasks in the queue'''
self.tasks.join()
def broadcast_block_event(self):
'''blocks running threads'''
event.clear()
def broadcast_unblock_event(self):
'''unblocks running threads'''
event.set()
def get_event(self):
'''returns the event object'''
return event
также может быть, проблема это потому, что я создаю объекты SWorker в цикле? Что происходит со старым SWorker (сборщик мусора?)?
Выглядит так, как будто вы нерестили нити, не собирая их должным образом. Может быть, ваш поток начнет и присоединится к коду и немного его окружения поможет? –
Все еще не хватает кода для локализации проблемы. – seriyPS