First, create your ES mapping, if necessary, declaring your inner objects as nested objects.
{
  "mappings": {
    "key_container": {
      "properties": {
        "id": {
          "type": "keyword",
          "index": true
        },
        "keys": {
          "type": "nested",
          "properties": {
            "key1": {
              "type": "keyword",
              "index": true
            },
            "key2": {
              "type": "text",
              "index": true
            }
          }
        }
      }
    }
  }
}
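For example, assuming the mapping above is saved in a file called mapping.json (a hypothetical name) and elasticsearch is listening on localhost:9200 as in the configurations below, the index can be created with:

curl -XPUT 'localhost:9200/key_container' -H 'Content-Type: application/json' -d @mapping.json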
The keys property will contain the array of nested objects.
Then you can load the CSV in two hops with logstash (a sample CSV is sketched after this list):
- Index (create) the base object containing only the id property
- Update the base object with the keys property containing the array of nested objects
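For illustration, assume the CSV looks like this (the column names match the configurations below, the values are made up; note the two rows for id 1):

id,key1,key2
1,foo,bar
1,baz,qux
2,foo,bar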
The first logstash configuration (only the relevant part):
filter {
  csv {
    columns => ["id","key1","key2"]
    separator => ","
    # Remove the keys because they will be loaded in the next hop with update
    remove_field => [ "key1", "key2" ]
  }
  # Drop the row containing the column names
  if [id] == "id" {
    drop { }
  }
}
output {
  elasticsearch {
    action => "index"
    document_id => "%{id}"
    hosts => [ "localhost:9200" ]
    index => "key_container"
  }
}
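After this first hop, each distinct id from the sample CSV above exists as a bare document, roughly like this (ignoring the extra fields logstash adds by default, such as @timestamp and message):

{ "_id": "1", "_source": { "id": "1" } }
{ "_id": "2", "_source": { "id": "2" } }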
The second step's logstash configuration (you have to enable scripting in elasticsearch):
filter {
  csv {
    columns => ["id","key1","key2"]
    separator => ","
  }
  # Drop the row containing the column names (same as in the first hop)
  if [id] == "id" {
    drop { }
  }
  # Convert the attributes into an object called 'key' that is passed to the script below (via the 'event' object)
  mutate {
    rename => {
      "key1" => "[key][key1]"
      "key2" => "[key][key2]"
    }
  }
}
output {
  elasticsearch {
    action => "update"
    document_id => "%{id}"
    doc_as_upsert => "true"
    hosts => [ "localhost:9200" ]
    index => "key_container"
    script_lang => "groovy"
    # key_container.keys is an array of key objects
    # arrays can be built only with scripts and defined as an array when we put the first element into it
    script => "if (ctx._source.containsKey('keys')) {ctx._source.keys += event.key} else {ctx._source.keys = [event.key]}"
  }
}
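With the sample CSV above, the document for id 1 would end up looking roughly like this (extra logstash fields omitted):

{
  "id": "1",
  "keys": [
    { "key1": "foo", "key2": "bar" },
    { "key1": "baz", "key2": "qux" }
  ]
}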
In summary, you need this two-hop loading because building the array requires scripting, which is only available with the update action.