2016-03-01 5 views
1

У меня есть кластер kafka, получающий сообщения. message - это байтовый массив zip-файла. zip-файл содержит двоичные файлы данных protobuf в качестве записей. Я читаю zip-файл и пытаюсь десериализовать записи protobuf, вот где мой код поражает "protocol message has invalid UTF-8,invalid tag" исключения.Невозможно десериализовать сжатые буферы протокола

Я могу десериализовать двоичный файл protobuf, прежде чем отправлять его в качестве заархивированного байтового массива для брокера.

но когда я застегиваю эти бинарные файлы protobuf, создаю сообщения для kafka, потребляю их, а затем пытаюсь десериализовать записи в потоке zip, я сталкиваюсь с проблемами.

Я не уверен, какой из них является виновником.

Поскольку эти бинарные буферы протокола GZipped, они снова замаскировали их?

может кто-то пролить некоторый свет.

Благодаря

************** ************** редактировать

Producer Side: 

public byte[] getZipfileBytes() { 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     ZipOutputStream zipOut = new ZipOutputStream(baos); 
     CheckedOutputStream checkSum = new CheckedOutputStream(zipOut, new Adler32()); 

     try { 
      ZipEntry zipEntry = new ZipEntry(testFile.getName()); 
      byte[] protoBytes = IOUtils.toByteArray(new FileInputStream(testFile)); 
      System.out.println("bytes length:\t"+protoBytes.length); 
      zipEntry.setSize(protoBytes.length); 
      zipOut.putNextEntry(zipEntry); 
      zipOut.write(protoBytes); 
      zipOut.close(); 
      System.out.println("checksum:"+checkSum.getChecksum().getValue()); 
      zipBytes = baos.toByteArray(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 

     return zipBytes; 
    } 



    Consumer Side: 
     processConsumerRecord(ConsumerRecord<String, byte[]> record) { 
       String key = record.key(); 
       byte[] dataPacket = record.value(); 

       ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(dataPacket)); 

       CheckedInputStream checkSum = new CheckedInputStream(zipIn, 
         new Adler32()); 
       ZipEntry zipEntry; 
       try { 
        zipEntry = zipIn.getNextEntry(); 
        while (zipEntry != null) { 
         String name = zipEntry.getName(); 
         ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
          try { 
           IOUtils.copy(zipIn, baos); 
           byte[] protoBytes = baos.toByteArray(); 

двоичные Protobuf байтов gzipped, так что мне нужно gunzip

если я сделаю gunzip, это не будет в формате gzip.

если я пропустил gunzip и сделаю parseFrom, я получаю неверные исключения тегов. .

GZIPInputStream gzip = new GZIPInputStream(
         new ByteArrayInputStream(baos.toByteArray())); 
         MyProtoObject mpo = null; 
         try { 
          mpo = MyProtoObject.parseFrom(protoBytes); 
         } catch (InvalidProtocolBufferException e1) { 
          e1.printStackTrace(); 
         } 
        } catch (IOException e1) { 
         e1.printStackTrace(); 
        } 

checkSum.getChecksum() GetValue() возвращает 1, в то время производства и потребления почтовый массив байтов
следующие значения переменной ZipEntry во время отладки:

producer 
     zipEntry ZipEntry (id=44) 
     comment null  
     crc 2147247736 
     csize 86794 
     extra null  
     flag 2056  
     method 8 
     name "test.dat" (id=49) 
     size 92931 
     time 1214084891 


    consumer 
     zipEntry ZipEntry (id=34) 
     comment null  
     crc 2147247736 
     csize 86794 
     extra null  
     flag 0 
     method 8 
     name "test.dat" (id=39) 
     size 92931 
     time 1214084891 

я даже проверил другой способ , вместо обработки protobytes в памяти, я написал zip-файл на диск, вытащил его вручную через winzip и затем десериализовал извлеченный бинарный прото-файл, он сработал !!!

Я делаю молнию/разархивировать неправильный путь, дайте мне знать

ответ

0

Есть две разные вещи в игре здесь: застежка-молния/разархивировать, и дело с Protobuf. Похоже, проблема здесь первая, и похоже, что она искажает данные protobuf. Итак, на данный момент: забудьте про protobuf и просто сосредоточьтесь на zip/unzip. Запишите, что было оригинальным сообщением было (перед тем, как вы его застегнули - возможно, в виде двоичного файла или фрагмента базы-64). Теперь на принимающей стороне отследите, что вы получаете в двоичном формате после того, как вы распаковали его (опять же, двоичный файл или блок базы-64). Если они не совсем на 100% идентичны, то все остальные ставки отключены. Пока вы не сможете успешно воспроизвести оригинальный исходный двоичный файл, protobuf не имеет шансов.

Если это проблема: было бы неплохо показать ваш почтовый индекс, чтобы мы могли его увидеть.

Если вы : правильно сжимаете/декомпрессируете двоичный файл, тогда проблема будет в вашем протобуф-коде.

Если это проблема: было бы полезно показать код сериализации/десериализации, чтобы мы могли его увидеть.

+0

Может мало/большое преобразование конца (или его отсутствие) может вызвать проблемы? – Emil

+0

@ На вашей избранной библиотеке протобуфа должна быть внутренняя внутренняя граница; спецификация protobuf делает эти данные ясными, и это удивит меня, если какая-либо установленная библиотека получила это неправильно. Ваш код zip/unzip * не должен заботиться о endianness * - ему просто нужно получить одинаковые байты в том же порядке как в начале, так и в конце процесса. –

+0

Я полностью согласен. Как вы сказали, я думаю, что неплохо проигнорировать шаг protobuf и убедиться, что шаги zip/unzip работают по назначению. Просто говоря, что endianess может вызвать проблемы при передаче данных – Emil

Смежные вопросы