это мой первый пост здесь!Python TCP Payload Duplication - Передача данных нескольким конечным точкам одновременно
Моя цель - дублировать полезную нагрузку однонаправленного потока TCP и отправлять эту полезную нагрузку нескольким конечным точкам одновременно. У меня есть рабочий прототип, написанный на Python, однако я новичок в Python и в программировании Socket. В идеале решение может работать как в средах Windows, так и в * nix.
Этот прототип работает, однако он создает новое TCP-соединение для отправки для каждой длины буфера (в настоящее время установлено 4096 байт). Основная проблема заключается в том, что в конечном итоге у меня не будет локальных портов для отправки, и в идеале я хотел бы, чтобы данные передавались от каждого входящего потока TCP до одного потока TCP (для каждой конечной точки). Входящие данные могут варьироваться от менее 1024 байт до сотен мегабайт.
В настоящий момент инициируется новый исходящий поток TCP для каждых 4096 байт. Я не уверен, что проблема в моей реализации потоковой передачи, или если я пропустил что-то еще очень очевидное.
В моих исследованиях я обнаружил, что select() может помочь, однако я не уверен, что это было бы уместно, потому что мне может потребоваться обработать некоторые входящие данные и ответить на отправляющий клиент для определенных случаев в будущем ,
Вот код, который я до сих пор (некоторые из вариантов кода я попытался закомментировано):
#!/usr/bin/python
#One way TCP payload duplication
import sys
import threading
from socket import *
bufsize = 4096
host= ''
# Methods:
#handles sending the data to the endpoints
def send(endpoint,port,data):
sendSocket = socket(AF_INET, SOCK_STREAM)
#sendSocket.setblocking(1)
sendSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
#sendport = sendSocket.getsockname
#print sendport
try:
sendSocket.connect((endpoint, port))
sendSocket.sendall(data)
except IOError as msg:
print "Send Failed. Error Code: " + str(msg[0]) + ' Message: ' + msg[1]
sys.exit()
#handles threading for sending data to endpoints
def forward(service, ENDPOINT_LIST, port, data):
#for each endpoint in the endpoint list start a new send thread
for endpoint in ENDPOINT_LIST:
print "Forwarding data for %s from %s:%s to %s:%s" % (service,host,port,endpoint,port)
#send(endpoint,port,data)
ethread = threading.Thread(target=send, args=(endpoint,port,data))
ethread.start()
#handles threading for incoming clients
def clientthread(conn,service,ENDPOINT_LIST,port):
while True:
#receive data form client
data = conn.recv(bufsize)
if not data:
break
cthread = threading.Thread(target=forward, args=(service, ENDPOINT_LIST, port, data))
cthread.start()
#no data? then close the connection
conn.close()
#handles listening to sockets for incoming connections
def listen(service, ENDPOINT_LIST, port):
#create the socket
listenSocket = socket(AF_INET, SOCK_STREAM)
#Allow reusing addresses - I think this is important to stop local ports getting eaten up by never-ending tcp streams that don't close
listenSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
#try to bind the socket to host and port
try:
listenSocket.bind((host, port))
#display an error message if you can't
except IOError as msg:
print "Bind Failed. Error Code: " + str(msg[0]) + ' Message: ' + msg[1]
sys.exit()
#start listening on the socket
listenSocket.listen(10)
print "Service %s on port %s is listening" %(service,port)
while True:
#wait to accept a connection
conn, addr = listenSocket.accept()
print 'Connected to ' + addr[0] + ':' + str(addr[1]) + ' on port ' + str(port)
#start new thread for each connection
lthread = threading.Thread(target=clientthread , args=(conn,service,ENDPOINT_LIST,port))
lthread.start()
#If no data close the connection
listenSocket.close()
service = "Dumb-one-way-tcp-service-name1"
ENDPOINT_LIST = ["192.168.1.100","192.168.1.200"]
port = 55551
listen(service,ENDPOINT_LIST,port)
Я посмотрел на другие библиотеки, чтобы попытаться достичь своей цели, в том числе с помощью:
- Twisted
- Asyncore
- Scapy
Однако я нашел их довольно сложными для моих скромных потребностей и уровня навыков программирования.
Если у кого-то есть предложения относительно того, как я мог бы уточнить подход, который у меня есть, или любые другие способы достижения этой цели, пожалуйста, дайте мне знать!
Есть несколько решений, которые проще или сложнее, в зависимости от требуемой синхронизации. Скажем, вы получите 10 МБ и хотите передать его в 3 пункта назначения. Скажем, номер 3-го места медленно принимает поток данных. Вы хотите получить все 10 МБ быстро и отправить его сразу двум другим пунктам назначения и буферизировать его в памяти для третьего? Или это нормально (или даже лучше), чтобы замедлить все это, т. Е. Получать и передавать 10 МБ со скоростью самого медленного соединения? –
@ArminRigo, я думаю, что синхронизация важна. Я хочу быстро принять данные. Если конечная точка имеет пропускную способность, то пересылаемый поток должен быть отправлен с той же скоростью, на которую он пришел, однако, если конечная точка медленнее, чем другая (что, вероятно, не будет проблемой для меня), было бы хорошо, если бы она буферизовала полезная нагрузка до его отправки. Я подумал, что в Python это будет иметь место в том, что он будет буферизовать входящую полезную нагрузку в ОЗУ, когда он пытается ее отправить, или мне нужно явно указать ему, чтобы он где-то ее где-то буферизовал? И вы думаете, что его буферизация может помочь мне решить многие проблемы? – BeSure