Я использую pandas для расчета статистики и т. Д. На большом количестве данных, но в итоге работает в течение нескольких часов, и я часто получаю новые данные. Я пытался оптимизировать уже, но я хотел бы сделать это быстрее, поэтому я пытаюсь использовать его в нескольких процессах. Проблема, с которой я сталкиваюсь, заключается в том, что мне нужно выполнить некоторую промежуточную работу с результатами по мере их завершения, а примеры, которые я видел для multiprocessing.Process
и Pool
, ждут завершения всего, прежде чем работать с результатами.python multiprocessing - как действовать на промежуточные результаты
Это сильно обрезанный код, который я использую сейчас. Часть, которую я хочу включить в отдельные процессы, - это generateAnalytics().
for counter, symbol in enumerate(queuelist): # queuelist
if needQueueLoad: # set by another thread that's monitoring for new data (in the form of a new file that arrives a couple times a day)
log.info('Shutting down analyticsRunner thread')
break
dfDay = generateAnalytics(symbol) # slow running function (15s+)
astore[analyticsTable(symbol)] = dfDay # astore is a pandas store (HDF5). analyticsTable() returns the name of the appropriate table, which gets overwritten
dfLatest.loc[symbol] = dfDay.iloc[-1] # update with the latest results (dfLatest is the latest results for each symbol, which is loaded as a global at startup and periodically saved back to the store in another thread)
log.info('Processed {}/{} securities in queue.'.format(counter+1, len(queuelist)))
# do some stuff to update progress GUI
Я не могу понять, как заставить последние линии работать с результатами, пока они продолжаются, и будут благодарны за предложения.
Я рассматриваю все это работает в Pool
и с процессами добавить результаты в Queue
(вместо того, чтобы вернуть их), а затем сидячую петлю, а в основном процессе стаскивать очередь, как результаты приходят в - было бы разумным способом сделать это? Что-то вроде:
mpqueue = multiprocessing.Queue()
pool = multiprocessing.Pool()
pool.map(generateAnalytics, [queuelist, mpqueue])
while not needQueueLoad: # set by another thread that's monitoring for new data (in the form of a new file that arrives a couple times a day)
while not mpqueue.empty():
dfDay = mpqueue.get()
astore[analyticsTable(symbol)] = dfDay # astore is a pandas store (HDF5). analyticsTable() returns the name of the appropriate table, which gets overwritten
dfLatest.loc[symbol] = dfDay.iloc[-1] # update with the latest results (dfLatest is the latest results for each symbol, which is loaded as a global at startup and periodically saved back to the store in another thread)
log.info('Processed {}/{} securities in queue.'.format(counter+1, len(queuelist)))
# do some stuff to update GUI that shows progress
sleep(0.1)
# do some bookkeeping to see if queue has finished
pool.join()
Спасибо. Я намеревался сначала использовать разделяемую память, пока не понял, что поддерживает только типы python; данные возврата являются рамкой данных pandas. Затем я посмотрел на использование пространства имен менеджера, которое, вероятно, будет работать, но тогда мне понадобится отдельный механизм для уведомления о готовности обновлений, а также запускает дополнительный процесс, который кажется излишним. В этой программе результаты все равно относительно малы, и я не думаю, что это будет проблемой в очереди. Мой код gui фактически работает в отдельном потоке (а не в процессе), так что проблема блокировки отсутствует. Я сейчас работаю над этим. – fantabolous