2016-07-07 3 views
1

В основном проблема заключается в следующем: у меня есть группа работников, у которых есть функция, предписанная каждому (функция is worker (alist)), и я пытаюсь обрабатывать 35 рабочих одновременно. Каждый рабочий считывает свою строку из файла (по модулю) и должен обрабатывать строку с помощью функции «работник». Я проверил проверку пера и обнаружил, что сырые манипуляции и удаление бесполезных индексов работают на 100% по назначению.Pool Multiprocessing Python

Элемент args функции pool.apply_async не передает список «raw» в него и запускает процесс. Сырье полностью корректно и нормально функционирует, рабочий сам по себе работает нормально, функция pool.apply_async - единственное место, где, похоже, проблема, и я понятия не имею, как ее исправить. Любая помощь, пожалуйста?

Соответствующий код здесь:

NUM_WORKERS=35 
f=open("test.csv") 
pool=multiprocessing.Pool() 
open("final.csv",'w') 
for workernumber in range(1, NUM_WORKERS): 
    for i,line in enumerate(f): 
     if i==0: 
      print "Skipping first line" #dont do anything 
     elif i%workernumber==0: 
      raw = line.split(',')[0][1:-1].split() 
      uselessindices=[-2,-3,-4,-5,-6] 
      counter=0 
      for ui in uselessindices: 
       del raw[ui+counter] 
       counter+=1 
      print raw 
      pool.apply_async(worker, args=(raw,)) 
pool.close() 
pool.join() 
+0

Почему вы вообще зацикливаетесь на 'workworkumber'? Вы действительно хотите, чтобы первый работник обрабатывал каждую строку в файле, а второй - для обработки каждой другой строки? Я не вижу причин ожидать, что обработка строки более одного раза будет иметь какую-то выгоду, но, возможно, я что-то упустил. Или вы действительно хотите, чтобы каждая строка обрабатывалась один раз? Если это так, внешний цикл совершенно бессмыслен (как и условие на 'elif'). – Blckknght

+0

35 много процессоров. – 101

+0

@ 101 Я использовал произвольное число для num_workers. Есть 16 процессоров, но операция не требует интенсивного процессора, ее просто нужно сделать с помощью shit ton рабочих и скомпилировать. Я новичок в многопроцессорной работе и пытаюсь проложить свой путь через документацию. Есть ли способ сделать это лучше? – furby559

ответ

0
import multiprocessing 

def worker(arg): 
    print 'doing work "%s"' % arg 
    return 

NUM_WORKERS=35 
with open('test.csv', 'w') as test: 
    for i in xrange(100): 
     if i % 10 == 0: 
      test.write('\n') 
     test.write('"%s 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23",' % i) 

f=open("test.csv") 
pool=multiprocessing.Pool(processes=NUM_WORKERS) 
open("final.csv",'w') 

for i, line in enumerate(f): 
    if i == 0: 
     continue 
    raw = line.split(',')[0][1:-1].split() 
    uselessindices=[-2,-3,-4,-5,-6] 
    counter=0 
    for ui in uselessindices: 
     del raw[ui+counter] 
     counter+=1 
    pool.apply_async(worker, args=(raw,)) 
pool.close() 
pool.join() 
print 'last raw len: %s' % len(raw) 
print 'last raw value: %s' % raw 

Выход:

doing work "['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['10', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['20', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['30', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['40', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['50', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['60', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['70', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['80', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['90', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
last raw len: 19 
last raw value: ['90', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23'] 
+0

Список может не передаваться правильно, потому что строка raw = lines [i] .split (',') [0] [1: -1] .split() Если вы можете предоставить образец строки, возможно, я могу проверить это для вас. – Bamcclur

+0

Я проверил необработанные и данные списка raw, и он передается правильно. Я попытался запустить ваш код безрезультатно; рабочий все еще не работает. Простая печать в начале работника не выполняется, но скрипт работает без ошибок, что и произошло раньше. – furby559

+0

Я добавил некоторые тестовые данные, чтобы продемонстрировать, как ваш код будет обрабатывать csv с 10 значениями на строку, причем каждое значение содержит строку из 24 отдельных частей, что дает необработанную длину 19 после удаления ненужных частей. – Bamcclur

0

Я предлагаю вам поставить расчет raw в функцию генератора, а затем использовать Pool.imap_unordered() или Pool.map() для запуска worker() по всем элементам в генераторе.

Что-то вроде этого непроверенного кода:

def get_raw(): 
    with open("test.csv", 'rU') as f: 
     for i, line in enumerate(f): 
      if i == 0: 
       # skip header 
       continue 
      raw = line.split(',')[0][1:-1].split() 
      uselessindices=[-2,-3,-4,-5,-6] 
      counter=0 
      for ui in uselessindices: 
       del raw[ui+counter] 
       counter+=1 
      yield raw 

pool=multiprocessing.Pool(processes=NUM_WORKERS) 

pool.map(worker, get_raw()) 
pool.close() 
pool.join() 
+0

raw имеет 19 значений, а список значений 19 - это то, что необходимо для продолжения работника по его пути. Ошибка, которую я получаю здесь, когда я изменил код, выглядит следующим образом: pool.map (worker, get_raw()) Файл «/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py ", строка 251, на карте return self.map_async (func, iterable, chunksize) .get() Файл« /System/Library/Frameworks/Python.framework/ Версии/2.7/lib/python2.7/multiprocessing/pool.py ", строка 567, в get raise self._value ValueError: требуется больше 19 значений для распаковки – furby559

+0

Ошибка, возникшая сейчас после того, как я исправил все другие проблемы, которые были что происходит раньше: TypeError: не может конкатенировать объекты 'str' и 'int'. Если я изменю его на imap_unordered вместо карты, он отлично работает, но карта продолжает работать неправильно. imap_unordered и async не работают, потому что они записывая в файл, и он выводит только 117 строк вместо ожидаемого 1000 при использовании обоих из них. Я читал где-то еще, что вы не должны писать в файле, используя async, и вместо этого hould использовать карту, следовательно, почему я пытаюсь это решение. Любой совет? – furby559

+0

Это код для рабочего; Предположим, что все работает отлично здесь: определение функции работника (креном): '' \t variable1, variable2 ... variable20 = кортеж (креном) '' \t с открытым ("final.csv", 'а'), как MyFile: ' \t \t' finalVector = makeSingleVector (variable1, variable2, variable3 ... variable20) ' \t \t' для элемента finalVector: ' \t \t \t' myfile.write ("% s" % пункта) ' \t \t 'myfile.write (" \ n ")' \t 'return' – furby559

0

Так я узнал, что это не бросает ошибку, что происходило внутри работника в результате несогласованного числа входов в функцию ребенка (AKA работник вызывал другую функцию dosomething (a1, a2, ... a20) и предоставлял только 19 входов). Кажется, async не будет вызывать ошибки в проблемах, происходящих внутри рабочего, что очень раздражает, но теперь я понимаю. Спасибо за помощь!

Смежные вопросы