У меня есть кластер kafka, получающий сообщения. message - это байтовый массив zip-файла. zip-файл содержит двоичные файлы данных protobuf в качестве записей. Я читаю zip-файл и пытаюсь десериализовать записи protobuf, вот где мой код поражает "protocol message has invalid UTF-8,invalid tag"
исключения.Невозможно десериализовать сжатые буферы протокола
Я могу десериализовать двоичный файл protobuf, прежде чем отправлять его в качестве заархивированного байтового массива для брокера.
но когда я застегиваю эти бинарные файлы protobuf, создаю сообщения для kafka, потребляю их, а затем пытаюсь десериализовать записи в потоке zip, я сталкиваюсь с проблемами.
Я не уверен, какой из них является виновником.
Поскольку эти бинарные буферы протокола GZipped, они снова замаскировали их?
может кто-то пролить некоторый свет.
Благодаря
************** ************** редактировать
Producer Side:
public byte[] getZipfileBytes() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ZipOutputStream zipOut = new ZipOutputStream(baos);
CheckedOutputStream checkSum = new CheckedOutputStream(zipOut, new Adler32());
try {
ZipEntry zipEntry = new ZipEntry(testFile.getName());
byte[] protoBytes = IOUtils.toByteArray(new FileInputStream(testFile));
System.out.println("bytes length:\t"+protoBytes.length);
zipEntry.setSize(protoBytes.length);
zipOut.putNextEntry(zipEntry);
zipOut.write(protoBytes);
zipOut.close();
System.out.println("checksum:"+checkSum.getChecksum().getValue());
zipBytes = baos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return zipBytes;
}
Consumer Side:
processConsumerRecord(ConsumerRecord<String, byte[]> record) {
String key = record.key();
byte[] dataPacket = record.value();
ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(dataPacket));
CheckedInputStream checkSum = new CheckedInputStream(zipIn,
new Adler32());
ZipEntry zipEntry;
try {
zipEntry = zipIn.getNextEntry();
while (zipEntry != null) {
String name = zipEntry.getName();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
IOUtils.copy(zipIn, baos);
byte[] protoBytes = baos.toByteArray();
двоичные Protobuf байтов gzipped, так что мне нужно gunzip
если я сделаю gunzip, это не будет в формате gzip.
если я пропустил gunzip и сделаю parseFrom, я получаю неверные исключения тегов. .
GZIPInputStream gzip = new GZIPInputStream(
new ByteArrayInputStream(baos.toByteArray()));
MyProtoObject mpo = null;
try {
mpo = MyProtoObject.parseFrom(protoBytes);
} catch (InvalidProtocolBufferException e1) {
e1.printStackTrace();
}
} catch (IOException e1) {
e1.printStackTrace();
}
checkSum.getChecksum() GetValue() возвращает 1, в то время производства и потребления почтовый массив байтов
следующие значения переменной ZipEntry во время отладки:
producer
zipEntry ZipEntry (id=44)
comment null
crc 2147247736
csize 86794
extra null
flag 2056
method 8
name "test.dat" (id=49)
size 92931
time 1214084891
consumer
zipEntry ZipEntry (id=34)
comment null
crc 2147247736
csize 86794
extra null
flag 0
method 8
name "test.dat" (id=39)
size 92931
time 1214084891
я даже проверил другой способ , вместо обработки protobytes в памяти, я написал zip-файл на диск, вытащил его вручную через winzip и затем десериализовал извлеченный бинарный прото-файл, он сработал !!!
Я делаю молнию/разархивировать неправильный путь, дайте мне знать
Может мало/большое преобразование конца (или его отсутствие) может вызвать проблемы? – Emil
@ На вашей избранной библиотеке протобуфа должна быть внутренняя внутренняя граница; спецификация protobuf делает эти данные ясными, и это удивит меня, если какая-либо установленная библиотека получила это неправильно. Ваш код zip/unzip * не должен заботиться о endianness * - ему просто нужно получить одинаковые байты в том же порядке как в начале, так и в конце процесса. –
Я полностью согласен. Как вы сказали, я думаю, что неплохо проигнорировать шаг protobuf и убедиться, что шаги zip/unzip работают по назначению. Просто говоря, что endianess может вызвать проблемы при передаче данных – Emil