Duplicate entries in Elasticsearch despite aggregation in Logstash

I have a dump of the IMDB database as a CSV file. The CSV looks like this:
name, movie, role
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN
.........
"A.S., Alwi","Rumah masa depan (1984)",PLAYED_IN
"A.S., Giri","Sumangali (1940)",PLAYED_IN
"A.S., Luis","Bob the Drag Queen: Bloodbath (2016)",PLAYED_IN
"A.S., Pragathi","Suli (2016)",PLAYED_IN
"A.S.F. Dancers, The","D' Lucky Ones! (2006)",PLAYED_IN
.........
My goal is to load this data into Elasticsearch, but I don't want duplicate actors, so I want to aggregate the movies each actor played in, so that a document looks like this:
{
  "_index": "imdb13",
  "_type": "logs",
  "_id": "AVmw9JHCrsOFTsZwAmBm",
  "_score": 13.028783,
  "_source": {
    "@timestamp": "2017-01-18T09:42:15.149Z",
    "movie": [
      "Naomi and Ely's No Kiss List (2015)",
      "Staten Island Summer (2015/II)",
      "What Happened Last Night (2016)",
      ...
    ],
    "@version": "1",
    "name": "Abernethy, Kevin"
  }
}
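The grouping I'm after can be sketched in plain Ruby, independent of Logstash (the rows below are hypothetical sample data in the same shape as the CSV, not the real pipeline):

```ruby
# Hypothetical input rows (name, movie, role), including a duplicate row.
rows = [
  ["'El Burro' Van Rankin, Jorge", "Serafín (1999)", "PLAYED_IN"],
  ["'El Burro' Van Rankin, Jorge", "Serafín (1999)", "PLAYED_IN"],
  ["A.S., Alwi", "Rumah masa depan (1984)", "PLAYED_IN"],
]

# Group rows by actor name, collecting a deduplicated list of movies.
def aggregate_roles(rows)
  grouped = Hash.new { |h, k| h[k] = [] }
  rows.each do |name, movie, _role|
    grouped[name] << movie unless grouped[name].include?(movie)
  end
  grouped
end

# One document per actor, with all of their movies in a single array.
docs = aggregate_roles(rows).map { |name, movies| { "name" => name, "movie" => movies } }
```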
So I use Logstash to feed the data into Elasticsearch. I use the aggregate plugin, and my configuration file looks like this:
input {
  file {
    path => "/home/maeln/imdb-data/roles.csv"
    start_position => "beginning"
  }
}

filter {
  csv {
    columns => [ "name", "movie" ]
    remove_field => ["role", "message", "host", "column3", "path"]
    separator => ","
  }

  aggregate {
    task_id => "%{name}"
    code => "
      map['movie'] ||= []
      event.to_hash.each do |key, value|
        map[key] = value unless map.has_key?(key)
        map[key] << value if map[key].is_a?(Array)
      end
    "
    push_previous_map_as_event => true
    timeout => 30
    timeout_tags => ['aggregated']
  }

  if "aggregated" not in [tags] {
    drop {}
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "imdb13"
  }
}
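To make the `code` block above concrete, here is a standalone Ruby simulation of what it does to `map` as successive events for one `task_id` (one actor) arrive; the plain hashes are simplified stand-ins for Logstash events:

```ruby
# Simplified stand-in for the aggregate filter's per-event code:
# `map` persists across events that share the same task_id (the actor name).
def aggregate_event(map, event)
  map['movie'] ||= []
  event.each do |key, value|
    map[key] = value unless map.has_key?(key)
    map[key] << value if map[key].is_a?(Array)
  end
  map
end

map = {}
aggregate_event(map, { 'name' => 'Abernethy, Kevin', 'movie' => "Naomi and Ely's No Kiss List (2015)" })
aggregate_event(map, { 'name' => 'Abernethy, Kevin', 'movie' => 'Staten Island Summer (2015/II)' })
# map['movie'] now holds both titles; map['name'] stays a plain string,
# since the Array append only applies to keys whose value is an Array.
```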
But when I run a simple search on the index, every actor is duplicated, with each document holding only a single movie in the "movie" field, like this:
{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 149,
    "max_score": 13.028783,
    "hits": [
      {
        "_index": "imdb13",
        "_type": "logs",
        "_id": "AVmw9JHCrsOFTsZwAmBm",
        "_score": 13.028783,
        "_source": {
          "@timestamp": "2017-01-18T09:42:15.149Z",
          "movie": [
            "Naomi and Ely's No Kiss List (2015)"
          ],
          "@version": "1",
          "name": "Abernethy, Kevin",
          "tags": [
            "aggregated"
          ]
        }
      },
      {
        "_index": "imdb13",
        "_type": "logs",
        "_id": "AVmw9JHCrsOFTsZwAmBq",
        "_score": 12.998644,
        "_source": {
          "@timestamp": "2017-01-18T09:42:15.149Z",
          "movie": [
            "Staten Island Summer (2015/II)"
          ],
          "@version": "1",
          "name": "Abernethy, Kevin",
          "tags": [
            "aggregated"
          ]
        }
      },
      {
        "_index": "imdb13",
        "_type": "logs",
        "_id": "AVmw9JHCrsOFTsZwAmBu",
        "_score": 12.998644,
        "_source": {
          "@timestamp": "2017-01-18T09:42:15.150Z",
          "movie": [
            "What Happened Last Night (2016)"
          ],
          "@version": "1",
          "name": "Abernethy, Kevin",
          "tags": [
            "aggregated"
          ]
        }
      },
      .....
Is there a way to fix this?
The Logstash log with the --debug option (only partial, the full log is around ~1 GiB): paste (I put it on pastebin because of Stack Overflow's 30,000-character limit).
The last lines of the log:
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"path"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"role"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] Event after csv filter {:event=>2017-01-18T10:34:09.900Z %{host} %{message}}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"message"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"path"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"host"}
[2017-01-18T11:34:09,977][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] Event after csv filter {:event=>2017-01-18T10:34:09.915Z %{host} %{message}}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"column3"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anson, Christopher'
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"path"}
[2017-01-18T11:34:09,977][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"}
[2017-01-18T11:34:09,977][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["Tabi tabi po! (2001)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.csv ] Event after csv filter {:event=>2017-01-18T10:34:09.921Z %{host} %{message}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"}
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.911Z, "movie"=>["21 Jump Street (1987)"], "@version"=>"1", "name"=>"Ansley, Zachary", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anseth, Elias Moussaoui'
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"}
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["The Death Match: Fighting Fist of Samurai Joe (2013)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"}
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["The Diplomat Hotel (2013)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anson, Alvin'
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] filter received {"event"=>{"path"=>"/home/maeln/Projets/oracle-of-bacon/imdb-data/roles.csv", "@timestamp"=>2017-01-18T10:34:09.900Z, "@version"=>"1", "host"=>"maeln-GE70-2PE", "message"=>"\"Ansfelt, Jacob\",\"Manden med de gyldne ører (2009)\",PLAYED_IN"}}
[2017-01-18T11:34:09,978][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"}
Pastebin with only the lines containing logstash.filters.aggregate: link
Can you add the '--debug' flag on the command line and update your question with the output you get? – Val
@Val Done, but the log is huge, so I only included a part of it. – Maeln
Can you grep only the lines containing 'logstash.filters.aggregate'? – Val