2015-03-27 4 views
4

У меня есть устройство, подключенное через usb, и я использую pyUSB для взаимодействия с данными.pyUSB получить непрерывный поток данных от датчика

Это то, что мой код в настоящее время выглядит следующим образом:

import usb.core 
import usb.util 

def main(): 
    device = usb.core.find(idVendor=0x072F, idProduct=0x2200) 

    # use the first/default configuration 
    device.set_configuration() 

    # first endpoint 
    endpoint = device[0][(0,0)][0] 

    # read a data packet 
    data = None 
    while True: 
     try: 
      data = device.read(endpoint.bEndpointAddress, 
           endpoint.wMaxPacketSize) 
      print data 

     except usb.core.USBError as e: 
      data = None 
      if e.args == ('Operation timed out',): 

       continue 

if __name__ == '__main__': 
    main() 

Он базируется читателя мыши, но данные, которые я получаю не делает для меня смысл:

array('B', [80, 3]) 
array('B', [80, 2]) 
array('B', [80, 3]) 
array('B', [80, 2]) 

Я предполагаю, что он читает только часть того, что на самом деле предоставляется? Я попытался установить maxpacketsize больше, но ничего.

ответ

2

pyUSB отправляет и принимает данные в строковом формате. Данные, которые вы получаете, - это коды ASCII. Вам нужно добавить следующую строку, чтобы правильно прочитать данные в коде.

data = device.read(endpoint.bEndpointAddress, 
          endpoint.wMaxPacketSize) 

RxData = ''.join([chr(x) for x in data]) 
print RxData 

Функция chr(x) преобразует ASCII коды в строку. Это должно решить вашу проблему.

1

Я всего лишь случайный пользователь Python, так что будьте осторожны. Если ваш скрипт python не может справиться с количеством данных, которые будут отобраны, то это то, что работает для меня. Я отправляю от UC к блокам ПК по 64 байта. Я использую буфер для хранения своих образцов, а затем я сохраняю их в файле или замышляю. Я корректирую число, умноженное на 64 (10 в примере ниже), пока не получаю все образцы, которые я ожидал.

# Initialization 
rxBytes = array.array('B', [0]) * (64 * 10) 
rxBuffer = array.array('B') 

В цикле, я получаю новые образцы и хранить их в буфере

# Get new samples 
hid_dev.read(endpoint.bEndpointAddress, rxBytes) 
rxBuffer.extend(rxBytes) 

Надеется, что это помогает.

Смежные вопросы