2016-11-15 3 views
0

Я использую Кафка как мой вход и поставить его в elasticsearch (выход)(logstash) индекс только конкретные данные в elasticsearch от входа Кафка

input { 
    kafka { 
     topics =>["maxwell"] 
     codec => json 
    } 
} 
filter { 
} 
output { 
    stdout { codec => rubydebug } 
    elasticsearch { 
     index => 'test_kafka' 
     document_type => "%{table}" 
     hosts => 'localhost:9200' 
    } 
} 

Когда это работает, он выводит следующие JSON

{ 
    "database": "my_db", 
    "xid": 88935, 
    "@timestamp": "2016-11-14T12:00:13.763Z", 
    "data": { 
    "contact_country_code": null, 
    "contact_type_id": 1, 
    "created": "2014-10-03 12:24:36", 
    "modified_by": null, 
    "modified": "2014-10-03 12:24:36", 
    "contact_id": 1, 
    "is_default": 0, 
    "created_by": null, 
    "contact_number": "1241222232" 
    }, 
    "old": { 
    "contact_number": "1241222" 
    }, 
    "commit": true, 
    "@version": "1", 
    "type": "update", 
    "table": "contact", 
    "ts": 1479124813 
} 

Мой вопрос, как я могу только извлечь ключ данных с помощью динамического document_type в elasticsearch для достижения этой одной

{ 
    "_index": "test_kafka", 
    "_type": "contact", 
    "_id": "AVhitY804rvpX8qdVt9d", 
    "_score": 1, 
    "_source": { 
    "contact_country_code": null, 
    "contact_type_id": 1, 
    "created": "2014-10-03 12:24:36", 
    "modified_by": null, 
    "modified": "2014-10-03 12:24:36", 
    "contact_id": 1, 
    "is_default": 0, 
    "created_by": null, 
    "contact_number": "1241222232" 
    } 
} 

ответ

1

Вы можете добавить фильтр ruby, чтобы массировать ваше событие, как показано ниже. Сначала он сохраняет поле table внутри полей @metadata, поэтому вы можете ссылаться на него в своем elasticsearch выводах. Затем он удаляет все поля, кроме data. Затем он копирует все поля внутри поля data на корневом уровне и, наконец, удаляет поле data.

input { 
    kafka { 
     topics =>["maxwell"] 
     codec => json 
    } 
} 
filter { 
    mutate { 
    add_field => { "[@metadata][type]" => "%{table}" } 
    } 
    ruby { 
    code => " 
     # Ruby code for Logstash 2.x 
     event.to_hash.delete_if {|k, v| k != 'data'} 
     event.to_hash.update(event['data'].to_hash) 
     event.to_hash.delete_if {|k, v| k == 'data'} 

     # Ruby code for Logstash 5.x 
     event.to_hash.delete_if {|k, v| k != 'data'}    
     event.to_hash.update(event.get('data').to_hash) 
     event.to_hash.delete_if {|k, v| k == 'data'} 
    " 
    }   
} 
output { 
    stdout { codec => rubydebug } 
    elasticsearch { 
     hosts => 'localhost:9200' 
     index => 'test_kafka' 
     document_type => "%{[@metadata][type]}" 
    } 
} 
+0

Параметр 'event' не мутировал, кстати, я использую logstash 5,0 – scireon

+0

Я также попытался заменить событие с этим кодом ' '' события = event.get («данные») '' ' , но не работал должным образом – scireon

+0

Хорошо, это рубиновый код для Logstash 2.x. Позвольте мне исправить это для Logstash 5. – Val

Смежные вопросы