
How can I append JSON objects to an array field of an Elasticsearch document, using Logstash to load the data from a CSV file?

For example, a CSV containing these lines:

id,key1,key2
1,toto1,toto2
1,titi1,titi2
2,tata1,tata2

The result should be two documents:

{
    "id": 1,
    "keys": [{
        "key1": "toto1",
        "key2": "toto2"
    }, {
        "key1": "titi1",
        "key2": "titi2"
    }]
}
{
    "id": 2,
    "keys": [{
        "key1": "tata1",
        "key2": "tata2"
    }]
}


1 Answer


First, create your ES mapping, if necessary, declaring your inner objects as nested objects:

{
  "mappings": {
    "key_container": {
      "properties": {
        "id": {
          "type": "keyword",
          "index": true
        },
        "keys": {
          "type": "nested",
          "properties": {
            "key1": {
              "type": "keyword",
              "index": true
            },
            "key2": {
              "type": "text",
              "index": true
            }
          }
        }
      }
    }
  }
}

The keys property will contain the array of nested objects.
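
If you create the index by hand, a minimal sketch with curl, assuming the mapping above is saved as mapping.json (the index name key_container matches the Logstash outputs below; the Content-Type header is required from Elasticsearch 5.x on):

curl -XPUT 'localhost:9200/key_container' -H 'Content-Type: application/json' -d @mapping.json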

Then you can load the CSV in two hops with Logstash:

  1. Index (create) the base object containing only the id property.
  2. Update the base object with a keys property containing the array of nested objects.

The first Logstash configuration (only the relevant part):

filter {
    csv {
        columns => ["id","key1","key2"]
        separator => ","
        # Remove the keys because they will be loaded in the next hop via update
        remove_field => [ "key1", "key2"]
    }
    # Remove the row containing the column names
    if [id] == "id" {
        drop { }
    }
}
output {
    elasticsearch {
        action => "index"
        document_id => "%{id}"
        hosts => [ "localhost:9200" ]
        index => "key_container"
    }
}
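
The input section is omitted above; a minimal sketch for reading the CSV could look like the following, where the path is hypothetical and sincedb_path => "/dev/null" forces a full re-read on every run (handy while testing):

input {
    file {
        path => "/path/to/keys.csv"     # hypothetical location of the CSV
        start_position => "beginning"   # read the file from the top
        sincedb_path => "/dev/null"     # do not remember the read position between runs
    }
}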

For the second hop you have to enable dynamic scripting in Elasticsearch, since the array is built by an update script.
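How to enable scripting depends on your Elasticsearch version; on the 2.x/5.x line, where the Groovy language used below is available, it is roughly these lines in elasticsearch.yml (a sketch only, check the docs for your release):

# elasticsearch.yml -- allow inline scripts to run in update operations
script.inline: true
script.update: true

With scripting enabled, the second step's Logstash configuration (again only the relevant part) is: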

filter {
    csv {
        columns => ["id","key1","key2"]
        separator => ","
    }
    # Convert the attributes into an object called 'key' that is passed to the script below (via the 'event' object)
    mutate{
        rename => {
            "key1" => "[key][key1]"
            "key2" => "[key][key2]"
        }
    }
}
output {
    elasticsearch {
        action => "update"
        document_id => "%{id}"
        doc_as_upsert => "true"
        hosts => [ "localhost:9200" ]
        index => "key_container"
        script_lang => "groovy"
        # key_container.keys is an array of key objects
        # arrays can be built only with scripts and defined as an array when we put the first element into it
        script => "if (ctx._source.containsKey('keys')) {ctx._source.keys += event.key} else {ctx._source.keys = [event.key]}"
    }
}
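
After both runs you can verify the result with a quick search (host and index name as assumed above):

curl -XGET 'localhost:9200/key_container/_search?q=id:1&pretty'

The _source of the hit should match the desired document, with one caveat: the csv filter parses every column as a string, so id comes back as "1" rather than 1:

{
    "id": "1",
    "keys": [
        { "key1": "toto1", "key2": "toto2" },
        { "key1": "titi1", "key2": "titi2" }
    ]
}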

In summary, you need this two-hop loading because building the array requires scripting, and scripting is only available with the update action.
