2015-11-05 2 views
1

Я пытаюсь использовать новый объект Tornado queue вместе с concurrent.futures, чтобы позволить моему веб-серверу передавать задачи, связанные с процессором, в другие процессы. Я хочу иметь доступ к объекту Future, который возвращается из ProcessPoolExecutor из модуля concurrent.futures, чтобы я мог запросить его состояние для отображения на переднем конце (например, покажите, что процесс в данный момент запущен, покажите, что он завершен).Queue and ProcessPoolExecutor in Tornado

Я, кажется, есть два препятствия, с помощью этого метода:

  1. Как я могу представить несколько q.get() объектов в ProcessPoolExecutor, а также иметь доступ к возвращаемым Future объектов?
  2. Как я могу позволить HomeHandler получить доступ к объекту Future, который был возвращен ProcessPoolExecutor, чтобы я мог отображать информацию о состоянии на переднем конце?

Спасибо за любую помощь.

from tornado import gen 
from tornado.ioloop import IOLoop 
from tornado.queues import Queue 

from concurrent.futures import ProcessPoolExecutor 

define("port", default=8888, help="run on the given port", type=int) 
q = Queue(maxsize=2) 


def expensive_function(input_dict): 
    gen.sleep(1) 


@gen.coroutine 
def consumer(): 
    while True: 
     input_dict = yield q.get() 
     try: 
      with ProcessPoolExecutor(max_workers=4) as executor: 
       future = executor.submit(expensive_function, input_dict) 
     finally: 
      q.task_done() 


@gen.coroutine 
def producer(input_dict): 
    yield q.put(input_dict) 


class Application(tornado.web.Application): 
def __init__(self): 
    handlers = [ 
     (r"/", HomeHandler), 
    ] 
    settings = dict(
     blog_title=u"test", 
     template_path=os.path.join(os.path.dirname(__file__), "templates"), 
     static_path=os.path.join(os.path.dirname(__file__), "static"), 
     debug=True, 
    ) 
    super(Application, self).__init__(handlers, **settings) 


class HomeHandler(tornado.web.RequestHandler): 
    def get(self): 
     self.render("home.html") 

    def post(self, *args, **kwargs): 
     input_dict = {'foo': 'bar'} 

     producer(input_dict) 

     self.redirect("/") 


def main(): 
    tornado.options.parse_command_line() 
    http_server = tornado.httpserver.HTTPServer(Application()) 
    http_server.listen(options.port) 
    tornado.ioloop.IOLoop.current().start() 


def start_consumer(): 
    tornado.ioloop.IOLoop.current().spawn_callback(consumer) 


if __name__ == "__main__": 
    tornado.ioloop.IOLoop.current().run_sync(start_consumer) 
    main() 

ответ

1

Что вы пытаетесь достичь путем объединения Queue и ProcessPoolExecutor? У исполнителя уже есть собственная внутренняя очередь. Все, что вам нужно сделать, это сделать ProcessPoolExecutor глобальным (он не должен быть глобальным, но вы захотите сделать что-то похожее на глобальное, даже если вы сохраняете очередь; нет смысла создавать новый ProcessPoolExecutor каждый раз через петлю consumer) и отправлять вещи прямо из обработчика.

@gen.coroutine 
def post(self): 
    input_dict = ... 
    result = yield executor.submit(expensive_function, input_dict) 
+0

Идея заключалась в том, что я не могу иметь больше чем N экземпляров 'expensive_function' работает на сервере в то время, так что я хотел некоторые управления в месте, которое будет контролировать, сколько' expensive_calculation's работают. Тогда, конечно, «ProcessPoolExecutor» будет заниматься обработкой процессов для рабочих. Определяет ли 'ProcessPoolExecutor' размер внутренней максимальной очереди? Я просто не хочу, чтобы 100 пользователей одновременно отправляли задание и имели каркас сервера. – ionick

+0

Параметр 'max_workers' для' ProcessPoolExecutor' определяет, сколько экземпляров 'cost_calculation' может работать одновременно. Остальные запросы будут ждать очереди в очереди. Эта очередь не ограничена, в отличие от «Queue (maxsize = 2)», которую вы создаете здесь, но это не имеет значения: независимо от того, находятся ли запросы в очереди или ждут их шанса войти в очередь, эффект и стоимость одинаковы. –

+0

А, я вижу. Я думал о «max_workers» как аналоге «количество ядер» (т. Е. 100 заданий будут просто разделены, например, на 4 ядра сразу). Это имеет гораздо больший смысл. Спасибо за помощь. – ionick