2015-12-10 3 views
0

Здесь я столкнулся с проблемой, что я получаю сообщение от источника Kafka и пишу перехватчик для извлечения двух полей (dataSoure и businessType) из сообщения kafka (формат json). Здесь я использую gson.fromJson(). Но проблема в том, что я получил ошибку ниже.Событие flume было усечено

Здесь я хочу знать, урезает ли Flume событие Flume, когда он превышает лимит? Если да, то как настроить его на большее значение. Поскольку мое сообщение kafka всегда очень длинное, около 60K байт.

В ожидании ответа. Заранее спасибо!

2015-12-09 11: 48: 05665 (PollableSourceRunner-KafkaSource применить) [ОШИБКА - org.apache.flume.source.kafka.KafkaSource.process (KafkaSource.java:153)] KafkaSource EXCEPTION, {} com.google.gson.JsonSyntaxException: com.google.gson.stream.MalformedJsonException: строка без прерывания в строка 1 столбец 4096 на com.google.gson.Gson.fromJson (Gson.java:809) на com.google.gson.Gson.fromJson (Gson.java:761) на com.google.gson.Gson.fromJson (Gson.java:710) на com.xxx.flume.interceptor.JsonLogTypeInterceptor.intercept (JsonLogTypeIntercept or.java:43) at com.xxx.flume.interceptor.JsonLogTypeInterceptor.intercept (JsonLogTypeInterceptor.java:61) at org.apache.flume.interceptor.InterceptorChain.intercept (InterceptorChain.java:62) at org. apache.flume.channel.ChannelProcessor.processEventBatch (ChannelProcessor.java:146) в org.apache.flume.source.kafka.KafkaSource.process (KafkaSource.java:130)

ответ

0

Наконец, я нашел корень вызывают отладку исходного кода. Это becaues Я попытался преобразовать event.getBody() в карту с использованием Gson, что неверно, поскольку event.getBody() является байтом [], а не строкой, которая не может быть преобразована. Правильный код должен быть следующим:

String body = new String(event.getBody(), "UTF-8"); 
Map<String, Object> map = gson.fromJson(body, new TypeToken<Map<String, Object>>() {}.getType()); 
Смежные вопросы