У меня много потерь пакетов, использующих UDP в python. Я знаю, что я должен использовать TCP, если мне не нужна потеря пакетов, но у меня нет (полного) контроля над отправителем.Получать UDP-пакеты с python, потеря пакетов
Это камера, которая отправляет 15 изображений в секунду, используя многоадресную рассылку UDP.
Ниже вы видите код, который я написал сейчас. Он использует многопроцессорную обработку, чтобы позволить функции производителя и потребителя работать параллельно. Функция производителя захватывает пакеты, потребительская функция обрабатывает их и записывает изображения в файлы .bmp.
Я написал класс PacketStream, который записывает байты из пакетов в файл .bmp.
Когда камера отправляет новое изображение, он сначала отправляет один пакет с первым байтом = 0x01. Это содержит информацию об изображении. Затем отправляется 612 пакетов с первым байтом = 0x02. Они содержат байты из изображения (508 байт/пакет).
Поскольку в секунду отправляется 15 изображений, в каждую секунду отправляется ~ 9000 пакетов. Более того, это происходит более быстрыми темпами в пакетах на изображение, при ~ 22 пакетах/мс.
Я могу получить все пакеты с помощью tcpdump или wirehark. Но используя приведенный ниже код, пакеты пропущены. Наверняка, мой Windows 7 ПК должен уметь справиться с этим? Я также использую его на малине pi 3, и там более или менее одинаковое количество пакетов пропущено. Поэтому я думаю, что это проблема с кодом.
Я пробовал много разных вещей, таких как потоки вместо многопроцессорности, Труба вместо очереди.
Я также попытался увеличения буфера сокета с
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 3000000)
безрезультатно.
Возможно ли это в python?
Спасибо заранее,
import time
from multiprocessing import Process, Queue
import socket
import struct
from PIL import Image
class PacketStream:
def __init__(self, output_path):
self.output_path = output_path
self.data_buffer = ''
self.img_id = -1 # -1 = waiting for start of new image
def process(self, data):
message_id = data[0]
if message_id == '\x01':
self.wrap_up_last_image()
self.img_id = ord(data[3])
self.data_buffer = ''
if message_id == '\x02':
self.data_buffer += data[6:]
def wrap_up_last_image(self):
if self.img_id > 0:
n_bytes = len(self.data_buffer)
if n_bytes == 307200:
global i
write_image(self.output_path + str(i).zfill(7) + '_' + str(self.img_id).zfill(3) + '.bmp',
self.data_buffer)
i += 1
else:
print 'Image lost: %s bytes missing.' % (307200 - n_bytes)
def write_image(path, data):
im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
im.save(path)
print time.time(), path
def producer(q):
# setup socket
MCAST_GRP = '239.255.83.71'
MCAST_PORT = 2271
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', MCAST_PORT))
mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
while True:
q.put(sock.recv(512))
def consumer(q):
packet_stream = PacketStream('D:/bmpdump/')
while True:
data = q.get()
packet_stream.process(data)
i = 0
if __name__ == '__main__':
q = Queue()
t1 = Process(target=producer, args=(q,))
t1.daemon = True # so they stop when the main prog stops
t1.start()
t2 = Process(target=consumer, args=(q,))
t2.daemon = True
t2.start()
time.sleep(10.0)
print 'Program finished.'
EDIT
Спасибо за все предложения.
1) Я уже пробовал threading + queue, также '.join(), похоже, не имеет большого значения. Я совершенно уверен, что проблема заключается в том, что поток производителя не получает достаточного приоритета. Я не могу найти, как увеличить это с помощью Python? Возможно ли это?
2) Мне удалось потерять только около 10%, используя приведенный ниже код. Процессор на ~ 25% (на Raspberry Pi) Ключ к потреблению данных, когда есть пауза в потоке пакетов, то есть, когда последний пакет данных прибыл
import time
import socket
import struct
from PIL import Image
def write_image(path, data):
im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
im.save(path)
print time.time(), path
def consume(data_buffer):
img_id = ord(data_buffer[0][1])
real_data_buffer = [data[6:] for data in data_buffer]
data_string = ''.join(real_data_buffer)
global i
write_image('/media/pi/exthdd_02/bmpdump/' + str(i).zfill(7) + '_' + str(img_id).zfill(3) + '.bmp', data_string)
i += 1
def producer(sock):
print 'Producer start'
data_buffer = []
while True:
data = sock.recvfrom(512)[0]
if data[0] == '\x01':
data_buffer = []
else:
data_buffer.append(data)
if len(data_buffer) == 612:
consume(data_buffer)
# image counter
i = 0
# setup socket
MCAST_GRP = '239.255.83.71'
MCAST_PORT = 2271
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((MCAST_GRP, MCAST_PORT))
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 30000000)
producer(sock)
Я думаю, что вы должны использовать '' sock.recvfrom'', а не '' sock.recv'' при работе с сокетами UDP. Может быть, это помогает? –
Возможно, вам потребуется увеличить приоритет потока/процесса чтения данных UDP-данных (используя sched_setscheduler() или аналогичный в Linux или аналогичный API для любой другой ОС, которую вы могли бы использовать. Я не уверен, что API Python что соответствует этому). Таким образом, у вас будет меньше шансов, что ваш поток читателей будет удерживаться от процессора другой задачей, что может привести к получению полного приема-буфера и упавших пакетов. –