
The Elasticsearch REST APIs return JSON responses, but I need CSV responses from those APIs.

I am looking for a feature similar to the one available in Solr: Solr provides a CSV response writer, with which we can easily get responses in CSV form.

How do I achieve this in Elasticsearch?

Note: I am not looking to simply export the entire contents of an Elasticsearch cluster to CSV. I want to query the Elasticsearch REST APIs and get responses in CSV format instead of JSON.
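For concreteness, here is roughly what I mean (the collection, index, and field names below are only placeholders):

# Solr: the CSV response writer returns CSV directly via wt=csv
curl 'http://localhost:8983/solr/mycollection/select?q=first_name:Jane&wt=csv&fl=first_name,last_name'

# Elasticsearch: the same kind of query has no CSV option; the response is always JSON
curl 'http://localhost:9200/megacorp/_search?q=first_name:Jane'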

UPDATE

I have been trying Logstash, using the approach recommended by @Val in the answer.

Below are the contents of logstash-plain.log:

[2017-01-23T18:28:35,762][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-01-23T18:28:35,783][INFO ][logstash.pipeline        ] Pipeline main started
[2017-01-23T18:28:35,827][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Elasticsearch hosts=>["localhost:9200"], index=>"megacorp", query=>"_index:megacorp AND first_name:Jane", id=>"9a67b0421108afd201382b21693e2173243dd144-1", enable_metric=>true, codec=><LogStash::Codecs::JSON id=>"json_60457023-6344-4af7-a2c5-1e89d1fe08aa", enable_metric=>true, charset=>"UTF-8">, size=>1000, scroll=>"1m", docinfo=>false, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>
  Error: [400] {"error":{"root_cause":[{"type":"parse_exception","reason":"Failed to derive xcontent"}],"type":"parse_exception","reason":"Failed to derive xcontent"},"status":400}
[2017-01-23T18:28:35,881][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-01-23T18:28:36,838][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Elasticsearch hosts=>["localhost:9200"], index=>"megacorp", query=>"_index:megacorp AND first_name:Jane", id=>"9a67b0421108afd201382b21693e2173243dd144-1", enable_metric=>true, codec=><LogStash::Codecs::JSON id=>"json_60457023-6344-4af7-a2c5-1e89d1fe08aa", enable_metric=>true, charset=>"UTF-8">, size=>1000, scroll=>"1m", docinfo=>false, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>
  Error: [400] {"error":{"root_cause":[{"type":"parse_exception","reason":"Failed to derive xcontent"}],"type":"parse_exception","reason":"Failed to derive xcontent"},"status":400}
[2017-01-23T18:28:37,848][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Elasticsearch hosts=>["localhost:9200"], index=>"megacorp", query=>"_index:megacorp AND first_name:Jane", id=>"9a67b0421108afd201382b21693e2173243dd144-1", enable_metric=>true, codec=><LogStash::Codecs::JSON id=>"json_60457023-6344-4af7-a2c5-1e89d1fe08aa", enable_metric=>true, charset=>"UTF-8">, size=>1000, scroll=>"1m", docinfo=>false, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>
  Error: [400] {"error":{"root_cause":[{"type":"parse_exception","reason":"Failed to derive xcontent"}],"type":"parse_exception","reason":"Failed to derive xcontent"},"status":400}
[2017-01-23T18:28:38,865][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.

Below are the contents of elasticsearch.log:

[2017-01-23T19:06:38,633][INFO ][o.e.n.Node               ] [] initializing ...
[2017-01-23T19:06:38,751][INFO ][o.e.e.NodeEnvironment    ] [TgbIozs] using [1] data paths, mounts [[/ (/dev/sda8)]], net usable_space [36.9gb], net total_space [139.6gb], spins? [possibly], types [ext4]
[2017-01-23T19:06:38,752][INFO ][o.e.e.NodeEnvironment    ] [TgbIozs] heap size [1.9gb], compressed ordinary object pointers [true]
[2017-01-23T19:06:38,760][INFO ][o.e.n.Node               ] node name [TgbIozs] derived from node ID [TgbIozsCR5WWSm_8iU-Rdw]; set [node.name] to override
[2017-01-23T19:06:38,761][INFO ][o.e.n.Node               ] version[5.1.2], pid[7239], build[c8c4c16/2017-01-11T20:18:39.146Z], OS[Linux/3.16.0-70-generic/amd64], JVM[Oracle Corporation/Java HotSpot(TM) 64-Bit Server VM/1.8.0_77/25.77-b03]
[2017-01-23T19:06:39,764][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [aggs-matrix-stats]
[2017-01-23T19:06:39,765][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [ingest-common]
[2017-01-23T19:06:39,765][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [lang-expression]
[2017-01-23T19:06:39,765][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [lang-groovy]
[2017-01-23T19:06:39,765][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [lang-mustache]
[2017-01-23T19:06:39,766][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [lang-painless]
[2017-01-23T19:06:39,766][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [percolator]
[2017-01-23T19:06:39,766][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [reindex]
[2017-01-23T19:06:39,766][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [transport-netty3]
[2017-01-23T19:06:39,766][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [transport-netty4]
[2017-01-23T19:06:39,767][INFO ][o.e.p.PluginsService     ] [TgbIozs] no plugins loaded
[2017-01-23T19:06:42,342][INFO ][o.e.n.Node               ] initialized
[2017-01-23T19:06:42,342][INFO ][o.e.n.Node               ] [TgbIozs] starting ...
[2017-01-23T19:06:42,595][INFO ][o.e.t.TransportService   ] [TgbIozs] publish_address {127.0.0.1:9300}, bound_addresses {[::1]:9300}, {127.0.0.1:9300}
[2017-01-23T19:06:42,610][WARN ][o.e.b.BootstrapCheck     ] [TgbIozs] max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
[2017-01-23T19:06:42,611][WARN ][o.e.b.BootstrapCheck     ] [TgbIozs] max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
[2017-01-23T19:06:45,816][INFO ][o.e.c.s.ClusterService   ] [TgbIozs] new_master {TgbIozs}{TgbIozsCR5WWSm_8iU-Rdw}{U2MjduBXTcOYx50aXsY-CQ}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-elected-as-master ([0] nodes joined)
[2017-01-23T19:06:45,860][INFO ][o.e.h.HttpServer         ] [TgbIozs] publish_address {127.0.0.1:9200}, bound_addresses {[::1]:9200}, {127.0.0.1:9200}
[2017-01-23T19:06:45,861][INFO ][o.e.n.Node               ] [TgbIozs] started
[2017-01-23T19:06:46,211][INFO ][o.e.g.GatewayService     ] [TgbIozs] recovered [1] indices into cluster_state
[2017-01-23T19:06:47,046][INFO ][o.e.c.r.a.AllocationService] [TgbIozs] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[megacorp][0]] ...]).
[2017-01-23T19:07:35,357][DEBUG][o.e.c.s.ClusterService   ] [TgbIozs] processing [cluster_update_settings]: took [18ms] done applying updated cluster_state (version: 7, uuid: Wc1Xm4H5SSOcJ6lIM--Stg)
[2017-01-23T19:07:35,357][DEBUG][o.e.c.s.ClusterService   ] [TgbIozs] processing [reroute_after_cluster_update_settings]: execute
[2017-01-23T19:07:35,363][DEBUG][o.e.c.s.ClusterService   ] [TgbIozs] processing [reroute_after_cluster_update_settings]: took [4ms] no change in cluster_state
[2017-01-23T19:07:35,370][DEBUG][i.n.h.c.c.ZlibCodecFactory] -Dio.netty.noJdkZlibDecoder: false
[2017-01-23T19:07:35,372][DEBUG][i.n.h.c.c.ZlibCodecFactory] -Dio.netty.noJdkZlibEncoder: false
[2017-01-23T19:07:35,674][DEBUG][r.suppressed             ] path: /megacorp/_search, params: {size=1000, scroll=1m, index=megacorp}
org.elasticsearch.ElasticsearchParseException: Failed to derive xcontent
    at org.elasticsearch.common.xcontent.XContentFactory.xContent(XContentFactory.java:239) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.action.search.RestSearchAction.parseSearchRequest(RestSearchAction.java:103) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.action.search.RestSearchAction.prepareRequest(RestSearchAction.java:81) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.BaseRestHandler.handleRequest(BaseRestHandler.java:66) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.RestController.executeHandler(RestController.java:243) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:200) [elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.http.HttpServer.dispatchRequest(HttpServer.java:113) [elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.http.netty4.Netty4HttpServerTransport.dispatchRequest(Netty4HttpServerTransport.java:507) [transport-netty4-5.1.2.jar:5.1.2]
    at org.elasticsearch.http.netty4.Netty4HttpRequestHandler.channelRead0(Netty4HttpRequestHandler.java:69) [transport-netty4-5.1.2.jar:5.1.2]
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at org.elasticsearch.http.netty4.pipelining.HttpPipeliningHandler.channelRead(HttpPipeliningHandler.java:66) [transport-netty4-5.1.2.jar:5.1.2]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:536) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:490) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) [netty-common-4.1.6.Final.jar:4.1.6.Final]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]

Below is a response generated in the Logstash console:

Command Entered: logstash_csv.sh "first_name:Jane" "first_name,last_name"

STARTING logstash_csv script......
Sending Logstash's logs to /home/sagarhp/installations/logstash-5.1.2/logs which is now configured via log4j2.properties
[2017-01-23T19:49:25,103][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-01-23T19:49:25,131][INFO ][logstash.pipeline        ] Pipeline main started
[2017-01-23T19:49:25,239][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-01-23T19:49:25,386][INFO ][logstash.outputs.csv     ] Opening file {:path=>"/home/sagarhp/mybin/test.csv"}
{
    "@timestamp" => 2017-01-23T14:04:25.361Z,
         "about" => "I like to collect rock albums",
      "@version" => "1",
     "last_name" => "Smith",
     "interests" => [
        [0] "music"
    ],
    "first_name" => "Jane",
           "age" => 32
}
[2017-01-23T19:49:28,159][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}

UPDATE: I replaced Logstash 5.1.2 with 2.4.1. The Elasticsearch version is 5.1.2, as before.

Below are the contents of elasticsearch.log:

[2017-01-24T11:35:18,909][INFO ][o.e.n.Node               ] [] initializing ...
[2017-01-24T11:35:19,101][INFO ][o.e.e.NodeEnvironment    ] [T7CEo0J] using [1] data paths, mounts [[/ (/dev/sda8)]], net usable_space [35.7gb], net total_space [139.6gb], spins? [possibly], types [ext4]
[2017-01-24T11:35:19,102][INFO ][o.e.e.NodeEnvironment    ] [T7CEo0J] heap size [1.9gb], compressed ordinary object pointers [true]
[2017-01-24T11:35:19,111][INFO ][o.e.n.Node               ] node name [T7CEo0J] derived from node ID [T7CEo0J8SOqX13kNEAPAvg]; set [node.name] to override
[2017-01-24T11:35:19,122][INFO ][o.e.n.Node               ] version[5.1.2], pid[8973], build[c8c4c16/2017-01-11T20:18:39.146Z], OS[Linux/3.16.0-70-generic/amd64], JVM[Oracle Corporation/Java HotSpot(TM) 64-Bit Server VM/1.8.0_77/25.77-b03]
[2017-01-24T11:35:20,209][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [aggs-matrix-stats]
[2017-01-24T11:35:20,209][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [ingest-common]
[2017-01-24T11:35:20,209][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [lang-expression]
[2017-01-24T11:35:20,210][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [lang-groovy]
[2017-01-24T11:35:20,210][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [lang-mustache]
[2017-01-24T11:35:20,210][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [lang-painless]
[2017-01-24T11:35:20,210][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [percolator]
[2017-01-24T11:35:20,210][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [reindex]
[2017-01-24T11:35:20,210][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [transport-netty3]
[2017-01-24T11:35:20,211][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [transport-netty4]
[2017-01-24T11:35:20,211][INFO ][o.e.p.PluginsService     ] [T7CEo0J] no plugins loaded
[2017-01-24T11:35:22,810][INFO ][o.e.n.Node               ] initialized
[2017-01-24T11:35:22,811][INFO ][o.e.n.Node               ] [T7CEo0J] starting ...
[2017-01-24T11:35:23,039][INFO ][o.e.t.TransportService   ] [T7CEo0J] publish_address {127.0.0.1:9300}, bound_addresses {[::1]:9300}, {127.0.0.1:9300}
[2017-01-24T11:35:23,054][WARN ][o.e.b.BootstrapCheck     ] [T7CEo0J] max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
[2017-01-24T11:35:23,055][WARN ][o.e.b.BootstrapCheck     ] [T7CEo0J] max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
[2017-01-24T11:35:26,258][INFO ][o.e.c.s.ClusterService   ] [T7CEo0J] new_master {T7CEo0J}{T7CEo0J8SOqX13kNEAPAvg}{rOR6BRP9S6CqXOChtboGLA}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-elected-as-master ([0] nodes joined)
[2017-01-24T11:35:26,319][INFO ][o.e.h.HttpServer         ] [T7CEo0J] publish_address {127.0.0.1:9200}, bound_addresses {[::1]:9200}, {127.0.0.1:9200}
[2017-01-24T11:35:26,320][INFO ][o.e.n.Node               ] [T7CEo0J] started
[2017-01-24T11:35:26,616][INFO ][o.e.g.GatewayService     ] [T7CEo0J] recovered [1] indices into cluster_state
[2017-01-24T11:35:27,494][INFO ][o.e.c.r.a.AllocationService] [T7CEo0J] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[megacorp][1]] ...]).
[2017-01-24T11:35:55,245][DEBUG][o.e.c.s.ClusterService   ] [T7CEo0J] processing [cluster_update_settings]: took [31ms] done applying updated cluster_state (version: 7, uuid: RYMpMgAlT1yXJu8Wkdf-pg)
[2017-01-24T11:35:55,245][DEBUG][o.e.c.s.ClusterService   ] [T7CEo0J] processing [reroute_after_cluster_update_settings]: execute
[2017-01-24T11:35:55,253][DEBUG][o.e.c.s.ClusterService   ] [T7CEo0J] processing [reroute_after_cluster_update_settings]: took [7ms] no change in cluster_state
[2017-01-24T11:36:12,203][DEBUG][r.suppressed             ] path: /megacorp/_search, params: {size=1000, scroll=1m, index=megacorp, search_type=scan}
java.lang.IllegalArgumentException: No search type for [scan]
    at org.elasticsearch.action.search.SearchType.fromString(SearchType.java:107) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.action.search.RestSearchAction.parseSearchRequest(RestSearchAction.java:114) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.action.search.RestSearchAction.prepareRequest(RestSearchAction.java:81) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.BaseRestHandler.handleRequest(BaseRestHandler.java:66) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.RestController.executeHandler(RestController.java:243) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:200) [elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.http.HttpServer.dispatchRequest(HttpServer.java:113) [elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.http.netty4.Netty4HttpServerTransport.dispatchRequest(Netty4HttpServerTransport.java:507) [transport-netty4-5.1.2.jar:5.1.2]
    at org.elasticsearch.http.netty4.Netty4HttpRequestHandler.channelRead0(Netty4HttpRequestHandler.java:69) [transport-netty4-5.1.2.jar:5.1.2]
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at org.elasticsearch.http.netty4.pipelining.HttpPipeliningHandler.channelRead(HttpPipeliningHandler.java:66) [transport-netty4-5.1.2.jar:5.1.2]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:536) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:490) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) [netty-common-4.1.6.Final.jar:4.1.6.Final]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]

Below is what I got in the Logstash console:

STARTING logstash_csv script......
Settings: Default pipeline workers: 4
A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Elasticsearch hosts=>["localhost:9200"], index=>"megacorp", query=>"{\"query\":{\"query_string\": {\"query\": \"first_name:Jane\"}}}", codec=><LogStash::Codecs::JSON charset=>"UTF-8">, scan=>true, size=>1000, scroll=>"1m", docinfo=>false, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>
  Error: [400] {"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"No search type for [scan]"}],"type":"illegal_argument_exception","reason":"No search type for [scan]"},"status":400} {:level=>:error}
Pipeline main started
A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Elasticsearch hosts=>["localhost:9200"], index=>"megacorp", query=>"{\"query\":{\"query_string\": {\"query\": \"first_name:Jane\"}}}", codec=><LogStash::Codecs::JSON charset=>"UTF-8">, scan=>true, size=>1000, scroll=>"1m", docinfo=>false, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>
  Error: [400] {"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"No search type for [scan]"}],"type":"illegal_argument_exception","reason":"No search type for [scan]"},"status":400} {:level=>:error}

2 Answers


If you're open to using Logstash, you can do this very easily with an elasticsearch input to run the query and a csv output to dump the data into a CSV file. It would look like this:

input {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "your_index"
    query => '{"query": {"match_all": {}}}'
  }
}
output {
  csv {
    fields => ["field1", "field2", "field3"]
    path => "/path/to/file.csv"
  }
}
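To run it, you would save this configuration to a file and launch Logstash with the -f flag (the file name below is illustrative):

# assuming the config above was saved as es_to_csv.conf
$LOGSTASH_HOME/bin/logstash -f es_to_csv.conf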

UPDATE

If you need to invoke this dynamically, you can generate the Logstash configuration on the fly, based on a query given as input to a shell script:

#!/bin/sh

if [ -z "$LOGSTASH_HOME" ]; then
    echo "ERROR: The LOGSTASH_HOME environment variable is not set!"
    exit 1
fi

LS_CONF="input {
   elasticsearch {
     hosts => [\"localhost:9200\"]
     index => 'megacorp'
     query => '{\"query\":{\"query_string\": {\"query\": \"$1\"}}}'
   }
}
output {
   csv {
     fields => [$2]
     path => \"/path/to/file.csv\"
   }
}"

$LOGSTASH_HOME/bin/logstash -e "$LS_CONF"

Then you can invoke that script with the query my_field:123456 like this:

./es_to_csv.sh "my_field:123456" "field1,field2,field3"

This will have the same effect as calling {{elasticUrl}}/_search?q=my_field:123456 and will produce a CSV file with the columns field1,field2,field3.
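For instance, using the megacorp example from the question, a run might look like this (the Jane,Smith row reflects the document shown in the question's Logstash console output):

./es_to_csv.sh "first_name:Jane" "first_name,last_name"
# /path/to/file.csv then contains one row per matching document, e.g.:
# Jane,Smith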


40 Comments

Thank you for your response. As far as I understand, I will have to provide this in the Logstash configuration and run Logstash. So, if I provide a custom query to Elasticsearch such as {{elasticUrl}}/_search?q=..., all the documents satisfying the given query will be exported to file.csv. Am I understanding this correctly? I am asking to make sure that the documents will be filtered according to the query and only then written to the CSV.
Yes, the query part will look like this: query => '...' where ... is whatever you'd put behind the q=... parameter in your URL
There's no request URL when running Logstash; you simply run bin/logstash -f yourconfig.conf
Okay, thanks. And what about the fields configuration? Do they also respect what we pass in the URL? Or are they static and will not change?
Yes, that's called document_type => 'supervisors', which you need to add to the elasticsearch input

This is somewhat difficult, because JSON is inherently a hierarchical data structure and CSV is not.

As a result, there isn't a trivial way of reducing one to the other; anything you do will be custom.

However, you can do something like this:

#!/usr/bin/env perl

use strict;
use warnings;

use LWP::UserAgent;
use JSON;

# Fetch a single document by ID.
my $url =
  'http://localhost:9200/index-name/path/AVm7dsU_mwKGPn0NRXkK';

my $agent    = LWP::UserAgent->new;
my $response = $agent->get($url);

if ( $response->is_success ) {
    my $json   = from_json( $response->content );
    my @fields = sort keys %{ $json->{_source} };

    # column headings
    print join( ",", @fields ), "\n";

    # column values
    print join( ",", @{ $json->{_source} }{@fields} ), "\n";
}

It's a bit crude, and assumes a flat key-value structure within _source. With multiple records you'd need to wrap it in a loop to print them all; this is just an example with a single document.
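As a hedged alternative to looping in Perl, the same idea can be sketched with curl and jq, assuming jq is installed and the selected fields are flat scalars (the field names are taken from the megacorp example in the question):

# emit one CSV row per hit
curl -s 'http://localhost:9200/megacorp/_search?q=first_name:Jane&size=1000' |
  jq -r '.hits.hits[]._source | [.first_name, .last_name] | @csv'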

It would be better, if at all possible, to change whatever is consuming the CSV so that it handles a multi-dimensional data format in the first place.

2 Comments

Thanks for your response. Actually, I don't have only flat key-value relationships; I have JSON data in nested form. I am currently using Elasticsearch. Before, I was using Apache Solr, where I could generate CSV data from nested JSON: Solr stored the deeply nested JSON in flattened form and could return CSV simply by specifying wt=csv in the request params. Like Solr, Elasticsearch is built on top of Lucene, and I was hoping that Elasticsearch was similarly capable of producing CSV responses.
Well, the above will allow you to flatten out the JSON, but you'll have to decide how you want to do it. E.g. if it's an array, do you want to concat the values into a single field? Or unroll the data structure so it's multiple rows? (Bear in mind that could mean a lot of rows if you have many multi-value fields.)
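To illustrate the first option, here is a quick sketch (again assuming jq) that concatenates the array-valued interests field from the example document into a single semicolon-separated cell:

# arrays are joined into one cell rather than unrolled into extra rows
curl -s 'http://localhost:9200/megacorp/_search?q=first_name:Jane&size=1000' |
  jq -r '.hits.hits[]._source | [.first_name, .last_name, (.interests | join(";"))] | @csv'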
