2014-10-10 2 views
1

это мой первый пост здесь!Python TCP Payload Duplication - Передача данных нескольким конечным точкам одновременно

Моя цель - дублировать полезную нагрузку однонаправленного потока TCP и отправлять эту полезную нагрузку нескольким конечным точкам одновременно. У меня есть рабочий прототип, написанный на Python, однако я новичок в Python и в программировании Socket. В идеале решение может работать как в средах Windows, так и в * nix.

Этот прототип работает, однако он создает новое TCP-соединение для отправки для каждой длины буфера (в настоящее время установлено 4096 байт). Основная проблема заключается в том, что в конечном итоге у меня не будет локальных портов для отправки, и в идеале я хотел бы, чтобы данные передавались от каждого входящего потока TCP до одного потока TCP (для каждой конечной точки). Входящие данные могут варьироваться от менее 1024 байт до сотен мегабайт.

В настоящий момент инициируется новый исходящий поток TCP для каждых 4096 байт. Я не уверен, что проблема в моей реализации потоковой передачи, или если я пропустил что-то еще очень очевидное.

В моих исследованиях я обнаружил, что select() может помочь, однако я не уверен, что это было бы уместно, потому что мне может потребоваться обработать некоторые входящие данные и ответить на отправляющий клиент для определенных случаев в будущем ,

Вот код, который я до сих пор (некоторые из вариантов кода я попытался закомментировано):

#!/usr/bin/python 
#One way TCP payload duplication 
import sys 
import threading 
from socket import * 
bufsize = 4096 
host= '' 

# Methods: 
#handles sending the data to the endpoints 
def send(endpoint,port,data): 
    sendSocket = socket(AF_INET, SOCK_STREAM) 
    #sendSocket.setblocking(1) 
    sendSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 
    #sendport = sendSocket.getsockname 
    #print sendport 
    try: 
     sendSocket.connect((endpoint, port)) 
     sendSocket.sendall(data) 
    except IOError as msg: 
     print "Send Failed. Error Code: " + str(msg[0]) + ' Message: ' + msg[1] 
     sys.exit() 

#handles threading for sending data to endpoints 
def forward(service, ENDPOINT_LIST, port, data): 
    #for each endpoint in the endpoint list start a new send thread 
    for endpoint in ENDPOINT_LIST: 
     print "Forwarding data for %s from %s:%s to %s:%s" % (service,host,port,endpoint,port) 
     #send(endpoint,port,data) 
     ethread = threading.Thread(target=send, args=(endpoint,port,data)) 
     ethread.start() 

#handles threading for incoming clients 
def clientthread(conn,service,ENDPOINT_LIST,port): 
    while True: 
     #receive data form client 
     data = conn.recv(bufsize) 
     if not data: 
      break 
     cthread = threading.Thread(target=forward, args=(service, ENDPOINT_LIST, port, data)) 
     cthread.start() 
    #no data? then close the connection 
    conn.close() 

#handles listening to sockets for incoming connections 
def listen(service, ENDPOINT_LIST, port): 
    #create the socket 
    listenSocket = socket(AF_INET, SOCK_STREAM) 
    #Allow reusing addresses - I think this is important to stop local ports getting eaten up by never-ending tcp streams that don't close 
    listenSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 
    #try to bind the socket to host and port 
    try: 
     listenSocket.bind((host, port)) 
    #display an error message if you can't 
    except IOError as msg: 
     print "Bind Failed. Error Code: " + str(msg[0]) + ' Message: ' + msg[1] 
     sys.exit() 
    #start listening on the socket 
    listenSocket.listen(10) 
    print "Service %s on port %s is listening" %(service,port) 
    while True: 
     #wait to accept a connection 
     conn, addr = listenSocket.accept() 
     print 'Connected to ' + addr[0] + ':' + str(addr[1]) + ' on port ' + str(port) 
     #start new thread for each connection 
     lthread = threading.Thread(target=clientthread , args=(conn,service,ENDPOINT_LIST,port)) 
     lthread.start() 
    #If no data close the connection 
    listenSocket.close() 

service = "Dumb-one-way-tcp-service-name1" 
ENDPOINT_LIST = ["192.168.1.100","192.168.1.200"] 
port = 55551  
listen(service,ENDPOINT_LIST,port) 

Я посмотрел на другие библиотеки, чтобы попытаться достичь своей цели, в том числе с помощью:

  • Twisted
  • Asyncore
  • Scapy

Однако я нашел их довольно сложными для моих скромных потребностей и уровня навыков программирования.

Если у кого-то есть предложения относительно того, как я мог бы уточнить подход, который у меня есть, или любые другие способы достижения этой цели, пожалуйста, дайте мне знать!

+0

Есть несколько решений, которые проще или сложнее, в зависимости от требуемой синхронизации. Скажем, вы получите 10 МБ и хотите передать его в 3 пункта назначения. Скажем, номер 3-го места медленно принимает поток данных. Вы хотите получить все 10 МБ быстро и отправить его сразу двум другим пунктам назначения и буферизировать его в памяти для третьего? Или это нормально (или даже лучше), чтобы замедлить все это, т. Е. Получать и передавать 10 МБ со скоростью самого медленного соединения? –

+0

@ArminRigo, я думаю, что синхронизация важна. Я хочу быстро принять данные. Если конечная точка имеет пропускную способность, то пересылаемый поток должен быть отправлен с той же скоростью, на которую он пришел, однако, если конечная точка медленнее, чем другая (что, вероятно, не будет проблемой для меня), было бы хорошо, если бы она буферизовала полезная нагрузка до его отправки. Я подумал, что в Python это будет иметь место в том, что он будет буферизовать входящую полезную нагрузку в ОЗУ, когда он пытается ее отправить, или мне нужно явно указать ему, чтобы он где-то ее где-то буферизовал? И вы думаете, что его буферизация может помочь мне решить многие проблемы? – BeSure

ответ

0

Короче говоря, ваш вопрос в том, что портов недостаточно, не так ли? Кажется, вы не закрывали сокет после отправки. Попробуйте это в send():

... 
except IOError as msg: 
    print "Send Failed. Error Code: " + str(msg[0]) + ' Message: ' + msg[1] 
    sys.exit() 
finally: 
    sendSocket.close() 
+0

Это, вероятно, хорошая идея закрыть соединение в последнем случае, но это не помогло остановить новое исходящее TCP-соединение для каждой длины буфера – BeSure

+0

@BeSure Таким образом, вы не хотите отправлять трафик каждый раз в новом соедините? –

+0

Нет. Я не для каждого входящего соединения. Я хочу передать его полезную нагрузку в одно TCP-соединение с каждой конечной точкой. Полезная нагрузка может варьироваться до сотен мегабайт. – BeSure

0

Есть два способа, если вы не хотите, чтобы узнать некоторые более продвинутые структуры, как Twisted.

Ближайший к тому, что вы делаете: используйте потоки, но вам нужно иметь один поток для исходящего соединения --- и не за исходящий пакет. Создайте 3 Queue.Queue объектов и создайте 3 потока, передавая каждому из объектов Queue и одно из направлений. Каждый поток открывает сокет, а затем в цикле он получает следующую строку из своего собственного Queue и отправляет ее в сокет. Clientthread (который может быть только основным потоком, априори) получает данные как строки и помещает каждую из этих строк во все очереди. Таким образом, отправленные пакеты не выходят из строя, как могли, если вы создаете один поток для каждого пакета.

Альтернатива заключается в том, чтобы избежать нитей полностью и использовать select(). Это немного больше ума.В основном у вас есть только один большой цикл, который начинается с select(). Он должен тщательно управлять, чтобы передать правильный список сокетов на select(): вы хотите, чтобы звонок select(), чтобы проснуться либо, когда есть входящие данные из входящего сокета, или, если исходящий сокет готов к отправке большего количества и есть что-то еще для отправки. В этой модели у вас будет 3 списка строк; когда вы читаете входящие данные, вы добавляете их ко всем трем спискам; вызов select() передается список исходящих сокетов, у которых есть непустой список (так, больше для отправки); и при отправке вы не должны использовать sendall() в этой модели, но send(), и если было отправлено меньше полной строки, вы должны повторно добавить остаток в начало соответствующего списка.

+0

Я думаю, что Queue - это подход, который я хочу принять, я попытался реализовать ваши предложения, и у меня есть работа в Queue, однако я не уверен, как структурировать потоки, я все еще создаю слишком много потоков, которые не работают для больших файлов, и я не могу понять, почему ... – BeSure

Смежные вопросы