2013-02-15 3 views
2

Я пытаюсь решить Problem 8 в проекте euler с многопоточной техникой в ​​python.Многопоточность в python с петлей

Найти наибольший продукт из пяти последовательных цифр в 1000-значном номере. Номер можно найти here.

Мой подход состоит в том, чтобы сгенерировать продукт из кусков 5 из исходного списка и повторить этот процесс 5 раз, каждый с начальным индексом, сдвинутым вправо.

Вот мой класс резьбы

class pThread(threading.Thread): 
    def __init__(self, l): 
     threading.Thread.__init__(self) 
     self.l = l 
     self.p = 0 

    def run(self): 

     def greatest_product(l): 
     """ 
     Divide the list into chunks of 5 and find the greatest product 
     """ 
      def product(seq): 
       return reduce(lambda x,y : x*y, seq) 

      def chunk_product(l, n=5): 
       for i in range(0, len(l), n): 
        yield product(l[i:i+n]) 

      result = 0 
      for p in chunk_product(num): 
       result = result > p and result or p 

      return result 

     self.p = greatest_product(self.l) 

Когда я пытаюсь создать 5 потоков для покрытия всех 5-значных куски в моем первоначальном списке, ручной подход ниже дает правильный ответ, с num быть список однозначных чисел, которые я анализирую из текста:

thread1 = pThread(num) 
del num[0] 
thread2 = pThread(num) 
del num[0] 
thread3 = pThread(num) 
del num[0] 
thread4 = pThread(num) 
del num[0] 
thread5 = pThread(num) 

thread1.start() 
thread2.start() 
thread3.start() 
thread4.start() 
thread5.start() 

thread1.join() 
thread2.join() 
thread3.join() 
thread4.join() 
thread5.join() 

def max(*args): 
    result = 0 
    for i in args: 
     result = i > result and i or result 
    return result 

print max(thread1.p, thread2.p, thread3.p, thread4.p, thread5.p) 

Но это не дает правильный результат:

threads = [] 
for i in range(0, 4): 
    tmp = num[:] 
    del tmp[0:i+1] 
    thread = pThread(tmp) 
    thread.start() 
    threads.append(thread) 

for i in range(0, 4): 
    threads[i].join() 

Что я сделал не так? Я очень новичок в многопоточности, поэтому, пожалуйста, будьте осторожны.

+1

Советы: переписать код без использования дель, это будет гораздо легче понять, почему он не работает, и почему исходный код также не является правильным. Кроме того, учитывая GIL, этот код вряд ли будет запускаться быстрее, чем однопоточная версия. –

+0

Я переписал код без del, и да, он разрывает обе версии. Я изучу его позже позже. Благодарим за ваше предложение. –

+1

резьба это серьезно overkill. вы можете сделать это в 1 строке с помощью max (уменьшить (op.mul, n_list [i: i + 5]) для i в xrange (1000)) ' – wim

ответ

4

Есть 3 проблемы:

  1. Во-первых, что «ручной» подход не делает t дать правильный ответ. Просто бывает, что правильный ответ на проблему находится на смещении 4 с самого начала вашего списка. Вы можете увидеть это с помощью:

    import operator as op 
    print max(reduce(op.mul, num[i:i+5]) for i in range(1000)) 
    for k in range(5): 
        print max(reduce(op.mul, num[i:i+5]) for i in range(k, 1000, 5)) 
    

    Одна из проблем с «ручным» подходом является то, что потоки разделяют переменную num, каждый из них имеет один и тот же список. Поэтому, когда вы делаете del num[0], все threadX.l затронуты. Тот факт, что вы последовательно получаете тот же ответ, связан с второй проблемой.

  2. Линия

    for p in chunk_product(num): 
    

    должно быть:

    for p in chunk_product(l): 
    

    , так как вы хотите использовать параметр функции greatest_product(l), а не глобальную переменную num.

  3. Во втором методе вы создаете только 4 потока, так как диапазон петель превышает [0, 1, 2, 3]. Кроме того, вы хотите удалить значения tmp[0:i], а не tmp[0:i+1]. Вот код:

    threads = [] 
    for i in range(5): 
        tmp = num[:] 
        del tmp[0:i] 
        thread = pThread(tmp) 
        thread.start() 
        threads.append(thread) 
    
    for i in range(5): 
        threads[i].join() 
    
    print len(threads), map(lambda th: th.p, threads) 
    print max(map(lambda th: th.p, threads)) 
    
+0

@JasonWhite Это назначенная функциональность, поскольку каждый поток перебирает список с другим смещением. В вашем примере (с двумя потоками) поток 1 будет вычислять продукты для [1,2] [3,4] [5,6] [7,8] [9,10], а поток 2 будет вычислять продукты для [2, 3] [4,5] [6,7] [8,9]. Это связано с тем, что поток 2 имеет self.l = [2,3,4,5,6,7,8,9,10]. Я думаю, что это то, что OP хотел реализовать. – wasserfeder

+0

А я понимаю, о чем вы говорите. Удалил мой предыдущий комментарий, когда я прочитал его неправильно; P –

+0

СПАСИБО СООНОК. Вы случайно не знаете, как наградить награду? Я действительно хочу дать вам 100 респондентов за ваш ответ. –

1

Я принял удар в этом основном, чтобы получить некоторую практику многопроцессорности и узнать, как использовать argparse.

Это заняло около 4-5 концертов бара на всякий случай, если у вашей машины не много.

python euler.py -l 50000000 -n 100 -p 8 

Took 5.836833333969116 minutes 
The largest product of 100 consecutive numbers is: a very large number 

Если вы наберете питон euler.py -h в командной строке вы получите:

usage: euler.py [-h] -l L [L ...] -n N [-p P] 

Calculates the product of consecutive numbers and return the largest product. 

optional arguments: 
    -h, --help show this help message and exit 
    -l L [L ...] A single number or list of numbers, where each # is seperated 
       by a space 
    -n N   A number that specifies how many consecutive numbers should be 
       multiplied together. 
    -p P   Number of processes to create. Optional, defaults to the # of 
       cores on the pc.   

И код:

"""A multiprocess iplementation for calculation the maximum product of N consecutive 
numbers in a given range (list of numbers).""" 

import multiprocessing 
import math 
import time 
import operator 
from functools import reduce 
import argparse 

def euler8(alist,lenNums): 
    """Returns the largest product of N consecutive numbers in a given range""" 
    return max(reduce(operator.mul, alist[i:i+lenNums]) for i in range(len(alist))) 

def split_list_multi(listOfNumbers,numLength,threads): 
    """Split a list into N parts where N is the # of processes.""" 
    fullLength = len(listOfNumbers) 
    single = math.floor(fullLength/threads) 
    results = {} 
    counter = 0 
    while counter < threads: 
     if counter == (threads-1): 
      temp = listOfNumbers[single*counter::] 
      if counter == 0: 
       results[str(counter)] = listOfNumbers[single*counter::] 
      else: 
       prevListIndex = results[str(counter-1)][-int('{}'.format(numLength-1))::] 
       newlist = prevListIndex + temp 
       results[str(counter)] = newlist 
     else: 
      temp = listOfNumbers[single*counter:single*(counter+1)] 
      if counter == 0: 
       newlist = temp 
      else: 
       prevListIndex = results[str(counter-1)][-int('{}'.format(numLength-1))::] 
       newlist = prevListIndex + temp 
      results[str(counter)] = newlist 
     counter += 1 
    return results,threads 

def worker(listNumbers,number,output): 
    """A worker. Used to run seperate processes and put the results in the queue""" 
    result = euler8(listNumbers,number) 
    output.put(result) 

def main(listOfNums,lengthNumbers,numCores=multiprocessing.cpu_count()): 
    """Runs the module. 
    listOfNums must be a list of ints, or single int 
    lengthNumbers is N (an int) where N is the # of consecutive numbers to multiply together 
    numCores (an int) defaults to however many the cpu has, can specify a number if you choose.""" 

    if isinstance(listOfNums,list): 
     if len(listOfNums) == 1: 
      valuesToSplit = [i for i in range(int(listOfNums[0]))] 
     else: 
      valuesToSplit = [int(i) for i in listOfNums] 
    elif isinstance(listOfNums,int): 
     valuesToSplit = [i for i in range(listOfNums)] 
    else: 
     print('First arg must be a number or a list of numbers') 

    split = split_list_multi(valuesToSplit,lengthNumbers,numCores) 
    done_queue = multiprocessing.Queue() 
    jobs = [] 
    startTime = time.time() 

    for num in range(split[1]): 
     numChunks = split[0][str(num)] 
     thread = multiprocessing.Process(target=worker, args=(numChunks,lengthNumbers,done_queue)) 
     jobs.append(thread) 
     thread.start() 

    resultlist = [] 
    for i in range(split[1]): 
     resultlist.append(done_queue.get()) 

    for j in jobs: 
     j.join() 

    resultlist = max(resultlist) 
    endTime = time.time() 
    totalTime = (endTime-startTime)/60 
    print("Took {} minutes".format(totalTime)) 

    return print("The largest product of {} consecutive numbers is: {}".format(lengthNumbers, resultlist))    

if __name__ == '__main__': 
    #To call the module from the commandline with arguments 
    parser = argparse.ArgumentParser(description="""Calculates the product of consecutive numbers \ 
    and return the largest product.""") 
    parser.add_argument('-l', nargs='+', required=True, 
         help='A single number or list of numbers, where each # is seperated by a space') 
    parser.add_argument('-n', required=True, type=int, 
         help = 'A number that specifies how many consecutive numbers should be \ 
         multiplied together.') 
    parser.add_argument('-p', default=multiprocessing.cpu_count(), type=int, 
         help='Number of processes to create. Optional, defaults to the # of cores on the pc.') 
    args = parser.parse_args() 
    main(args.l, args.n, args.p) 
+0

Спасибо, пожалуйста, не удаляйте этот ответ. Я вернусь к вам, как только я пойму все, что вы написали. –

Смежные вопросы