2016-12-10 3 views
0

У меня есть конфигурация logstash, которая использует следующее в блоке вывода в попытке уменьшить дубликаты.Логсташ doc_as_upsert кросс-индекс в Elasticsearch для устранения дубликатов

output { 
     if [type] == "usage" { 
       elasticsearch { 
         hosts => ["elastic4:9204"] 
         index => "usage-%{+YYYY-MM-dd-HH}" 
         document_id => "%{[@metadata][fingerprint]}" 
         action => "update" 
         doc_as_upsert => true 
       } 

     } 
} 

Отпечаток пальца рассчитывается из хэша SHA1 двух уникальных полей.

Это работает, когда logstash видит один и тот же документ в одном и том же индексе, но поскольку команда, которая генерирует входные данные, не имеет надежной скорости, при которой появляются разные документы, logstash иногда вставляет дубликаты документов в другую дату с отметкой индекс.

Например, команда, запускающая logstash для получения ввода, обычно возвращает последние два часа данных. Однако, поскольку я не могу окончательно сказать, когда документ появится/исчезнет, ​​я настраиваю команду каждые пятнадцать минут.

Это нормально, когда дубликаты происходят в течение одного часа. Однако, когда марка даты или дня истекает, и документ все еще появляется, эластичная/логсташ думает, что это новый документ.

Есть ли способ сделать перекрестный индекс upsert? Все они будут одним и тем же типом документа, они просто применимы к каждому индексу, который соответствует «use-*»

ответ

1

Новый индекс - это совершенно новое пространство ключей, и нет возможности сказать, что ES не индексирует два документа с помощью тот же идентификатор в двух разных индексах.

Однако вы можете предотвратить это, добавив в свой конвейер elasticsearch filter, который будет искать документ по всем индексам, и если он найдет его, он может удалить событие.

Что-то, как это будет делать (обратите внимание, что usages бы псевдоним, охватывающий все usage-* индексов):

filter { 
    elasticsearch { 
     hosts => ["elastic4:9204"] 
     index => "usages" 
     query => "_id:%{[@metadata][fingerprint]}" 
     fields => {"_id" => "other_id"} 
    } 
    # if the document was found, drop this one 
    if [other_id] { 
     drop {} 
    } 
} 
+0

Я заметил, что нагрузка на кластере увеличился совсем немного после сдачи этого на месте. Но пока это работает! – anthozep

+0

Да, потому что теперь для каждого события, которое проходит через Logstash, выполняется запрос против вашего ES, чтобы проверить, существует ли это событие или нет. – Val

+0

В попытке уменьшить пространство поиска я добавил фильтр ruby, чтобы получить вчерашнюю дату. Меня беспокоит только обман, который может появиться, когда имя индекса перейдет на следующий день. Тем не менее, похоже, что в elasticearch filter он не расширяет поля: 'elasticsearch { hosts => [" fmyelastic04.fm.intel.com:9204 "] index => 'monitoring-cama-usage- "% {[@ metadata] [вчера]}" ' query => "_id:% {[@ metadata] [fingerprint]}" fields => {"_id" => "id_found"} } ' – anthozep

Смежные вопросы