Я пытаюсь использовать новый объект Tornado queue вместе с concurrent.futures
, чтобы позволить моему веб-серверу передавать задачи, связанные с процессором, в другие процессы. Я хочу иметь доступ к объекту Future
, который возвращается из ProcessPoolExecutor
из модуля concurrent.futures
, чтобы я мог запросить его состояние для отображения на переднем конце (например, покажите, что процесс в данный момент запущен, покажите, что он завершен).Queue and ProcessPoolExecutor in Tornado
Я, кажется, есть два препятствия, с помощью этого метода:
- Как я могу представить несколько
q.get()
объектов вProcessPoolExecutor
, а также иметь доступ к возвращаемымFuture
объектов? - Как я могу позволить
HomeHandler
получить доступ к объектуFuture
, который был возвращенProcessPoolExecutor
, чтобы я мог отображать информацию о состоянии на переднем конце?
Спасибо за любую помощь.
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue
from concurrent.futures import ProcessPoolExecutor
define("port", default=8888, help="run on the given port", type=int)
q = Queue(maxsize=2)
def expensive_function(input_dict):
gen.sleep(1)
@gen.coroutine
def consumer():
while True:
input_dict = yield q.get()
try:
with ProcessPoolExecutor(max_workers=4) as executor:
future = executor.submit(expensive_function, input_dict)
finally:
q.task_done()
@gen.coroutine
def producer(input_dict):
yield q.put(input_dict)
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/", HomeHandler),
]
settings = dict(
blog_title=u"test",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
debug=True,
)
super(Application, self).__init__(handlers, **settings)
class HomeHandler(tornado.web.RequestHandler):
def get(self):
self.render("home.html")
def post(self, *args, **kwargs):
input_dict = {'foo': 'bar'}
producer(input_dict)
self.redirect("/")
def main():
tornado.options.parse_command_line()
http_server = tornado.httpserver.HTTPServer(Application())
http_server.listen(options.port)
tornado.ioloop.IOLoop.current().start()
def start_consumer():
tornado.ioloop.IOLoop.current().spawn_callback(consumer)
if __name__ == "__main__":
tornado.ioloop.IOLoop.current().run_sync(start_consumer)
main()
Идея заключалась в том, что я не могу иметь больше чем N экземпляров 'expensive_function' работает на сервере в то время, так что я хотел некоторые управления в месте, которое будет контролировать, сколько' expensive_calculation's работают. Тогда, конечно, «ProcessPoolExecutor» будет заниматься обработкой процессов для рабочих. Определяет ли 'ProcessPoolExecutor' размер внутренней максимальной очереди? Я просто не хочу, чтобы 100 пользователей одновременно отправляли задание и имели каркас сервера. – ionick
Параметр 'max_workers' для' ProcessPoolExecutor' определяет, сколько экземпляров 'cost_calculation' может работать одновременно. Остальные запросы будут ждать очереди в очереди. Эта очередь не ограничена, в отличие от «Queue (maxsize = 2)», которую вы создаете здесь, но это не имеет значения: независимо от того, находятся ли запросы в очереди или ждут их шанса войти в очередь, эффект и стоимость одинаковы. –
А, я вижу. Я думал о «max_workers» как аналоге «количество ядер» (т. Е. 100 заданий будут просто разделены, например, на 4 ядра сразу). Это имеет гораздо больший смысл. Спасибо за помощь. – ionick