2009-08-06 7 views
136

Мне было интересно, есть ли какая-либо библиотека для асинхронных вызовов методов в Python. Было бы замечательно, если бы вы могли бы сделать что-то вродеАсинхронный вызов метода в Python?

@async 
def longComputation(): 
    <code> 


token = longComputation() 
token.registerCallback(callback_function) 
# alternative, polling 
while not token.finished(): 
    doSomethingElse() 
    if token.finished(): 
     result = token.result() 

Или вызвать подпрограмму без асинхронной асинхронно

def longComputation() 
    <code> 

token = asynccall(longComputation()) 

Было бы здорово, чтобы иметь более изысканную стратегию как родной в ядре языка. Было ли это рассмотрено?

+0

На Python 3.4: https://docs.python.org/3/library/asyncio.html (есть портировать для 3.3 и новенький '' async' и await' синтаксис от 3.5). – jonrsharpe

+0

Механизм обратного вызова отсутствует, но вы можете агрегировать результаты в словаре, и он основан на многопроцессорном модуле Python. Я уверен, что вы можете добавить еще один параметр в украшенную функцию как обратный вызов. https://github.com/alex-sherman/deco. – RajaRaviVarma

ответ

117

Вы можете использовать multiprocessing module, добавленный в Python 2.6. Вы можете использовать пулы процессов, а затем получить результаты асинхронно:

apply_async(func[, args[, kwds[, callback]]]) 

т.д .:

from multiprocessing import Pool 

def f(x): 
    return x*x 

if __name__ == '__main__': 
    pool = Pool(processes=1)    # Start a worker processes. 
    result = pool.apply_async(f, [10], callback) # Evaluate "f(10)" asynchronously calling callback when finished. 

Это только одна альтернатива. Этот модуль предоставляет множество возможностей для достижения желаемого. Также из этого будет очень легко сделать декоратор.

+5

Возможно, стоит помнить, что это порождает отдельные процессы, а не отдельный поток в процессе. Это может иметь некоторые последствия. – user47741

+2

Lucas S., к сожалению, ваш пример не работает. Функция обратного вызова никогда не вызывается. – DataGreed

+11

Это работает: result = pool.apply_async (f, [10], callback = finish) –

28

Это не ядро ​​языка, но очень зрелая библиотека, которая делает то, что вам нужно, это Twisted. Он вводит объект «Отложен», к которому можно присоединить обратные вызовы или обработчики ошибок («errbacks»). Отсрочка - это в основном «обещание», что функция в конечном итоге будет иметь результат.

+1

В частности, посмотрите на twisted.internet.defer (http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.defer.html). –

155

Что-то вроде:

import threading 

thr = threading.Thread(target=foo, args=(), kwargs={}) 
thr.start() # Will run "foo" 
.... 
thr.is_alive() # Will return whether foo is running currently 
.... 
thr.join() # Will wait till "foo" is done 

Смотрите документацию на https://docs.python.org/2/library/threading.html#module-threading для получения более подробной информации; этот код должен работать и для Python 3.

+10

Это действительно должен быть главный ответ. Я пробовал всех остальных и имел проблемы. Это так просто. – chriscauley

+0

Да, если вам просто нужно делать асинхронно, почему бы просто не использовать поток? после всего потока - это легкий вес, чем процесс – kk1957

+13

Важное примечание: стандартная реализация (CPython) потоков не поможет с задачами, связанными с вычислением, из-за «Global Interpreter Lock». См. Библиотеку doc: [link] (http://docs.python.org/2/library/threading.html#module-threading) – solublefish

2

Есть ли причина не использовать темы? Вы можете использовать класс threading. Вместо finished() функция используется isAlive(). Функция result() может join() поток и получить результат. И, если можно, переопределите функции run() и __init__ для вызова функции, указанной в конструкторе, и сохраните значение где-нибудь в экземпляре класса.

+2

Если это вычислительная дорогостоящая функция потоковой передачи, вы ничего не получите (это, вероятно, на самом деле замедлит работу), поскольку процесс Python ограничен одним ядром ЦП из-за GIL. – Kurt

+2

@ Курт, хотя это правда, ОП не упомянул, что выступление было его проблемой. Есть и другие причины для асинхронного поведения ... –

+0

Темы в python не очень хороши, если вы хотите иметь возможность убивать асинхронный вызов метода, поскольку только основной поток в python получает сигналы. – CivFan

7

Мое решение:

import threading 

class TimeoutError(RuntimeError): 
    pass 

class AsyncCall(object): 
    def __init__(self, fnc, callback = None): 
     self.Callable = fnc 
     self.Callback = callback 

    def __call__(self, *args, **kwargs): 
     self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs) 
     self.Thread.start() 
     return self 

    def wait(self, timeout = None): 
     self.Thread.join(timeout) 
     if self.Thread.isAlive(): 
      raise TimeoutError() 
     else: 
      return self.Result 

    def run(self, *args, **kwargs): 
     self.Result = self.Callable(*args, **kwargs) 
     if self.Callback: 
      self.Callback(self.Result) 

class AsyncMethod(object): 
    def __init__(self, fnc, callback=None): 
     self.Callable = fnc 
     self.Callback = callback 

    def __call__(self, *args, **kwargs): 
     return AsyncCall(self.Callable, self.Callback)(*args, **kwargs) 

def Async(fnc = None, callback = None): 
    if fnc == None: 
     def AddAsyncCallback(fnc): 
      return AsyncMethod(fnc, callback) 
     return AddAsyncCallback 
    else: 
     return AsyncMethod(fnc, callback) 

И работает точно так же, как просил:

@Async 
def fnc(): 
    pass 
19

Вы можете осуществить декоратора, чтобы сделать ваши функции асинхронно, хотя это немного сложнее. Модуль multiprocessing полон небольших причуд и, по-видимому, произвольных ограничений - тем более, причина заключается в том, чтобы инкапсулировать его за дружественный интерфейс.

from inspect import getmodule 
from multiprocessing import Pool 


def async(decorated): 
    r'''Wraps a top-level function around an asynchronous dispatcher. 

     when the decorated function is called, a task is submitted to a 
     process pool, and a future object is returned, providing access to an 
     eventual return value. 

     The future object has a blocking get() method to access the task 
     result: it will return immediately if the job is already done, or block 
     until it completes. 

     This decorator won't work on methods, due to limitations in Python's 
     pickling machinery (in principle methods could be made pickleable, but 
     good luck on that). 
    ''' 
    # Keeps the original function visible from the module global namespace, 
    # under a name consistent to its __name__ attribute. This is necessary for 
    # the multiprocessing pickling machinery to work properly. 
    module = getmodule(decorated) 
    decorated.__name__ += '_original' 
    setattr(module, decorated.__name__, decorated) 

    def send(*args, **opts): 
     return async.pool.apply_async(decorated, args, opts) 

    return send 

Приведенные ниже код иллюстрирует использование декоратора:

@async 
def printsum(uid, values): 
    summed = 0 
    for value in values: 
     summed += value 

    print("Worker %i: sum value is %i" % (uid, summed)) 

    return (uid, summed) 


if __name__ == '__main__': 
    from random import sample 

    # The process pool must be created inside __main__. 
    async.pool = Pool(4) 

    p = range(0, 1000) 
    results = [] 
    for i in range(4): 
     result = printsum(i, sample(p, 100)) 
     results.append(result) 

    for result in results: 
     print("Worker %i: sum value is %i" % result.get()) 

В реальном случае я бы ellaborate немного больше на декораторе, обеспечивая некоторый способ, чтобы отключить его для отладки (в то время как сохранение будущего интерфейса) или, возможно, средство для устранения исключений; но я думаю, что это демонстрирует принцип достаточно хорошо.

+0

Это должен быть лучший ответ. Мне нравится, как он может вернуть ценность. Не похоже на поток, который просто работает асинхронно. –

12

Просто

import threading, time 

def f(): 
    print "f started" 
    time.sleep(3) 
    print "f finished" 

threading.Thread(target=f).start() 
+1

То, что мне было нужно, спасибо – user2875404

7

Вы можете использовать eventlet. Он позволяет писать то, что представляется синхронным кодом, но работать он асинхронно по сети.

Вот пример супер минимального гусеничном:

urls = ["http://www.google.com/intl/en_ALL/images/logo.gif", 
    "https://wiki.secondlife.com/w/images/secondlife.jpg", 
    "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"] 

import eventlet 
from eventlet.green import urllib2 

def fetch(url): 

    return urllib2.urlopen(url).read() 

pool = eventlet.GreenPool() 

for body in pool.imap(fetch, urls): 
    print "got body", len(body) 
+0

[Это не относится к paramiko] (http://stackoverflow.com/q/33453581/2284570) – user2284570

3

Что-то вроде это работает для меня, вы можете вызвать функцию, и он будет рассылать себя на новый поток.

from thread import start_new_thread 

def dowork(asynchronous=True): 
    if asynchronous: 
     args = (False) 
     start_new_thread(dowork,args) #Call itself on a new thread. 
    else: 
     while True: 
      #do something... 
      time.sleep(60) #sleep for a minute 
    return 
30

С Python 3.5 вы можете использовать расширенные генераторы для асинхронных функций.

import asyncio 
import datetime 

Усовершенствованный синтаксис генератора:

@asyncio.coroutine 
def display_date(loop): 
    end_time = loop.time() + 5.0 
    while True: 
     print(datetime.datetime.now()) 
     if (loop.time() + 1.0) >= end_time: 
      break 
     yield from asyncio.sleep(1) 


loop = asyncio.get_event_loop() 
# Blocking call which returns when the display_date() coroutine is done 
loop.run_until_complete(display_date(loop)) 
loop.close() 

Новый async/await синтаксис:

async def display_date(loop): 
    end_time = loop.time() + 5.0 
    while True: 
     print(datetime.datetime.now()) 
     if (loop.time() + 1.0) >= end_time: 
      break 
     await asyncio.sleep(1) 


loop = asyncio.get_event_loop() 
# Blocking call which returns when the display_date() coroutine is done 
loop.run_until_complete(display_date(loop)) 
loop.close() 
+1

@carnabeh, не могли бы вы расширить этот пример, чтобы включить опцию def defComput() "функция? В большинстве случаев используйте «ожидание asyncio.sleep (1)», но если longComput() возвращает, скажем, double, вы не можете просто использовать «ждать longComput()». – Fab

0

Вы можете использовать concurrent.futures (добавленный в Python 3.2).

import time 
from concurrent.futures import ThreadPoolExecutor 


def long_computation(duration): 
    for x in range(0, duration): 
     print(x) 
     time.sleep(1) 
    return duration * 2 


print('Use polling') 
with ThreadPoolExecutor(max_workers=1) as executor: 
    future = executor.submit(long_computation, 5) 
    while not future.done(): 
     print('waiting...') 
     time.sleep(0.5) 

    print(future.result()) 

print('Use callback') 
executor = ThreadPoolExecutor(max_workers=1) 
future = executor.submit(long_computation, 5) 
future.add_done_callback(lambda f: print(f.result())) 

print('waiting for callback') 

executor.shutdown(False) # non-blocking 

print('shutdown invoked') 
Смежные вопросы