Я считаю, что вы хотели бы использовать threading.Thread
и shared queue в вашем коде.
from queue import Queue
from threading import Thread
import time
def f1(q, x):
# Sleep function added to compare execution times.
time.sleep(5)
# Instead of returning the result we put it in shared queue.
q.put(x * 2)
def f2(q, x):
time.sleep(5)
q.put(x^2)
if __name__ == '__main__':
x1 = 10
x2 = 20
result_queue = Queue()
# We create two threads and pass shared queue to both of them.
t1 = Thread(target=f1, args=(result_queue, x1))
t2 = Thread(target=f2, args=(result_queue, x2))
# Starting threads...
print("Start: %s" % time.ctime())
t1.start()
t2.start()
# Waiting for threads to finish execution...
t1.join()
t2.join()
print("End: %s" % time.ctime())
# After threads are done, we can read results from the queue.
while not result_queue.empty():
result = result_queue.get()
print(result)
Код выше должен распечатать результат, аналогичный:
Start: Sat Jul 2 20:50:50 2016
End: Sat Jul 2 20:50:55 2016
20
22
Как вы можете видеть, даже если обе функции подождите 5 секунд, чтобы дать свои результаты, они делают это параллельно, так общее время выполнения 5 секунд.
Если вам небезразлична какая функция помещает то, что приводит к вашей очереди, я могу увидеть два решения, которые позволят это определить. Вы можете либо создать несколько очередей, либо обернуть результаты в кортеж.
def f1(q, x):
time.sleep(5)
# Tuple containing function information.
q.put((f1, x * 2))
И для дальнейшего упрощения (особенно если у вас есть много функций, чтобы иметь дело с) вы можете украсить свои функции (чтобы избежать повторного кода и разрешить вызовы функций без очереди):
def wrap_result(func):
def wrapper(*args):
# Assuming that shared queue is always the last argument.
q = args[len(args) - 1]
# We use it to store the results only if it was provided.
if isinstance(q, Queue):
function_result = func(*args[:-1])
q.put((func, function_result))
else:
function_result = func(*args)
return function_result
return wrapper
@wrap_result
def f1(x):
time.sleep(5)
return x * 2
Обратите внимание, что мой декоратор был написан в спешке, и его реализация может потребовать усовершенствований (например, если ваши функции принимают kwargs). Если вы решите использовать его, вам придется передать свои аргументы в обратном порядке: t1 = threading.Thread(target=f1, args=(x1, result_queue))
.
Немного дружелюбного совета.
«Следующий код не работает» ничего не говорит о проблеме. Вызывает ли это исключение? Это дает неожиданные результаты?
Важно прочитать сообщения об ошибках. Еще важнее - изучить их смысл. Код, который вы предоставили поднимает TypeError
с довольно очевидным сообщением:
File ".../stack.py", line 16, in <module> out = (p.map([f1, f2], [x1, x2]))
TypeError: 'list' object is not callable
Это означает, что первый аргумент Pool().map()
должен быть callable
объект, функция, например. Давайте посмотрим на документы этого метода.
Apply func to each element in iterable, collecting the results in a list that is returned.
Он явно не позволяет передавать список функций в качестве аргумента.
Here вы можете узнать больше о Pool().map()
способ.
вы не можете запустить несколько потоков в то же самое точное время, если это то, что вы хотите, начиная нить требуется некоторое время. Всегда будет разница во времени, так почему бы не начать эти потоки отдельно друг за другом? См. [This docs] (https://docs.python.org/2/library/multiprocessing.html) для справки. – Jezor
Могу ли я спросить, почему вы хотите это сделать? – itmuckel
@ Micha90 Мои функции очень трудоемки (5 дней для работы на кластерах!), Поэтому для экономии времени мне нужно запускать их параллельно. – Mohammad