4

I'm trying to consume a Kafka topic using Logstash, for indexing by Elasticsearch. The Kafka events are JSON documents.

We recently upgraded our Elastic Stack to 5.1.2.

I believe that I was able to consume the topic OK in 5.0, using the same settings, but that was a while ago so perhaps I'm doing something wrong now, but can't see it. This is my config (slightly sanitized):

input {
    kafka {
        bootstrap_servers => "host1:9092,host2:9092,host3:9092"
        client_id => "logstash-elastic-5-c5"
        group_id => "logstash-elastic-5-g5"
        topics => "trp_v1"
        auto_offset_reset => "earliest"
    }
}

filter {

    json {
        source => "message"
    }

    mutate {

        rename => { "@timestamp" => "indexedDatetime" }

        remove_field => [
            "@timestamp",
            "@version",
            "message"
        ]
    }
}

output {
    stdout { codec => rubydebug }

    elasticsearch {
        hosts => ["host10:9200", "host11:9200", "host12:9200", "host13:9200"]
        action => "index"
        index => "trp-i"
        document_type => "event"
    }
}

When I run this, no messages are consumed, no sign of activity appears in the log after "[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Setting newly assigned partitions", and in Kafka Manager the consumer appears to immediately appear with "total lag = 0" for the topic.

This version of the Kafka plugin stores consumer offsets in Kafka itself, so each time I try to run Logstash against the same topic, I increment the group_id so in theory, it should start from the earliest offset for the topic.

Any advice?

EDIT: It appears that despite setting auto_offset_reset to "earliest", it isn't working - it's as if it's being set to "latest". I left Logstash running, then had more entries loaded into the Kafka queue, and they were processed by Logstash.

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.