2017-01-18 2 views
0

У меня есть дамп базы данных IMDB в виде CSV. CSV-выглядеть следующим образом:Повторяющаяся запись в ElasticSearch, несмотря на аггрегацию в Logstash

name, movie, role 
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN 
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN 
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN 
......... 
"A.S., Alwi","Rumah masa depan (1984)",PLAYED_IN 
"A.S., Giri","Sumangali (1940)",PLAYED_IN 
"A.S., Luis","Bob the Drag Queen: Bloodbath (2016)",PLAYED_IN 
"A.S., Pragathi","Suli (2016)",PLAYED_IN 
"A.S.F. Dancers, The","D' Lucky Ones! (2006)",PLAYED_IN 
......... 

Моя цель состоит в том, чтобы поместить данные в упругом поиска, но я не хочу иметь дубликат актеров, поэтому я хочу, чтобы агрегировать кино они играют в так что набор данных выглядят так:

{ 
    "_index": "imdb13", 
    "_type": "logs", 
    "_id": "AVmw9JHCrsOFTsZwAmBm", 
    "_score": 13.028783, 
    "_source": { 
     "@timestamp": "2017-01-18T09:42:15.149Z", 
     "movie": [ 
     "Naomi and Ely's No Kiss List (2015)", 
     "Staten Island Summer (2015/II)", 
     "What Happened Last Night (2016)", 
     ... 
     ], 
     "@version": "1", 
     "name": "Abernethy, Kevin", 
    } 
    } 

Поэтому я использую Logstash для ввода данных в ElasticSearch. Я использую агрегированный плагин и мой конфигурационный файл выглядит следующим образом:

input { 
     file { 
      path => "/home/maeln/imdb-data/roles.csv" 
      start_position => "beginning" 
     } 
} 

filter { 
     csv { 
      columns => [ "name", "movie" ] 
      remove_field => ["role", "message", "host", "column3", "path"] 
      separator => "," 
     } 

     aggregate { 
      task_id => "%{name}" 
      code => " 
       map['movie'] ||= [] 
        event.to_hash.each do |key,value| 
        map[key] = value unless map.has_key?(key) 
        map[key] << value if map[key].is_a?(Array) 
       end 
       " 
      push_previous_map_as_event => true 
      timeout => 30 
      timeout_tags => ['aggregated'] 
     } 

     if "aggregated" not in [tags] { 
      drop {} 
     } 
} 

output { 
    elasticsearch { 
     hosts => "localhost:9200" 
     index => "imdb13" 
    } 
} 

Но тогда, когда я делаю простой поиск по индексу, все актеры дублируется только один фильм в поле «кино», как это:

{ 
    "took": 4, 
    "timed_out": false, 
    "_shards": { 
    "total": 5, 
    "successful": 5, 
    "failed": 0 
    }, 
    "hits": { 
    "total": 149, 
    "max_score": 13.028783, 
    "hits": [ 
     { 
     "_index": "imdb13", 
     "_type": "logs", 
     "_id": "AVmw9JHCrsOFTsZwAmBm", 
     "_score": 13.028783, 
     "_source": { 
      "@timestamp": "2017-01-18T09:42:15.149Z", 
      "movie": [ 
      "Naomi and Ely's No Kiss List (2015)" 
      ], 
      "@version": "1", 
      "name": "Abernethy, Kevin", 
      "tags": [ 
      "aggregated" 
      ] 
     } 
     }, 
     { 
     "_index": "imdb13", 
     "_type": "logs", 
     "_id": "AVmw9JHCrsOFTsZwAmBq", 
     "_score": 12.998644, 
     "_source": { 
      "@timestamp": "2017-01-18T09:42:15.149Z", 
      "movie": [ 
      "Staten Island Summer (2015/II)" 
      ], 
      "@version": "1", 
      "name": "Abernethy, Kevin", 
      "tags": [ 
      "aggregated" 
      ] 
     } 
     }, 
     { 
     "_index": "imdb13", 
     "_type": "logs", 
     "_id": "AVmw9JHCrsOFTsZwAmBu", 
     "_score": 12.998644, 
     "_source": { 
      "@timestamp": "2017-01-18T09:42:15.150Z", 
      "movie": [ 
      "What Happened Last Night (2016)" 
      ], 
      "@version": "1", 
      "name": "Abernethy, Kevin", 
      "tags": [ 
      "aggregated" 
      ] 
     } 
     }, 
     ..... 

Есть ли способ исправить это?

Журнал из логсташа с опцией --debug (только частично, весь журнал около ~ 1Gio): paste (я положил его на pastebin из-за предела 30000 символов в stackoverflow).

Последние строки журнала:

[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"path"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"role"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] Event after csv filter {:event=>2017-01-18T10:34:09.900Z %{host} %{message}} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"message"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"path"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"host"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] Event after csv filter {:event=>2017-01-18T10:34:09.915Z %{host} %{message}} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"column3"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anson, Christopher' 
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv  ] filters/LogStash::Filters::CSV: removing field {:field=>"path"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"} 
[2017-01-18T11:34:09,977][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["Tabi tabi po! (2001)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.csv  ] Event after csv filter {:event=>2017-01-18T10:34:09.921Z %{host} %{message}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"} 
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.911Z, "movie"=>["21 Jump Street (1987)"], "@version"=>"1", "name"=>"Ansley, Zachary", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anseth, Elias Moussaoui' 
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"} 
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["The Death Match: Fighting Fist of Samurai Joe (2013)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"} 
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["The Diplomat Hotel (2013)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anson, Alvin' 
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline  ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline  ] filter received {"event"=>{"path"=>"/home/maeln/Projets/oracle-of-bacon/imdb-data/roles.csv", "@timestamp"=>2017-01-18T10:34:09.900Z, "@version"=>"1", "host"=>"maeln-GE70-2PE", "message"=>"\"Ansfelt, Jacob\",\"Manden med de gyldne ører (2009)\",PLAYED_IN"}} 
[2017-01-18T11:34:09,978][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"} 
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"} 

Pastebin с только линией, содержащей logstash.filters.aggregate: link

+0

Можете ли вы добавить флаг '--debug' в командной строке и обновить свой вопрос с помощью полученного вами результата? – Val

+0

@Val Done, но журнал огромен, поэтому я ставлю только частичный. – Maeln

+0

Можете ли вы использовать grep только строки с 'logstash.filters.aggregate'? – Val

ответ

1

вопрос вы столкнулись относится к тому, что, как только линия прочитайте, что он передается фильтру + выходной поток.

Если у вас несколько процессоров, некоторые из этих потоков будут обрабатывать ваши строки параллельно, и, следовательно, порядок вывода больше не гарантируется. Что еще более важно, каждый из ваших фильтров aggregate будет локальным для данного потока, поэтому вполне возможно, что несколько строк, относящихся к одному и тому же актеру (даже если по порядку), обрабатываются разными потоками параллельно, и порядок вывода может отличаться.

Как только решение будет состоять в том, чтобы запустить logstash с опцией -w 1, чтобы создать только один рабочий поток, но при этом уменьшите пропускную способность.

Смежные вопросы