I would like to start a logstash instance with following setting:
input {
kafka {
topic_id => "topic_a"
.......
}
kafka {
topic_id => "topic_b"
.......
}
}
filter {
json {
source => "message"
}
uuid {
target => "@uuid"
}
mutate {
replace => { "message" => "%{message}" } # want to get the full json literal but does not work
add_field => {
"topic" => "%{topic_id}" # it does not work either
}
}
# logic to apply different filter base on topic_id
if [topic_id] =~ 'topic_a' { # this block seems never entered
mutate {
replace => { "topic" => "topic_a" }
}
} else {
.....
}
}
output {
.....
}
The output on my Kibana would should something like below:
topic : %{topic_id}
It suggested that the configuration above could not extract the topic_id. I have no idea on how to configure the filter part. Could anyone give a hint on this? Thanks.
BTW I'm using logstash-2.2.2
Edit: updated config according to logstash document, result still the same