2015-08-17 4 views
0

Я использую pandas для расчета статистики и т. Д. На большом количестве данных, но в итоге работает в течение нескольких часов, и я часто получаю новые данные. Я пытался оптимизировать уже, но я хотел бы сделать это быстрее, поэтому я пытаюсь использовать его в нескольких процессах. Проблема, с которой я сталкиваюсь, заключается в том, что мне нужно выполнить некоторую промежуточную работу с результатами по мере их завершения, а примеры, которые я видел для multiprocessing.Process и Pool, ждут завершения всего, прежде чем работать с результатами.python multiprocessing - как действовать на промежуточные результаты

Это сильно обрезанный код, который я использую сейчас. Часть, которую я хочу включить в отдельные процессы, - это generateAnalytics().

for counter, symbol in enumerate(queuelist): # queuelist 
    if needQueueLoad: # set by another thread that's monitoring for new data (in the form of a new file that arrives a couple times a day) 
     log.info('Shutting down analyticsRunner thread') 
     break 
    dfDay = generateAnalytics(symbol) # slow running function (15s+) 
    astore[analyticsTable(symbol)] = dfDay # astore is a pandas store (HDF5). analyticsTable() returns the name of the appropriate table, which gets overwritten 
    dfLatest.loc[symbol] = dfDay.iloc[-1] # update with the latest results (dfLatest is the latest results for each symbol, which is loaded as a global at startup and periodically saved back to the store in another thread) 

    log.info('Processed {}/{} securities in queue.'.format(counter+1, len(queuelist))) 
    # do some stuff to update progress GUI 

Я не могу понять, как заставить последние линии работать с результатами, пока они продолжаются, и будут благодарны за предложения.

Я рассматриваю все это работает в Pool и с процессами добавить результаты в Queue (вместо того, чтобы вернуть их), а затем сидячую петлю, а в основном процессе стаскивать очередь, как результаты приходят в - было бы разумным способом сделать это? Что-то вроде:

mpqueue = multiprocessing.Queue() 
pool = multiprocessing.Pool() 
pool.map(generateAnalytics, [queuelist, mpqueue]) 

while not needQueueLoad: # set by another thread that's monitoring for new data (in the form of a new file that arrives a couple times a day) 
    while not mpqueue.empty(): 
     dfDay = mpqueue.get() 
     astore[analyticsTable(symbol)] = dfDay # astore is a pandas store (HDF5). analyticsTable() returns the name of the appropriate table, which gets overwritten 
     dfLatest.loc[symbol] = dfDay.iloc[-1] # update with the latest results (dfLatest is the latest results for each symbol, which is loaded as a global at startup and periodically saved back to the store in another thread)  
     log.info('Processed {}/{} securities in queue.'.format(counter+1, len(queuelist))) 
     # do some stuff to update GUI that shows progress    
    sleep(0.1) 
    # do some bookkeeping to see if queue has finished 
pool.join() 

ответ

2

Использование Queue выглядит как разумный способ сделать это, с двумя замечаниями.

  1. Так выглядит из кода, который вы используете графический интерфейс, проверка результатов, вероятно, лучше делать в функции времени ожидания или простой функции, а не в то время как петле. Использование цикла while для проверки результатов блокирует цикл событий GUI.

  2. Если рабочий процесс должен вернуть большое количество данных в основной процесс через очередь, это добавит значительные накладные расходы. Возможно, вам захочется использовать общую память или даже промежуточный файл.

+0

Спасибо. Я намеревался сначала использовать разделяемую память, пока не понял, что поддерживает только типы python; данные возврата являются рамкой данных pandas. Затем я посмотрел на использование пространства имен менеджера, которое, вероятно, будет работать, но тогда мне понадобится отдельный механизм для уведомления о готовности обновлений, а также запускает дополнительный процесс, который кажется излишним. В этой программе результаты все равно относительно малы, и я не думаю, что это будет проблемой в очереди. Мой код gui фактически работает в отдельном потоке (а не в процессе), так что проблема блокировки отсутствует. Я сейчас работаю над этим. – fantabolous

Смежные вопросы