input {
  # Reads one flat row per (user, post, comment) combination from MySQL.
  # No `schedule` is set, so the statement is executed once per pipeline start.
  jdbc {
    jdbc_validate_connection => true
    jdbc_connection_string => "jdbc:mysql://172.17.0.2:3306/_db"
    # NOTE(review): credentials are stored in plain text here; consider the
    # Logstash keystore or an environment variable reference instead.
    jdbc_user => "root"
    jdbc_password => "admin"
    jdbc_driver_library => "/home/ilsa/mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    clean_run => true
    # The downstream aggregate filter groups rows by employee_number (u.id)
    # and flushes its map whenever that id changes, so all rows of one user
    # MUST be contiguous. Sorting by u.id first guarantees that; up.id and
    # pc.id then give posts and comments a stable order inside each user.
    statement => "SELECT
      u.id as employee_number, u.email as email, u.username as username,
      up.id as post_id, up.text_content as content,
      pc.id as comment_id , pc.user_post_id as comment_post_id, pc.comment as comment_text
      FROM users u join user_posts up on up.user_id = u.id
      LEFT JOIN post_comments pc ON pc.user_post_id = up.id
      ORDER BY u.id ASC, up.id ASC, pc.id ASC"
  }
}
filter {
  # Folds the flat (user, post, comment) rows into one event per user with a
  # nested posts[] array, each post carrying its own comments[] array.
  #
  # The aggregate filter keeps its maps in memory per worker; its documentation
  # requires running the pipeline with a single worker (pipeline.workers: 1)
  # so that rows of one task are not spread across threads.
  aggregate {
    task_id => "%{employee_number}"
    # Each post is created only once (looked up by post_id) and comments are
    # appended to it. Rows produced by the LEFT JOIN for posts without any
    # comment have a nil comment_id and add no comment entry, leaving
    # comments as an empty array.
    code => "
      map['employee_number'] = event.get('employee_number')
      map['email'] = event.get('email')
      map['username'] = event.get('username')
      map['posts'] ||= []

      post_id = event.get('post_id')
      post = map['posts'].find { |p| p['post_id'] == post_id }
      if post.nil?
        post = {
          'post_id' => post_id,
          'content' => event.get('content'),
          'comments' => []
        }
        map['posts'] << post
      end

      unless event.get('comment_id').nil?
        post['comments'] << {
          'comment_id' => event.get('comment_id'),
          'comment_post_id' => event.get('comment_post_id'),
          'comment_text' => event.get('comment_text')
        }
      end

      event.cancel()"
    # Emit the accumulated map as a new event when the next task_id arrives.
    push_previous_map_as_event => true
    # The last user's map has no following row to trigger the push; it is
    # flushed once this timeout (seconds) expires.
    timeout => 30
  }
}
output {
  # Print every aggregated event to the console for inspection.
  stdout {
    codec => rubydebug
  }

  # Index each aggregated user event into Elasticsearch, keyed by the
  # employee number so a re-run overwrites the same document.
  elasticsearch {
    hosts => "localhost:9200"
    # NOTE(review): Elasticsearch rejects index names that begin with "_";
    # this value looks truncated/redacted - confirm the real index name.
    index => "_dev"
    # NOTE(review): document_type is deprecated in recent versions of the
    # elasticsearch output plugin - verify against the plugin version in use.
    document_type => "_doc"
    document_id => "%{employee_number}"
    action => "index"
  }
}