2017-01-27 2 views
0

У меня много потерь пакетов, использующих UDP в python. Я знаю, что я должен использовать TCP, если мне не нужна потеря пакетов, но у меня нет (полного) контроля над отправителем.Получать UDP-пакеты с python, потеря пакетов

Это камера, которая отправляет 15 изображений в секунду, используя многоадресную рассылку UDP.

Ниже вы видите код, который я написал сейчас. Он использует многопроцессорную обработку, чтобы позволить функции производителя и потребителя работать параллельно. Функция производителя захватывает пакеты, потребительская функция обрабатывает их и записывает изображения в файлы .bmp.

Я написал класс PacketStream, который записывает байты из пакетов в файл .bmp.

Когда камера отправляет новое изображение, он сначала отправляет один пакет с первым байтом = 0x01. Это содержит информацию об изображении. Затем отправляется 612 пакетов с первым байтом = 0x02. Они содержат байты из изображения (508 байт/пакет).

Поскольку в секунду отправляется 15 изображений, в каждую секунду отправляется ~ 9000 пакетов. Более того, это происходит более быстрыми темпами в пакетах на изображение, при ~ 22 пакетах/мс.

Я могу получить все пакеты с помощью tcpdump или wirehark. Но используя приведенный ниже код, пакеты пропущены. Наверняка, мой Windows 7 ПК должен уметь справиться с этим? Я также использую его на малине pi 3, и там более или менее одинаковое количество пакетов пропущено. Поэтому я думаю, что это проблема с кодом.

Я пробовал много разных вещей, таких как потоки вместо многопроцессорности, Труба вместо очереди.

Я также попытался увеличения буфера сокета с

sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 3000000) 

безрезультатно.

Возможно ли это в python?

Спасибо заранее,

import time 
from multiprocessing import Process, Queue 
import socket 
import struct 
from PIL import Image 


class PacketStream: 
    def __init__(self, output_path): 
     self.output_path = output_path 
     self.data_buffer = '' 
     self.img_id = -1 # -1 = waiting for start of new image 

    def process(self, data): 
     message_id = data[0] 
     if message_id == '\x01': 
      self.wrap_up_last_image() 
      self.img_id = ord(data[3]) 
      self.data_buffer = '' 
     if message_id == '\x02': 
      self.data_buffer += data[6:] 

    def wrap_up_last_image(self): 
     if self.img_id > 0: 
      n_bytes = len(self.data_buffer) 
      if n_bytes == 307200: 
       global i 
       write_image(self.output_path + str(i).zfill(7) + '_' + str(self.img_id).zfill(3) + '.bmp', 
          self.data_buffer) 
       i += 1 
      else: 
       print 'Image lost: %s bytes missing.' % (307200 - n_bytes) 


def write_image(path, data): 
    im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1) 
    im.save(path) 
    print time.time(), path 


def producer(q): 
    # setup socket 
    MCAST_GRP = '239.255.83.71' 
    MCAST_PORT = 2271 
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
    sock.bind(('', MCAST_PORT)) 
    mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) 
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 
    while True: 
     q.put(sock.recv(512)) 


def consumer(q): 
    packet_stream = PacketStream('D:/bmpdump/') 
    while True: 
     data = q.get() 
     packet_stream.process(data) 

i = 0 
if __name__ == '__main__': 
    q = Queue() 

    t1 = Process(target=producer, args=(q,)) 
    t1.daemon = True # so they stop when the main prog stops 
    t1.start() 
    t2 = Process(target=consumer, args=(q,)) 
    t2.daemon = True 
    t2.start() 

    time.sleep(10.0) 

    print 'Program finished.' 

EDIT

Спасибо за все предложения.

1) Я уже пробовал threading + queue, также '.join(), похоже, не имеет большого значения. Я совершенно уверен, что проблема заключается в том, что поток производителя не получает достаточного приоритета. Я не могу найти, как увеличить это с помощью Python? Возможно ли это?

2) Мне удалось потерять только около 10%, используя приведенный ниже код. Процессор на ~ 25% (на Raspberry Pi) Ключ к потреблению данных, когда есть пауза в потоке пакетов, то есть, когда последний пакет данных прибыл

import time 
import socket 
import struct 
from PIL import Image 


def write_image(path, data): 
    im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1) 
    im.save(path) 
    print time.time(), path 

def consume(data_buffer): 
    img_id = ord(data_buffer[0][1]) 
    real_data_buffer = [data[6:] for data in data_buffer] 
    data_string = ''.join(real_data_buffer) 

    global i 
    write_image('/media/pi/exthdd_02/bmpdump/' + str(i).zfill(7) + '_' + str(img_id).zfill(3) + '.bmp', data_string) 
    i += 1 

def producer(sock): 
    print 'Producer start' 
    data_buffer = [] 
    while True: 
     data = sock.recvfrom(512)[0] 
     if data[0] == '\x01': 
      data_buffer = [] 
     else: 
      data_buffer.append(data) 
     if len(data_buffer) == 612: 
      consume(data_buffer) 


# image counter 
i = 0 

# setup socket 
MCAST_GRP = '239.255.83.71' 
MCAST_PORT = 2271 
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
sock.bind((MCAST_GRP, MCAST_PORT)) 
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) 
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 30000000) 

producer(sock) 
+0

Я думаю, что вы должны использовать '' sock.recvfrom'', а не '' sock.recv'' при работе с сокетами UDP. Может быть, это помогает? –

+0

Возможно, вам потребуется увеличить приоритет потока/процесса чтения данных UDP-данных (используя sched_setscheduler() или аналогичный в Linux или аналогичный API для любой другой ОС, которую вы могли бы использовать. Я не уверен, что API Python что соответствует этому). Таким образом, у вас будет меньше шансов, что ваш поток читателей будет удерживаться от процессора другой задачей, что может привести к получению полного приема-буфера и упавших пакетов. –

ответ

0

Несколько предложений по улучшению коды , но сначала вопрос: вы вообще не измеряли, что может замедлить работу? Например, вы посмотрели на использование ЦП вашей системы. Если вы нанесете 100%, что вполне может послужить причиной потери пакетов. Если он в основном неактивен, происходит что-то еще, и проблема не связана с производительностью кода.

Теперь некоторые предложения по улучшению кода:

  • использование socket.recvfrom вместо sock.recv при работе с UDP сокетами
  • не используют многопроцессорных с процессами, сериализации, что должно произойти, чтобы отправить данные от одного процесса к другому могут очень хорошо быть узким местом производительности, если мы говорим ~ 9000 вызовов/сек. Попробуйте использовать темы вместо этого (threading + queue модулей). Но поскольку вы не предоставляете каких-либо наблюдаемых номеров, трудно сказать.
  • не использовать конкатенацию строк для создания буфера приемника при получении пакетов. Это создает большое количество новых временных строковых объектов и копирует данные все время. Вместо этого добавьте каждый пакет в список, и когда вы получили все данные, "".join(packets) все вместе один раз в конце.
Смежные вопросы