Let's assume that the Oracle schema has the following tables and columns:

    Country
        country_id; (Primary Key)
        country_name;

    Department
        department_id; (Primary Key)
        department_name;
        country_id; (Foreign key to Country:country_id)

    Employee
        employee_id; (Primary Key)
        employee_name;
        department_id; (Foreign key to Department:department_id)

My Elasticsearch document has a Country as its root element. It contains all Departments in that Country, which in turn contain all Employees in the respective Departments.

So the document structure looks like this:

    {
      "mappings": {
        "country": {
          "properties": {
            "country_id": { "type": "string"},
            "country_name": { "type": "string"},        
            "department": {
              "type": "nested",
              "properties": {
                "department_id": { "type": "string"},
                "department_name": { "type": "string"},
                "employee": {
                  "type": "nested",
                  "properties": {
                    "employee_id": { "type": "string"},
                    "employee_name": { "type": "string"}
                  }
                }
              }
            }
          }
        }
      }
    }           
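
For illustration, a single document under this mapping might look like the hash built below. This is only a minimal Ruby sketch: the field names come from the mapping above, while every value is a made-up placeholder.

```ruby
require 'json'

# Hypothetical sample values; only the field names come from the mapping.
country_doc = {
  'country_id'   => '1',
  'country_name' => 'Example Country',
  'department'   => [
    {
      'department_id'   => '10',
      'department_name' => 'Example Department',
      'employee'        => [
        { 'employee_id' => '100', 'employee_name' => 'Example Employee' }
      ]
    }
  ]
}

# Print the document as the JSON body that would be indexed.
puts JSON.pretty_generate(country_doc)
```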

I want separate jdbc input queries running on each table, and they should create/update/delete data in the Elasticsearch document whenever data in the base table is added/updated/deleted.

This is an example problem; the actual tables and data structure are more complex, so I am not looking for a solution limited to this.

Is there a way to achieve this?

Thanks.

  • I'm guessing you may have already solved this. However, could you not use an Oracle view to combine the required data into the document structure (Country, Department, Employee) and use that as a single JDBC query? That way you could use the lowest unique level (employee_id in this case) as the Elasticsearch document id and manage changes there. Commented May 5, 2017 at 14:57

1 Answer

For level one, it's straightforward using the aggregate filter. The rows need a common id so that they can be grouped into the same document.

    filter {
      aggregate {
        # all rows sharing this id are collected into one map
        task_id => "%{id}"
        code => "
          map['id'] = event.get('id')
          map['department'] ||= []
          map['department'] << event.to_hash
        "
        push_previous_map_as_event => true
        timeout => 150000
        timeout_tags => ['aggregated']
      }

      # drop every event that does not carry the 'aggregated' tag (the raw rows)
      if "aggregated" not in [tags] {
        drop {}
      }
    }
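
To make the grouping easier to follow, here is a minimal plain-Ruby sketch of what the aggregate code does with its map, run outside Logstash. The rows, column names and values are hypothetical, and the `maps` hash only stands in for the per-task_id map that the aggregate filter maintains.

```ruby
# Flat rows as the jdbc input might emit them: one row per department,
# each carrying the parent id. Column names and values are hypothetical.
rows = [
  { 'id' => '1', 'department_id' => '10', 'department_name' => 'Sales' },
  { 'id' => '1', 'department_id' => '11', 'department_name' => 'Support' },
  { 'id' => '2', 'department_id' => '20', 'department_name' => 'Research' }
]

# One map per id, standing in for the map the aggregate filter keeps per task_id.
maps = {}
rows.each do |row|
  map = (maps[row['id']] ||= {})
  map['id'] = row['id']
  map['department'] ||= []
  map['department'] << row
end

# Each map becomes one aggregated event (one document per id).
aggregated = maps.values
aggregated.each do |doc|
  puts "#{doc['id']}: #{doc['department'].length} department(s)"
end
```

As far as I know, push_previous_map_as_event relies on the rows arriving grouped by id (so an ORDER BY on that column in the SQL) and on running the pipeline with a single worker. The sketch above does not depend on ordering, so it hides that requirement.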

Important: the output action should be update.

    output {
        elasticsearch {
            action => "update"
            ...
        }
    }

One way to solve level 2 is to query the already-indexed document and update it with the nested record, again using the aggregate filter. There should be a common id for the document so that you can look it up and insert into the correct document.

    filter {
      # get the document from Elasticsearch based on id and store its 'employee' field in 'emp'
      elasticsearch {
        hosts => ["${ELASTICSEARCH_HOST}/${INDEX_NAME}/${INDEX_TYPE}"]
        query => "id:%{id}"
        fields => { "employee" => "emp" }
      }

      aggregate {
        task_id => "%{id}"
        code => "
          map['id'] = event.get('id')
          map['employee'] = []

          # wrap the incoming row in an array
          employeeArr = [event.to_hash]

          # attach the incoming row to each entry fetched from the indexed document
          # (the nil guard avoids an error when the lookup found nothing)
          (event.get('emp') || []).each do |emp|
            emp['employee'] = employeeArr
            map['employee'].push(emp)
          end
        "
        push_previous_map_as_event => true
        timeout => 150000
        timeout_tags => ['aggregated']
      }

      # drop every event that does not carry the 'aggregated' tag (the raw rows)
      if "aggregated" not in [tags] {
        drop {}
      }
    }

    output {
        elasticsearch {
            action => "update"    #important
            ...
        }
    }
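
The merge step can also be tried outside Logstash. The following is a minimal plain-Ruby sketch of attaching an incoming employee row to the nested departments fetched from the indexed document. It is a variation rather than the exact filter code above: it assumes the row carries a department_id and matches on it, and the helper name attach_employee and all values are hypothetical.

```ruby
require 'json'

# Nested departments as already stored in the indexed country document
# (hypothetical values).
existing_departments = [
  { 'department_id' => '10', 'department_name' => 'Sales',   'employee' => [] },
  { 'department_id' => '11', 'department_name' => 'Support', 'employee' => [] }
]

# Incoming employee row from a separate jdbc query (hypothetical values).
employee_row = {
  'employee_id'   => '100',
  'employee_name' => 'Ann',
  'department_id' => '10'
}

# Returns a new departments array with the employee placed under the
# department whose department_id matches; other departments are untouched.
def attach_employee(departments, row)
  departments.map do |dept|
    next dept unless dept['department_id'] == row['department_id']

    employee = {
      'employee_id'   => row['employee_id'],
      'employee_name' => row['employee_name']
    }
    # Replace an existing entry with the same id, otherwise append.
    others = (dept['employee'] || []).reject do |e|
      e['employee_id'] == employee['employee_id']
    end
    dept.merge('employee' => others + [employee])
  end
end

updated = attach_employee(existing_departments, employee_row)
puts JSON.pretty_generate(updated)
```

Replacing an entry with the same employee_id instead of always appending means a re-run of the jdbc query does not duplicate employees inside a department.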

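Also, to debug the Ruby code, add a stdout output as shown here. The dots codec prints one dot per processed event; switch the codec to rubydebug to print each event's full contents.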

    output {
        stdout { codec => dots }
    }