2017-01-11 5 views
0

Я пытаюсь создать скрипт, который анализирует файл и преобразует его в большой список, который, как предполагается, будет обрабатываться параллельно. Я пробовал несколько реализаций многопроцессорности python, но все они кажутся запущенными последовательно.Общий доступ к многопроцессорности Python

def grouper(n, iterable, padvalue=None): 
    """grouper(3, 'abcdefg', 'x') --> 
    ('a','b','c'), ('d','e','f'), ('g','x','x')""" 
    return izip_longest(*[iter(iterable)]*n, fillvalue=padvalue) 

def createRecords(givenchunk): 
    for i1 in range(len(givenchunk)): 
    <create somedata> 
    records.append(somedata) 

if __name__=='__main__': 
    manager = Manager() 
    parsedcdrs = manager.list([]) 
    records = manager.list([]) 

    <some general processing here which creates a shared list "parsedcdrs". Uses map to create a process "p" in some def which is terminated afterwards.> 

    # Get available cpus 
    cores = multiprocessing.cpu_count() 

    # First implementation with map with map. 
    t = multiprocessing.Pool(cores) 
    print "Map processing with chunks containing 5000" 
    t.map(createRecords, zip(parsedcdr), 5000) 

    # Second implementation with async. 
    t = multiprocessing.Pool(cores) 
    for chunk in grouper(5000, parsedcdr): 
    print "Async processing with chunks containing 5000" 
    t.apply_async(createRecords, args=(chunk,), callback=log_result) 
    t.close() 
    t.join() 

    # Third implementation with Process. 
    jobs = [] 
    for chunk in grouper(5000, parsedcdr): 
    t = multiprocessing.Process(target=createRecords, args=(chunk,)) 
    t.start() 
    jobs.append(t) 
    print "Process processing with chunks containing 5000" 
    for j in jobs: 
    j.join() 
    for j in jobs: 
    j.join() 

Может ли кто-нибудь указать мне в правильном направлении?

+0

ваш aproach почти нормально, насколько я вижу в вашей первой реализации, но один вопрос, каждый из элементов списка для обработки - это другой список (или итерируемый)? – Netwave

+0

Элементы в списке "parsedcdr" действительно другие списки, например: '[[1482232410 [ 'astp3', u'elem1' , u'elem2' , u'elem3' ]], [1482232576 , ['astp3', u'elem4 ', u'elem5', u'elem6 ']]] ' – driesken

+1

Ваша первая реализация должна работать без проблем, почему вы думаете, что они работают по-разному? также попробуйте удалить куски '5000', чтобы посмотреть, выбирают ли они их один за другим. – Netwave

ответ

0

В примере выше многопроцессорный режим работает нормально. Проблема была в другом def, что привело к низкой производительности.

Смежные вопросы