2016-10-25 4 views
0

Я хочу настроить связь между несколькими процессами Торнадо, каждый из которых действует как веб-серверы I.e. используя tornado.web.RequestHandler. Идея заключается в том, что я хочу полностью спрятанную сеть между процессами. У меня есть 4 процесса, и я хочу, чтобы установить постоянный постоянную связь между ними с помощью tornado.tcpserver и tornado.tcpclient:Tornado TCP Server/Клиентская связь процесса

T1---T2 
| \ /| 
| \/ | 
|/\ | 
T3---T4 

Я новичок в программировании TCP Однако в примере, который я видел в документации Торнадо: http://www.tornadoweb.org/en/stable/iostream.html Под Реализации для класса tornado.iostream.IOStream после того, как установлен сокет, все сообщения выполнены, а затем сокет закрыт. В примере приводят код через блоки с обратными вызовами, каждый из которых выполняет свою обязанность.

Однако можно ли открыть TCP-соединение и иметь BaseIOStream.read_until_close() в режиме ожидания и вызываться, только когда клиент пишет на сервер?

Другими словами, клиент и сервер остаются подключенными, и когда клиент пишет на сервер, он каким-то образом прерывает IOLoop Tornado, чтобы вызвать read()?

Или мое мышление ошибочно, и способ сделать это каждый раз, когда мне нужны процессы для связи, я устанавливаю новое TCP-соединение, выполняю работу и затем убиваю соединение? Это только кажется, что создание этой новое соединение каждый раз будет содержать много накладных расходов, а не оставляя открытым соединение ...

+0

В основном мне интересно, есть ли у TCPServer что-то, что эквивалентно WebSocketHandler.on_message() –

ответ

2

Вот базовая реализация. (Я не могу обещать, что это качество продукции!) Сохраните его в файл и выполнить что-то вроде этого, каждый в другом окне терминала:

> python myscript.py 10001 10002 10003 
> python myscript.py 10002 10003 10001 
> python myscript.py 10003 10001 10002 

Первый аргумент является прослушивание порта, остальные аргументы являются порты других серверов.

import argparse 
import logging 
import os 
import random 
import socket 
import struct 

from tornado import gen 
from tornado.ioloop import IOLoop 
from tornado.iostream import IOStream, StreamClosedError 
from tornado.tcpclient import TCPClient 
from tornado.tcpserver import TCPServer 
from tornado.options import options as tornado_options 


parser = argparse.ArgumentParser() 
parser.add_argument("port", type=int, help="port to listen on") 
parser.add_argument("peers", type=int, nargs="+", help="peers' ports") 
opts = parser.parse_args() 

# This is just to configure Tornado logging. 
tornado_options.parse_command_line() 
logger = logging.getLogger(os.path.basename(__file__)) 
logger.setLevel(logging.INFO) 

# Cache this struct definition; important optimization. 
int_struct = struct.Struct("<i") 
_UNPACK_INT = int_struct.unpack 
_PACK_INT = int_struct.pack 

tcp_client = TCPClient() 


@gen.coroutine 
def client(port): 
    while True: 
     try: 
      stream = yield tcp_client.connect('localhost', port) 
      logging.info("Connected to %d", port) 

      # Set TCP_NODELAY/disable Nagle's Algorithm. 
      stream.set_nodelay(True) 

      while True: 
       msg = ("Hello from port %d" % opts.port).encode() 
       length = _PACK_INT(len(msg)) 
       yield stream.write(length + msg) 
       yield gen.sleep(random.random() * 10) 

     except StreamClosedError as exc: 
      logger.error("Error connecting to %d: %s", port, exc) 
      yield gen.sleep(5) 


loop = IOLoop.current() 

for peer in opts.peers: 
    loop.spawn_callback(client, peer) 


class MyServer(TCPServer): 
    @gen.coroutine 
    def handle_stream(self, stream, address): 
     logging.info("Connection from peer") 
     try: 
      while True: 
       # Read 4 bytes. 
       header = yield stream.read_bytes(4) 

       # Convert from network order to int. 
       length = _UNPACK_INT(header)[0] 

       msg = yield stream.read_bytes(length) 
       logger.info('"%s"' % msg.decode()) 

       del msg # Dereference msg in case it's big. 

     except StreamClosedError: 
      logger.error("%s disconnected", address) 


server = MyServer() 
server.listen(opts.port) 

loop.start() 

Обратите внимание, что мы не называем read_until_close, так что нам нужно каким-то образом, чтобы знать, когда сообщение полностью получено. Я делаю это с 32-разрядным целым в начале каждого сообщения, которое кодирует длину остальной части сообщения.

Вы спросили: «Когда клиент пишет сервер, он каким-то образом прерывает IOLoop Tornado, чтобы вызвать read()?» Это то, на что рассчитан IOLoop для Tornado, и это то, что мы подразумеваем под «асинхронным»: многие сопрограммы Tornado или обратные вызовы могут ждать сетевых событий, и IOLoop пробуждает их, когда происходят события, которые они ожидают. Это то, что происходит везде, где вы видите «доходность» в коде выше.

+0

Ahh I см. Джесси! Поэтому read_bytes() в бесконечном цикле будет выдаваться до тех пор, пока не будет записана информация. Спасибо за то, что я подробно рассмотрю ваш пример и выполнил его. Просто взглянуть на какую-либо причину, по которой вы использовали метод unsigned int? Это имеет смысл просто любопытно? Я предполагаю, что read_until() также можно заменить вместо read_bytes() ... Еще раз спасибо! –

+1

'read_until' может быть дорогостоящим на больших сообщениях, поскольку он должен искать все данные, которые он получает, ища разделитель. Если вы можете разработать любой протокол, который вам нужен, вы получите лучшую производительность, разработав протокол, в котором вы знаете, сколько байтов вы должны прочитать. –

+0

Hi Jesse Я получаю некоторые очень странные результаты с этой программой, когда я меняю код на использование async и жду. Я продолжаю получать ошибки утверждения и предупреждать, что «MyServer.handle_stream» никогда не ждали ... Знаете ли вы, что проблема изменит вышеприведенное, чтобы использовать await и async? –

1

Однако можно ли открыть соединение TCP и имеют BaseIOStream.read_until_close() бездействует и вызывается только тогда, когда клиент пишет на сервер?

Не известно о торнадо. Но, что касается TCP, после установления соединения (сервер и клиент поддерживают состояние как «ESTABLISHED») сервер и клиент могут обмениваться данными, пока кто-либо не захочет закрыть соединение или в случае сетевых проблем, из-за которых отправляются сообщения не достигать другого конца.

Другими словами, пребывание клиент и сервер подключен и когда клиент пишет на сервер это как-то прерывает Торнадо IOLoop называть чтения()?

Да. Это должно быть так.

Или мое мышление ошибочного и способа сделать это каждый раз, когда мне нужны процессы общения я установить новое соединение TCP, выполнить работу, а затем убить соединение?

No. Каждый обмен данными должен не повторно инициация соединения TCP

+0

Спасибо за комментарии Prabhu, но я также хочу знать, как это может быть достигнуто специально с каркасом Tornado ... –

Смежные вопросы