0

I'm trying to sync data from SQL Server. I tried using the following link.

https://www.elastic.co/de/blog/how-to-keep-elasticsearch-synchronized-with-a-relational-database-using-logstash

The Problem here is, that the statements inside the Logstash configuration are for MySQL. I tried to convert the statement into SQL Server statements. After I run it, there are no files getting indexed. My statement inside the configuration file of Logstash looks like this:

statement => "SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > :sql_last_value AND modification_time < getutcdate()) ORDER BY modification_time ASC"

And the following output:

[2020-02-25T11:55:50,092][INFO ][logstash.inputs.jdbc     ][main] (0.007739s) SELECT TOP (1) count(*) AS [COUNT] FROM (SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > 0 AND modification_time < getutcdate()) ORDER BY modification_time ASC) AS [T1]
[2020-02-25T11:55:55,202][INFO ][logstash.inputs.jdbc     ][main] (0.001840s) SELECT CAST(SERVERPROPERTY('ProductVersion') AS varchar)
[2020-02-25T11:55:55,208][INFO ][logstash.inputs.jdbc     ][main] (0.001305s) SELECT TOP (1) count(*) AS [COUNT] FROM (SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > 0 AND modification_time < getutcdate()) ORDER BY modification_time ASC) AS [T1]
[2020-02-25T11:56:00,338][INFO ][logstash.inputs.jdbc     ][main] (0.002202s) SELECT CAST(SERVERPROPERTY('ProductVersion') AS varchar)
[2020-02-25T11:56:00,349][INFO ][logstash.inputs.jdbc     ][main] (0.002047s) SELECT TOP (1) count(*) AS [COUNT] FROM (SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > 0 AND modification_time < getutcdate()) ORDER BY modification_time ASC) AS [T1]

So, the statement is running but nothing is getting indexed.

My Logstash configuration looks like this:

input {
  jdbc {
    jdbc_driver_library => "<driver>"
    jdbc_driver_class => "<class>"
    jdbc_connection_string => "<connection>"
    jdbc_user => <user>
    jdbc_password => <pw>
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > :sql_last_value AND modification_time < getutcdate()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  stdout { codec =>  "rubydebug"}
  elasticsearch {
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][_id]}"
  }
}
6
  • What is your "tracking_column". Please post your logstash configuration. Commented Feb 25, 2020 at 11:05
  • @WinnieDaPooh i updated my question with my configuration file. My "tracking_column" is unix_ts_in_secs. Commented Feb 25, 2020 at 11:11
  • 1
    May be you are missing the "last_run_metadata_path" elastic.co/guide/en/logstash/current/… A tracker file is used to capture the "state". Commented Feb 25, 2020 at 11:29
  • Damn, thank you so much! Why is this nowhere in the guide mentioned :O Commented Feb 25, 2020 at 12:30
  • @WinnieDaPooh Feel free to write an answer and I'll accept it. Commented Feb 25, 2020 at 12:31

1 Answer 1

1

May be you are missing the "last_run_metadata_path"

A tracker file is used to capture the "state".

Example from the official documentation is,

input {
  jdbc {
    statement => "SELECT * FROM mgd.seq_sequence WHERE _sequence_key > ? AND _sequence_key < ? + ? ORDER BY _sequence_key ASC"
    prepared_statement_bind_values => [":sql_last_value", ":sql_last_value", 4]
    prepared_statement_name => "foobar"
    use_prepared_statements => true
    use_column_value => true
    tracking_column_type => "numeric"
    tracking_column => "_sequence_key"
    last_run_metadata_path => "/elastic/tmp/testing/confs/test-jdbc-int-sql_last_value.yml"
    # ... other configuration bits
  }
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.