I used the following function in Python to initialize an index in Elasticsearch.

def init_index():
    # Create the index with custom settings and a custom analyzer.
    constants.ES_CLIENT.indices.create(
        index=constants.INDEX_NAME,
        body={
            "settings": {
                "index": {
                    "type": "default"
                },
                "number_of_shards": 1,
                "number_of_replicas": 1,
                "analysis": {
                    "filter": {
                        "ap_stop": {
                            "type": "stop",
                            "stopwords_path": "stoplist.txt"
                        },
                        "shingle_filter": {
                            "type": "shingle",
                            "min_shingle_size": 2,
                            "max_shingle_size": 5,
                            "output_unigrams": True
                        }
                    },
                    "analyzer": {
                        constants.ANALYZER_NAME: {
                            "type": "custom",
                            "tokenizer": "standard",
                            "filter": ["standard",
                                       "ap_stop",
                                       "lowercase",
                                       "shingle_filter",
                                       "snowball"]
                        }
                    }
                }
            }
        }
    )

    # Map the "text" field so it is stored, analyzed with the custom
    # analyzer, and indexed with full term vectors.
    new_mapping = {
        constants.TYPE_NAME: {
            "properties": {
                "text": {
                    "type": "string",
                    "store": True,
                    "index": "analyzed",
                    "term_vector": "with_positions_offsets_payloads",
                    "search_analyzer": constants.ANALYZER_NAME,
                    "index_analyzer": constants.ANALYZER_NAME
                }
            }
        }
    }

    constants.ES_CLIENT.indices.put_mapping(
        index=constants.INDEX_NAME,
        doc_type=constants.TYPE_NAME,
        body=new_mapping
    )

Using this function, I was able to create an index with user-defined settings and mappings.

I recently started working with Scala and Spark. To integrate Elasticsearch I can either use the Spark connector, i.e. org.elasticsearch.spark, or the Hadoop one, org.elasticsearch.hadoop, but I don't wish to use Hadoop here. Most of the examples I have seen follow the Hadoop approach. I went through the Spark-Elasticsearch documentation and was at least able to index documents without involving Hadoop, but I noticed that this created everything with default settings; I can't even specify the _id there, it generates the _id on its own.

In Scala I use the following code for indexing (not the complete code):

import scala.collection.mutable
import org.elasticsearch.spark._   // brings saveToEs into scope on RDDs

val document = mutable.Map[String, String]()
document("id") = docID
document("text") = textChunk.mkString(" ") // textChunk is a List[String]
sc.makeRDD(Seq(document)).saveToEs("es_park_ap/document") // "index/type" resource

This created the following index:

{
   "es_park_ap": {
      "mappings": {
         "document": {
            "properties": {
               "id": {
                  "type": "string"
               },
               "text": {
                  "type": "string"
               }
            }
         }
      },
      "settings": {
         "index": {
            "creation_date": "1433006647684",
            "uuid": "QNXcTamgQgKx7RP-h8FVIg",
            "number_of_replicas": "1",
            "number_of_shards": "5",
            "version": {
               "created": "1040299"
            }
         }
      }
   }
}

So if I pass a document to it, the following document is created:

{
   "_index": "es_park_ap",
   "_type": "document",
   "_id": "AU2l2ixcAOrl_Gagnja5",
   "_score": 1,
   "_source": {
      "text": "some large text",
      "id": "12345"
   }
}

Just like in Python, how can I use Spark and Scala to create an index with user-defined specifications?

1 Answer

I think we should divide your question into several smaller issues.

If you want to create an index with a specific mapping / settings, you should use the Elasticsearch Java API directly (you can of course call it from Scala). You can use the following sources for examples of creating an index from Scala; a sketch follows the links below:

Creating index and adding mapping in Elasticsearch with java api gives missing analyzer errors

Define custom ElasticSearch Analyzer using Java API
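
For example, here is a minimal sketch of creating such an index from Scala with the 1.x Java TransportClient. The cluster name, host, port, and the analyzer name "my_analyzer" are illustrative assumptions; the ap_stop filter from the Python example is omitted because stopwords_path requires stoplist.txt to be present on the ES node.

import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.common.transport.InetSocketTransportAddress

// Assumed connection details: cluster "elasticsearch" on localhost:9300.
val settings = ImmutableSettings.settingsBuilder()
  .put("cluster.name", "elasticsearch")
  .build()
val client = new TransportClient(settings)
  .addTransportAddress(new InetSocketTransportAddress("localhost", 9300))

// Index settings mirroring the Python example ("my_analyzer" is a placeholder name).
val indexSettings = """{
  "number_of_shards": 1,
  "number_of_replicas": 1,
  "analysis": {
    "filter": {
      "shingle_filter": {
        "type": "shingle",
        "min_shingle_size": 2,
        "max_shingle_size": 5,
        "output_unigrams": true
      }
    },
    "analyzer": {
      "my_analyzer": {
        "type": "custom",
        "tokenizer": "standard",
        "filter": ["standard", "lowercase", "shingle_filter", "snowball"]
      }
    }
  }
}"""

// Mapping for the "document" type, mirroring the Python example.
val mapping = """{
  "document": {
    "properties": {
      "text": {
        "type": "string",
        "store": true,
        "index": "analyzed",
        "term_vector": "with_positions_offsets_payloads",
        "search_analyzer": "my_analyzer",
        "index_analyzer": "my_analyzer"
      }
    }
  }
}"""

client.admin().indices().prepareCreate("es_park_ap")
  .setSettings(indexSettings)
  .addMapping("document", mapping)
  .execute().actionGet()

client.close()

Once the index exists, saveToEs will write into it instead of auto-creating one with default settings.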

The Elasticsearch Hadoop / Spark plugin is used in order to transport data easily from HDFS to ES. ES maintenance should be done separately.

The fact that you are still seeing automatically generated ids is because you must tell the plugin which field to use as the document id, using the following syntax:

EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "your_id_field"))

Or in your case:

sc.makeRDD(Seq(document)).saveToEs("es_park_ap/document", Map("es.mapping.id" -> "your_id_field"))
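
Putting it together with your snippet: your document map stores the id under the key "id", so that is the field to point es.mapping.id at. A minimal self-contained sketch, assuming a cluster on localhost:9200 (the SparkConf settings are illustrative) and the sample values from your question:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import scala.collection.mutable

// Point the connector at the cluster; host and port are assumptions.
val conf = new SparkConf()
  .setAppName("es-indexing")
  .set("es.nodes", "localhost")
  .set("es.port", "9200")
val sc = new SparkContext(conf)

val docID = "12345"                           // sample value from the question
val textChunk = List("some", "large", "text") // sample value from the question

val document = mutable.Map[String, String]()
document("id") = docID
document("text") = textChunk.mkString(" ")

// The connector uses the value of the "id" field as the Elasticsearch _id
// instead of auto-generating one.
sc.makeRDD(Seq(document)).saveToEs("es_park_ap/document", Map("es.mapping.id" -> "id"))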

You can find more details about syntax and proper use here:

https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html

Michael


1 Comment

Agree. Spark's objective is different from maintaining and working with an ES cluster; Spark works as an abstraction on top of ES. For cluster maintenance you should use one of the Scala clients (if you are using Scala) listed here: elastic.co/guide/en/elasticsearch/client/community/current/…
