2011-07-02 13 views
3

Я использую Protobuf Теперь в течение нескольких недель, но я до сих пор продолжаю получать исключения при разборе Protobuf сообщений в Java.Исключение при чтении Protobuf сообщений в Java

Я использую C++ создавать свои Protobuf сообщение и отправлять их с наддувом гнезд к сокету сервера, где Java-клиент IST прослушивания. C++ код для передачи сообщения заключается в следующем:

boost::asio::streambuf b; 
std::ostream os(&b); 

ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os); 
CodedOutputStream *coded_output = new CodedOutputStream(raw_output); 

coded_output->WriteVarint32(agentMessage.ByteSize()); 
agentMessage.SerializeToCodedStream(coded_output); 

delete coded_output; 
delete raw_output; 

boost::system::error_code ignored_error; 

boost::asio::async_write(socket, b.data(), boost::bind(
     &MessageService::handle_write, this, 
     boost::asio::placeholders::error)); 

Как вы можете видеть, что я пишу с WriteVarint32 длину сообщения, таким образом, на стороне Java должен знать, используя parseDelimitedFrom насколько это следует читать:

AgentMessage agentMessage = AgentMessageProtos.AgentMessage  
           .parseDelimitedFrom(socket.getInputStream()); 

Но это не поможет, я получаю такого рода исключений:

Protocol message contained an invalid tag (zero). 
Message missing required fields: ... 
Protocol message tag had invalid wire type. 
Protocol message end-group tag did not match expected tag. 
While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length. 

это важно знать, что эти исключения не бросаются на каждое сообщение. Это всего лишь часть сообщений, которые я получаю, большинство из них очень хорошо работает - все равно я хотел бы исправить это, так как я не хочу пропускать сообщения.

Я был бы очень благодарен, если бы кто-то помог мне или потратил его идеи.


Еще один интересный факт - количество сообщений, которые я получаю. Обычно для моей программы обычно составляет 1.000 за 2 секунды. Через 20 секунд около 100.000 и так далее. Я уменьшил сообщения, отправленные искусственно, и когда передано только 6-8 сообщений, ошибок вообще нет. Так может ли это быть проблемой буферизации на стороне сокета клиента Java?

Включите, скажем, 60 000 сообщений, из которых 5 из них повреждены в среднем.

+0

Может быть, глупый вопрос, но есть ли способ, которым вы оставили отступы/негабаритные буферы в данных, а не обрезание излишков? –

+0

(эта ошибка, безусловно, может быть легко вызвана запасными нулями) –

+0

@ Marc-Gravell: Что было бы негабаритных буферов? На самом деле, я не понимаю, что вы думаете, может вызвать это. Может быть, вы могли бы указать, где я должен искать это? Btw. Я добавил несколько других исключений, которые я получаю. –

ответ

1

Я не знаком с Java API, но мне интересно, как Java имеет дело с значением uint32, обозначающим длину сообщения, потому что Java только подписала 32-битные целые числа. Быстрый взгляд на ссылку Java API показал, что 32-битное значение без знака хранится в 32-битной переменной. Итак, как обрабатывается случай, когда 32-битное значение без знака обозначает длину сообщения? Кроме того, похоже, что поддержка целых чисел varint со знаком в реализации Java. Они называются ZigZag32/64. AFAIK, версия C++ не знает о таких кодировках. Так может быть, причина для вашей проблемы может быть связана с этими вещами?

+0

Возможно, но я предполагаю, что это произойдет каждый раз, и поскольку эти исключения срабатывают только иногда, я не уверен. –

0

[Я на самом деле не эксперт TCP, это может быть далеко]

Проблема, чтение [Java] TCP сокета (байт [] буфера) будет возвращать после прочтения до конца Кадр TCP. Если это будет среднее сообщение (Я имею в виду, сообщение protobuf), синтаксический анализатор задохнется и бросит InvalidProtocolBufferException.

Любой Protobuf синтаксический вызов использует CodedInputStream внутренне (src here), который, в случае, если источником является InputStream, опирается на read() - и, следовательно, подлежит проблеме TCP сокетов.

Итак, когда вы загружаете большие объемы данных через сокет, некоторые сообщения должны быть разделены на два кадра - и это то, где они повреждаются.

Я предполагаю, что когда вы снижаете скорость передачи сообщений (как вы сказали, до 6-8 сообщений в секунду), каждый кадр отправляется перед тем, как следующий элемент данных будет помещен в поток, поэтому каждое сообщение всегда получает собственный кадр TCP, т. е. никто не получает разделение и не получает ошибок. (Или, может быть, это просто, что ошибки редки и с низкой скоростью вам просто нужно больше времени, чтобы их увидеть)

Что касается решения, лучше всего было бы самостоятельно обработать буфер, то есть прочитать byte[] (возможно, используя readFully() вместо read(), потому что первый будет блокироваться до тех пор, пока не будет достаточно данных для заполнения буфера [или EOF встречается], поэтому он устойчив к работе с концом середины сообщения), убедитесь, что у него достаточно данных для синтаксического анализа в целом сообщении, а затем передать буфер в синтаксический анализатор.

Кроме того, некоторые хорошие чтения по этому вопросу в this Google Groups topic - вот где я получил часть readFully().

Смежные вопросы