2015-12-28 7 views
0

Я пытаюсь передать документ из массивов JSON от Kafka до elasticsearch. Мои данные в формате JSON выглядит следующим образом:Разбор JSON от Kafka до Elasticsearch

{ 
    "A": "---", 
    "B": "---", 
    "C": "---", 
    "D": "---", 
    "ABC": "---" 
} 

Из-за какой-то причине, Кафка не в состоянии разобрать этот документ, и я получаю сообщение об ошибке, как:

org.codehaus.jackson.JsonParseException: Unexpected close marker '}': expected ']' (for ROOT starting at [Source: [[email protected]; line: 1, column: 0]) 
at [Source: [[email protected]; line: 1, column: 3] 
at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1433) 
at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:521) 
at org.codehaus.jackson.impl.JsonParserBase._reportMismatchedEndMarker(JsonParserBase.java:487) 
at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:473) 
at org.codehaus.jackson.map.ObjectReader._initForReading(ObjectReader.java:828) 
at org.codehaus.jackson.map.ObjectReader._bindAndClose(ObjectReader.java:752) 
at org.codehaus.jackson.map.ObjectReader.readValue(ObjectReader.java:473) 
at org.elasticsearch.river.kafka.IndexDocumentProducer.addMessagesToBulkProcessor(IndexDocumentProducer.java:71) 
at org.elasticsearch.river.kafka.KafkaWorker.consumeMessagesAndAddToBulkProcessor(KafkaWorker.java:107) 
at org.elasticsearch.river.kafka.KafkaWorker.run(KafkaWorker.java:78) 
at java.lang.Thread.run(Thread.java:745) 

Он отлично работает, если мои данные из формат:

{"A": "---", "B": "---", "C": "---", "D": "---", "ABC": "---"} 

У меня есть несколько элементов, которые я хочу передать как массив в документе. Любая идея, почему я получаю ошибку для первого формата данных JSON? Благодаря

+2

Вы делаете 'объемный импорт'? какую версию 'ES' и' kafka-river' вы используете? – ChintanShah25

+0

Я использую ES 1.6. Я выполнил эту ссылку: https://github.com/mariamhakobyan/elasticsearch-river-kafka – Sweet

+0

ОК, но какую операцию вы выполняете? массовый импорт? – ChintanShah25

ответ

0

Вы получаете сообщение об ошибке, потому что ваш JSON не является правильным для bulk import

С Official Doc, это правильная структура JSON

action_and_meta_data\n 
optional_source\n 
action_and_meta_data\n 
optional_source\n 
.... 
action_and_meta_data\n 
optional_source\n 

Примечание: последняя строка данных должна заканчиваться с символом новой строки \ n.

Именно поэтому второй формат работает, и первый из них бросает ошибку.

Также на боковой ноте rivers было удалено из ES 2.0. Возможно, вам стоит взглянуть на kafka plugin

+0

Спасибо за информацию. Не могли бы вы рассказать мне, как формат должен быть для одного импорта документа вместо массового импорта? – Sweet

+0

Я не использовал kafka-river, но вы можете посмотреть [index api] (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html). что-то вроде этого 'curl -XPUT 'http: // localhost: 9200/my_index/my_type/1' -d '{" A ":" --- "," B ":" --- "," C ": «---», «D»: «---», «ABC»: «---»} ''будут делать – ChintanShah25

0

Я видел те же ошибки при попытке отправить одно сообщение с несколькими строками, используя API ES BulkRequestBuilder напрямую (между ними нет плагинов). Решила проблему, убедившись, что полное сообщение представлено как одна строка (только один новый символ строки в конце сообщения). Также убедитесь, что ваше сообщение отправлено как одна строка в Kafka.

Вы можете ознакомиться здесь: https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer/tree/branch2.0 Это проект с открытым исходным кодом, над которым мы работаем.

Я не использовал плагин Kafka, предложенный ChintanShah25 - он также может хорошо служить вашей цели.

Смежные вопросы