2016-07-02 3 views
2

Я хочу одновременно выполнять функции f1 и f2. но следующий код не работает!Параллелирование различных функций одновременно в python

from multiprocessing import Pool 

def f1(x): 
return x*x 

def f2(x): 
return x^2 

if __name__ == '__main__': 

    x1=10 
    x2=20 
    p= Pool(2) 
    out=(p.map([f1, f2], [x1, x2])) 

y1=out[0] 
y2=out[1] 
+0

вы не можете запустить несколько потоков в то же самое точное время, если это то, что вы хотите, начиная нить требуется некоторое время. Всегда будет разница во времени, так почему бы не начать эти потоки отдельно друг за другом? См. [This docs] (https://docs.python.org/2/library/multiprocessing.html) для справки. – Jezor

+0

Могу ли я спросить, почему вы хотите это сделать? – itmuckel

+0

@ Micha90 Мои функции очень трудоемки (5 дней для работы на кластерах!), Поэтому для экономии времени мне нужно запускать их параллельно. – Mohammad

ответ

2

Я считаю, что вы хотели бы использовать threading.Thread и shared queue в вашем коде.

from queue import Queue 
from threading import Thread 
import time 

def f1(q, x): 
    # Sleep function added to compare execution times. 
    time.sleep(5) 
    # Instead of returning the result we put it in shared queue. 
    q.put(x * 2) 

def f2(q, x): 
    time.sleep(5) 
    q.put(x^2) 

if __name__ == '__main__': 
    x1 = 10 
    x2 = 20 
    result_queue = Queue() 

    # We create two threads and pass shared queue to both of them. 
    t1 = Thread(target=f1, args=(result_queue, x1)) 
    t2 = Thread(target=f2, args=(result_queue, x2)) 

    # Starting threads... 
    print("Start: %s" % time.ctime()) 
    t1.start() 
    t2.start() 

    # Waiting for threads to finish execution... 
    t1.join() 
    t2.join() 
    print("End: %s" % time.ctime()) 

    # After threads are done, we can read results from the queue. 
    while not result_queue.empty(): 
     result = result_queue.get() 
     print(result) 

Код выше должен распечатать результат, аналогичный:

Start: Sat Jul 2 20:50:50 2016 
End: Sat Jul 2 20:50:55 2016 
20 
22 

Как вы можете видеть, даже если обе функции подождите 5 секунд, чтобы дать свои результаты, они делают это параллельно, так общее время выполнения 5 секунд.

Если вам небезразлична какая функция помещает то, что приводит к вашей очереди, я могу увидеть два решения, которые позволят это определить. Вы можете либо создать несколько очередей, либо обернуть результаты в кортеж.

def f1(q, x): 
    time.sleep(5) 
    # Tuple containing function information. 
    q.put((f1, x * 2)) 

И для дальнейшего упрощения (особенно если у вас есть много функций, чтобы иметь дело с) вы можете украсить свои функции (чтобы избежать повторного кода и разрешить вызовы функций без очереди):

def wrap_result(func): 
    def wrapper(*args): 
     # Assuming that shared queue is always the last argument. 
     q = args[len(args) - 1] 
     # We use it to store the results only if it was provided. 
     if isinstance(q, Queue): 
      function_result = func(*args[:-1]) 
      q.put((func, function_result)) 
     else: 
      function_result = func(*args) 
     return function_result 

    return wrapper 

@wrap_result 
def f1(x): 
    time.sleep(5) 
    return x * 2 

Обратите внимание, что мой декоратор был написан в спешке, и его реализация может потребовать усовершенствований (например, если ваши функции принимают kwargs). Если вы решите использовать его, вам придется передать свои аргументы в обратном порядке: t1 = threading.Thread(target=f1, args=(x1, result_queue)).

Немного дружелюбного совета.

«Следующий код не работает» ничего не говорит о проблеме. Вызывает ли это исключение? Это дает неожиданные результаты?

Важно прочитать сообщения об ошибках. Еще важнее - изучить их смысл. Код, который вы предоставили поднимает TypeError с довольно очевидным сообщением:

File ".../stack.py", line 16, in <module> out = (p.map([f1, f2], [x1, x2]))

TypeError: 'list' object is not callable

Это означает, что первый аргумент Pool().map() должен быть callable объект, функция, например. Давайте посмотрим на документы этого метода.

Apply func to each element in iterable, collecting the results in a list that is returned.

Он явно не позволяет передавать список функций в качестве аргумента.

Here вы можете узнать больше о Pool().map() способ.

+0

Большое вам спасибо за ответ. Это было очень полезно. Мне интересно, если f1 занимает больше времени для вычисления, чем f2, то первая строка в «результате» будет для f2? Я проверил это для простой функции, и результаты соответствовали функциям (f1 и f2). Должен ли я реализовать свой второй комментарий в коде? – Mohammad

+0

Да, значения будут вставлены в очередь, упорядоченную по времени выполнения функции, ничего не интуитивно понятное здесь (функции работают параллельно, первое, чтобы завершить выполнение, первое, чтобы дать результат). Все зависит от того, каковы ваши намерения, если вам нужно знать, какой результат соответствует какой функции, то да, вы должны сделать результаты отличительными. – Jezor

+0

Спасибо за ваш «дружеский совет», я только что увидел это :) Я знал, что код, который я предоставил, не работает, и я просто написал его, чтобы сказать, что я хочу делать с моим кодом ... Ваш первый комментарий для вызов различных функций в то же время был действительно полезен, но для решения вышеупомянутой проблемы я действительно не понял, что делать точно! Когда я добавил второй комментарий к коду, он дает мне следующую ошибку: NameError: «глобальное имя« обертка »не определено» ..... не могли бы вы объяснить это немного подробнее о том, как я должен реализовать остальные код ... Спасибо заранее. – Mohammad

1

I want to execute f1 and f2 at the same time. but the following code doesn't work! ...

out=(p.map([f1, f2], [x1, x2])) 

Минимальное изменение кода, чтобы заменить p.map() вызов с:

r1 = p.apply_async(f1, [x1]) 
out2 = f2(x2) 
out1 = r1.get() 

Хотя, если все, что вы хотите, чтобы запустить две вызовы функций одновременно, то вам не нужен Pool() здесь, вы могли бы просто начать тема/процесс вручную и use Pipe/Queue to get the result:

#!/usr/bin/env python 
from multiprocessing import Process, Pipe 

def another_process(f, args, conn): 
    conn.send(f(*args)) 
    conn.close() 

if __name__ == '__main__': 
    parent_conn, child_conn = Pipe(duplex=False) 
    p = Process(target=another_process, args=(f1, [x1], child_conn)) 
    p.start() 
    out2 = f2(x2) 
    out1 = parent_conn.recv() 
    p.join() 
Смежные вопросы