
I have the following Logstash configuration:

input {
    kafka {
        bootstrap_servers => "svc-kafka:9093"
        topics => ["ELK.LOG_EVENT.PROC", "ELK.API_ANALYTICS.PROC"]
        codec => "json"
        decorate_events => true
    }
}
output {
    if [kafka][topic] == "ELK.LOG_EVENT.PROC" {
        elasticsearch {
            hosts => ["svc-es:9200"]
            index => "elklogevent-%{+YYYY.MM.dd}"
            document_id => "%{id}"
        }
    } else {
        elasticsearch {
            hosts => ["svc-es:9200"]
            index => "elkapianalytics-%{+YYYY.MM.dd}"
            document_id => "%{id}"
        }
    }
}

But I am getting the following error:

[2018-10-11T13:16:30,035][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, input, filter, output at line 24, column 1 (byte 514) after ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:42:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:50:in `compile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:12:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `compile_sources'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:51:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:171:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:335:in `block in converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:332:in `block in converge_state'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:319:in `converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:90:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:343:in `block in execute'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}

Even though it clearly says it is unhappy with line 24, I don't know the best way to diagnose what the issue is. I am not too familiar with Ruby or with Logstash in general.
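
For reference, Logstash ships a built-in config test that can at least catch syntax problems before the pipeline starts; assuming the pipeline file lives at /usr/share/logstash/pipeline/logstash.conf (path assumed here), it would be invoked roughly like this:

# validate the pipeline file and exit without starting Logstash
bin/logstash --config.test_and_exit -f /usr/share/logstash/pipeline/logstash.conf

Also note that if Logstash is pointed at a directory of config files, they are concatenated, so the reported line number may refer to the combined configuration rather than any single file.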

This snippet is based on this SO post: logstash 5.0.1: setup elasticsearch multiple indexes ouput for multiple kafka input topics

--- EDIT: Sample debug output after including stdout { codec => rubydebug } inside the output {} block ---

{
           "message" => "Started Application in 15.296 seconds (JVM running for 16.37)",
          "@version" => "1",
        "loggerFqcn" => "org.apache.commons.logging.LogFactory$Log4jLog",
    "threadPriority" => 5,
         "timestamp" => "2018-10-11T14:39:35.984+0000",
             "level" => "INFO",
          "threadId" => 1,
          "hostname" => "deploy-obfuscated-service-59ffb8957d-rbgs6",
        "endOfBatch" => false,
        "loggerName" => "com.abc.obfuscated.Application",
           "service" => "obfuscated-service",
            "thread" => "main",
        "timeMillis" => 1539268775984,
        "@timestamp" => 2018-10-11T14:39:35.987Z
}
  • This is not your whole configuration, right? Did you cut out some parts? Commented Oct 11, 2018 at 13:34
  • @Val That is the whole conf. Just an input and an output. Am I missing something? Commented Oct 11, 2018 at 13:44
  • There's no line 24 and when testing the config it says "Configuration OK". Do you have other configuration files in your config directory? Commented Oct 11, 2018 at 14:11
  • You are right, I had added stdout { codec => rubydebug } at the end of the conf file (line 24) instead of inside the output {} block to try and debug. I've corrected it now and the error has gone away, but it still won't index for both topics. Whichever topic the data comes from, it always falls into the else block... Commented Oct 11, 2018 at 14:30
  • Can you update your question with the output you get from stdout? Commented Oct 11, 2018 at 14:35

1 Answer


When using decorate_events => true, the [kafka][topic] field is actually added under the @metadata field, so you simply need to change your condition to:

if [@metadata][kafka][topic] == "ELK.LOG_EVENT.PROC" {
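
For context, a sketch of how the whole output block would look with that change (hosts, index patterns and document_id copied from the question):

output {
    # decorate_events puts the Kafka topic under [@metadata][kafka][topic]
    if [@metadata][kafka][topic] == "ELK.LOG_EVENT.PROC" {
        elasticsearch {
            hosts => ["svc-es:9200"]
            index => "elklogevent-%{+YYYY.MM.dd}"
            document_id => "%{id}"
        }
    } else {
        elasticsearch {
            hosts => ["svc-es:9200"]
            index => "elkapianalytics-%{+YYYY.MM.dd}"
            document_id => "%{id}"
        }
    }
}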

2 Comments

Is there a reason why the original post I based my config on did not include the [@metadata] part?
Probably because in an earlier version of the kafka plugin the field was added directly at the root of the event. I've now fixed my earlier post.
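
One related debugging note: by default the rubydebug codec does not print the @metadata subtree, which is why the Kafka topic does not appear in the sample output in the question; the codec has a metadata option to include it. A minimal sketch:

output {
    # also print the @metadata subtree, where decorate_events stores [kafka][topic]
    stdout { codec => rubydebug { metadata => true } }
}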
