2015-09-10 7 views
4

Я использую logstash input jdbc plugin для чтения двух (или более) баз данных и отправки данных в elasticsearch и использования kibana 4 для визуализации этих данных.Вход Logstash jdbc дублирует результаты

Это мой logstash конфигурации:

input { 
    jdbc { 
    type => "A" 
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar" 
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver" 
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true" 
    jdbc_user => "user" 
    jdbc_password => "pass" 
    schedule => "5 * * * *" 
    statement => "SELECT id, date, content, status from test_table" 
    } 

jdbc { 
    type => "B" 
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar" 
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver" 
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true" 
    jdbc_user => "user" 
    jdbc_password => "pass" 
    schedule => "5 * * * *" 
    statement => "SELECT id, date, content, status from test_table" 
    } 
} 
filter { 

} 
output { 

    if [type] == "A" { 
     elasticsearch { 
      host => "localhost" 
      protocol => http 
      index => "logstash-servera-%{+YYYY.MM.dd}" 
     }  
    } 
    if [type] == "B" { 
     elasticsearch { 
      host => "localhost" 
      protocol => http 
      index => "logstash-serverb-%{+YYYY.MM.dd}" 
     }  
    } 

    stdout { codec => rubydebug } 
} 

Проблема заключается в том, что каждый раз запускать logstash, он начинает сохранять все данные, которые уже в упругом поиска.

После запуска с предложением where = date> '2015-09-10 «Я остановил логсташ и снова запустил (с --debug) с помощью« специального параметра »: sql_last_date. После запуска logstash Он начинает показывать это в журнале:

←[36mExecuting JDBC query {:statement=>"SELECT \n\tSUBSTRING(R.RECEBEDOR, 1, 2) 
AS 'DDD',\nCASE WHEN R.STATUS <> 'RCON' AND R.COD_RESPOSTA in (428,429,230,425, 
430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO' \n  W 
HEN R.STATUS = 'RCON' THEN 'SUCESSO'\n\t ELSE 'ERRO'\n END AS 'TIPO_MENSAGEM 
',\nAP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC 
_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APL 
ICACAO, RECEBEDOR, R.ID_OPERADORA, R.TIPO_PRODUTO \n\nFROM RECARGA R (NOLOCK)\nJ 
OIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO \nwhere R.DT_RECARGA > :sql 
_last_start\nORDER BY R.DT_RECARGA ASC", :parameters=>{:sql_last_start=>2015-09- 
10 18:48:00 UTC}, :level=>:debug, :file=>"/DEV/logstash-1.5.4/vendor/bundle/jrub 
y/1.9/gems/logstash-input-jdbc-1.0.0/lib/logstash/plugin_mixins/jdbc.rb", :line= 
>"107", :method=>"execute_statement"}←[0m 

На этот раз я провел с «реальным» утверждение, которое является:

SELECT 
    SUBSTRING(R.RECEBEDOR, 1, 2) AS 'DDD', 
CASE WHEN R.STATUS <> 'RCON' AND R.COD_RESPOSTA in (428,429,230,425,430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO' 
     WHEN R.STATUS = 'RCON' THEN 'SUCESSO' 
     ELSE 'ERRO' 
    END AS 'TIPO_MENSAGEM', 
AP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APLICACAO, RECEBEDOR, R.ID_OPERADORA 

FROM RECARGA R (NOLOCK) 
JOIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO 
where R.DT_RECARGA > :sql_last_start 
ORDER BY R.DT_RECARGA ASC 

Каждый знает, как решить эту проблему?

Спасибо!

ответ

3

По умолчанию вход jdbc выполняет сконфигурированный оператор SQL. В вашем случае ваше заявление выбирает все в test_table. Вам нужно указать инструкции SQL только для загрузки данных с последнего входящего ввода jdbc с использованием предопределенного параметра sql_last_start в вашем SQL-запросе.

input { 
    jdbc { 
    type => "A" 
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar" 
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver" 
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true" 
    jdbc_user => "user" 
    jdbc_password => "pass" 
    schedule => "5 * * * *" 
    statement => "SELECT id, date, content, status from test_table WHERE date > :sql_last_start" 
    } 

jdbc { 
    type => "B" 
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar" 
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver" 
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true" 
    jdbc_user => "user" 
    jdbc_password => "pass" 
    schedule => "5 * * * *" 
    statement => "SELECT id, date, content, status from test_table WHERE date > :sql_last_start" 
    } 
} 

Кроме того, если по какой-либо совпадения загружается та же запись дважды из вашей БД, и вы не хотите Dups быть создан на сервере ES, вы можете также указать, использовать идентификатор записи как идентификатор документа в ваш вывод elasticsearch, таким образом, документ будет обновлен в ES и не будет дублироваться.

output { 

    if [type] == "A" { 
     elasticsearch { 
      host => "localhost" 
      protocol => http 
      index => "logstash-servera-%{+YYYY.MM.dd}" 
      document_id => "%{id}"  <--- same id as in DB 
     }  
    } 
    if [type] == "B" { 
     elasticsearch { 
      host => "localhost" 
      protocol => http 
      index => "logstash-serverb-%{+YYYY.MM.dd}" 
      document_id => "%{id}"  <--- same id as in DB 
     }  
    } 

    stdout { codec => rubydebug } 
} 
+0

Я пробовал использовать это. Впервые я использовал WHERE date> '2015-09-09'. Логсташ получил все данные со вчерашнего дня до сегодняшнего дня 11:30. Я побежал во второй раз (11:50), у Logstash не было никаких данных. И да, иметь больше данных в базе данных. –

+0

Единственное, что появляется в журнале: Запуск Logstash завершен Завершение работы Logstash –

+0

Можете ли вы запустить logstash с '--debug' и предоставить еще несколько выходных данных? – Val

5

sql_last_start теперь sql_last_valueplease check here специальный параметр sql_last_start теперь переименован в sql_last_value для большей ясности, поскольку она не ограничивается только DateTime, но может иметь другой столбец типа, а также. так что теперь решения может быть что-то вроде этого

input { 
jdbc { 
    type => "A" 
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch- jdbc-1.7.1.0\lib\jtds-1.3.1.jar" 
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver" 
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true" 
    jdbc_user => "user" 
    jdbc_password => "pass" 
    schedule => "5 * * * *" 
    use_column_value => true 
    tracking_column => date 
    statement => "SELECT id, date, content, status from test_table WHERE date >:sql_last_value" 
    #clean_run true means it will reset sql_last_value to zero or initial value if datatype is date(default is also false) 
    clean_run =>false 
    } 
jdbc{ 
    #for type B.... 
    } 
} 

я проверил с БДОМ SQL Server,

запустите в первый раз с clean_run => р, чтобы избежать ошибок типа данных в то время как в развитии мы можем иметь другое значение типа данных хранится в sql_last_value переменная

+0

Просьба предоставить необходимую информацию, чтобы ответить на вопрос на сайте. – sebenalern

+1

раньше я был просто комментарием, поскольку у меня не было достаточного рейтинга, чтобы прокомментировать ответ выше. теперь я написал полный ответ, чтобы он помог кому-то другому .. :) – Raghavendra

Смежные вопросы