
I am using Logstash 2.4 to read JSON messages from a Kafka topic and send them to an Elasticsearch Index.

The JSON format is as below --

{
   "schema": {
      "type": "struct",
      "fields": [
         {
            "type": "string",
            "optional": false,
            "field": "reloadID"
         },
         {
            "type": "string",
            "optional": false,
            "field": "externalAccountID"
         },
         {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "reloadDate"
         },
         {
            "type": "int32",
            "optional": false,
            "field": "reloadAmount"
         },
         {
            "type": "string",
            "optional": true,
            "field": "reloadChannel"
         }
      ],
      "optional": false,
      "name": "reload"
   },
   "payload": {
      "reloadID": "328424295",
      "externalAccountID": "9831200013",
      "reloadDate": 1446242463000,
      "reloadAmount": 240,
      "reloadChannel": "C1"
   }
}

Without any filter in my config file, the documents in the target ES index look like below --

{
  "_index" : "kafka_reloads",
  "_type" : "logs",
  "_id" : "AVfcyTU4SyCFNFP2z5-l",
  "_score" : 1.0,
  "_source" : {
    "schema" : {
      "type" : "struct",
      "fields" : [ {
        "type" : "string",
        "optional" : false,
        "field" : "reloadID"
      }, {
        "type" : "string",
        "optional" : false,
        "field" : "externalAccountID"
      }, {
        "type" : "int64",
        "optional" : false,
        "name" : "org.apache.kafka.connect.data.Timestamp",
        "version" : 1,
        "field" : "reloadDate"
      }, {
        "type" : "int32",
        "optional" : false,
        "field" : "reloadAmount"
      }, {
        "type" : "string",
        "optional" : true,
        "field" : "reloadChannel"
      } ],
      "optional" : false,
      "name" : "reload"
    },
    "payload" : {
      "reloadID" : "155559213",
      "externalAccountID" : "9831200014",
      "reloadDate" : 1449529746000,
      "reloadAmount" : 140,
      "reloadChannel" : "C1"
    },
    "@version" : "1",
    "@timestamp" : "2016-10-19T11:56:09.973Z",
  }
}

But I want only the value of the "payload" field to become the document body in my ES index. So I tried the 'mutate' filter in my config file as below --

input {
   kafka {
            zk_connect => "zksrv-1:2181,zksrv-2:2181,zksrv-4:2181"
            group_id => "logstash"
            topic_id => "reload"
            consumer_threads => 3
   }
}
filter {
  mutate {
     remove_field => [ "schema","@version","@timestamp" ]
  }
}
output {
   elasticsearch {
                    hosts => ["datanode-6:9200","datanode-2:9200"]
                    index => "kafka_reloads"
   }
}

With this filter, the ES documents now look like below --

{
      "_index" : "kafka_reloads",
      "_type" : "logs",
      "_id" : "AVfch0yhSyCFNFP2z59f",
      "_score" : 1.0,
      "_source" : {
        "payload" : {
          "reloadID" : "850846698",
          "externalAccountID" : "9831200013",
          "reloadDate" : 1449356706000,
          "reloadAmount" : 30,
          "reloadChannel" : "C1"
        }
      }
}

But it should actually look like below --

{
      "_index" : "kafka_reloads",
      "_type" : "logs",
      "_id" : "AVfch0yhSyCFNFP2z59f",
      "_score" : 1.0,
      "_source" : {
          "reloadID" : "850846698",
          "externalAccountID" : "9831200013",
          "reloadDate" : 1449356706000,
          "reloadAmount" : 30,
          "reloadChannel" : "C1"
      }
}

Is there a way to do this? Can anyone help me on this?

I also tried the below filter --

filter {
   json {
      source => "payload"
   }
}

But that is giving me errors like --

Error parsing json {:source=>"payload", :raw=>{"reloadID"=>"572584696", "externalAccountID"=>"9831200011", "reloadDate"=>1449093851000, "reloadAmount"=>180, "reloadChannel"=>"C1"}, :exception=>java.lang.ClassCastException: org.jruby.RubyHash cannot be cast to org.jruby.RubyIO, :level=>:warn}

Any help will be much appreciated.

Thanks,
Gautam Ghosh

2 Answers


You can achieve what you want using the following ruby filter:

  ruby {
     code => "
        # 1. Remove every field except 'payload'
        event.to_hash.delete_if {|k, v| k != 'payload'}
        # 2. Copy payload's inner fields to the root level
        event.to_hash.update(event['payload'].to_hash)
        # 3. Drop the now-redundant 'payload' wrapper itself
        event.to_hash.delete_if {|k, v| k == 'payload'}
     "
  }

What it does is:

  1. remove all fields except the payload one
  2. copy all of payload's inner fields to the root level
  3. delete the payload field itself

You'll end up with what you need.
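The three steps can be sketched in plain Ruby against a hash shaped like the event. This is only an illustration with made-up data; inside Logstash the same operations run against the event object itself:

```ruby
require 'json'

# A hash shaped like the Logstash event after the Kafka input (illustrative data)
event = {
  'schema'     => { 'type' => 'struct' },
  'payload'    => { 'reloadID' => '850846698', 'reloadAmount' => 30 },
  '@version'   => '1',
  '@timestamp' => '2016-10-19T11:56:09.973Z'
}

# 1. Remove every field except 'payload'
event.delete_if { |k, _v| k != 'payload' }
# 2. Copy payload's inner fields to the root level
event.update(event['payload'])
# 3. Drop the now-redundant 'payload' wrapper itself
event.delete('payload')

puts JSON.generate(event)
```

After the three steps only the inner payload fields remain at the root, which is exactly the `_source` shape asked for above.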


7 Comments

Excellent!! That worked perfectly.. thanks a lot buddy!!
Great stuff, thanks @Val. This will be of lots of use to anyone using Oracle GoldenGate -> Kafka -> Logstash ->
@RobinMoffatt I'm glad if this is useful to you. Since I've been using this code in several contexts, I'm considering creating a logstash filter out of it. I'll update this thread whenever I do it. I'm just not yet sure how to call it. Any idea?
Or I could simply amend the existing mutate filter with another extract operation maybe. Not sure about the name of that operation though...
@RobinMoffatt if my pull request is accepted it will soon be easier to do the same thing using mutate/copy: discuss.elastic.co/t/new-operation-for-the-mutate-filter/75070

It's been a while, but here is a valid workaround; I hope it's useful. (Note that `json_encode` is not bundled with Logstash and must be installed separately, e.g. `bin/plugin install logstash-filter-json_encode` on Logstash 2.x, and the source field here would be `payload` in the question's case.)

json_encode {
  source => "json"
  target => "json_string"
}

json {
  source => "json_string"
}
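Why the round trip helps: the `json` filter expects its source to be a *string*, and in this event the field already holds a parsed hash, which is what the `ClassCastException` in the question is about. `json_encode` serializes the hash back into a string so the `json` filter can parse it again. A minimal plain-Ruby sketch of that round trip, with made-up field values:

```ruby
require 'json'

# The field arrives as an already-parsed hash, not a string --
# feeding this directly to the json filter causes the ClassCastException
payload = { 'reloadID' => '572584696', 'reloadAmount' => 180 }

# What json_encode does: serialize the hash back into a JSON string...
json_string = JSON.generate(payload)

# ...so the json filter can then parse it from a string source again
reparsed = JSON.parse(json_string)

puts reparsed['reloadID']
```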

