
I am using Spark 2.3 (Pyspark) to read data from an Elasticsearch 6.6 index.
The Spark job attempts to create a DataFrame and fails with a parse issue:

Spark Code:

df = spark.read.format("org.elasticsearch.spark.sql").option("es.resource.read", index_name).option("es.nodes", hosts).load()

Error Message:

org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [2019/05/06 19:31:21] for field [GenerateTime]

I believe this is caused, at least in part, by the source date format not being in a recognized ISO 8601 format.
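A quick check outside Spark (plain Python, just to illustrate the hypothesis): the slash-separated value is rejected by an ISO 8601 parser, but parses fine with an explicit pattern equivalent to `yyyy/MM/dd HH:mm:ss`.

```python
from datetime import datetime

value = "2019/05/06 19:31:21"

# ISO 8601 parsing fails on the slash-separated value...
try:
    datetime.fromisoformat(value)
    iso_ok = True
except ValueError:
    iso_ok = False

# ...but an explicit pattern (Python's spelling of "yyyy/MM/dd HH:mm:ss") parses it
parsed = datetime.strptime(value, "%Y/%m/%d %H:%M:%S")

print(iso_ok)                      # False
print(parsed.isoformat(sep=" "))   # 2019-05-06 19:31:21
```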

Also, from reading the Time/Date Mapping docs, I understand this can be addressed by creating a mapping, but that will only affect new indexes and won't change the mapping of the historical ones.

Question:

Is there a way to address this issue so that I can successfully read from the historical indexes via Spark (e.g. prior to any mapping changes that may be required)? I also tried .option("es.mapping.date.rich", False), without any luck.

  • Hey, I've updated my answer with some more details. I just hope it helps! Commented May 14, 2019 at 17:29

1 Answer


I've created a sample document based on your data in ES 6.4 / Spark 2.1 and used the code below to read the GenerateTime field as text instead of a date type in Spark.

Mapping in ES

PUT somedateindex
{
  "mappings": {
    "mydocs":{
      "properties": {
        "GenerateTime": {
          "type": "date",
          "format": "yyyy/MM/dd HH:mm:ss"
        }
      }
    }
  }
}

Notice that the field is of date type in ES.

Spark Code to use date field in ES as String

Note that I've made use of the config option("es.mapping.date.rich", false)

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.master", "local")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    val df = spark.read.format("org.elasticsearch.spark.sql")
      .option("es.resource.read", "somedateindex")
      .option("es.nodes", "some_host_name")
      .option("es.mapping.date.rich", false)
      .option("es.port", "9200")
      .load()

    df.show()
    df.printSchema()

Spark Code Result in my Eclipse Console:

19/05/13 03:10:53 INFO DAGScheduler: Job 1 finished: show at Elasticsearch.scala:134, took 9.424294 s
19/05/13 03:10:53 INFO CodeGenerator: Code generated in 21.256205 ms
+-------------------+
|       GenerateTime|
+-------------------+
|2019/05/06 19:31:21|
+-------------------+

root
 |-- GenerateTime: string (nullable = true)

19/05/13 03:10:53 INFO SparkUI: Stopped Spark web UI at....

Notice that printSchema shows the table has a single column, GenerateTime, of type string.

If you do not want to change the mappings, the above should help you.

I recommend storing date fields as dates rather than text, and in an ISO 8601-supported format. That way, when type inference kicks in, the data arrives in Spark with the correct type and you can focus on your business logic. Often the correct solution lies in how we store the data rather than how we process it.
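As a sketch of that recommendation (plain Python, assuming you control the ingest side), you can normalize the string to ISO 8601 before indexing, so no custom date format is needed in the mapping:

```python
from datetime import datetime

def to_iso8601(raw: str) -> str:
    """Rewrite a 'yyyy/MM/dd HH:mm:ss' string as ISO 8601 before indexing."""
    return datetime.strptime(raw, "%Y/%m/%d %H:%M:%S").isoformat()

print(to_iso8601("2019/05/06 19:31:21"))  # 2019-05-06T19:31:21
```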

Spark code to convert String into Timestamp/Date

However, if for some reason you are not able to change the mappings at the source (i.e. Elasticsearch), you can add the code below to transform the string value into a timestamp:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.TimestampType

    // String into Timestamp transformation
    val df2_timestamp = df.withColumn("GenerateTime_timestamp", from_unixtime(unix_timestamp($"GenerateTime", "yyyy/MM/dd HH:mm:ss")).cast(TimestampType))
    df2_timestamp.show(false)
    df2_timestamp.printSchema()

If you run the above code, you'd see the output as below:

19/05/14 11:33:10 INFO CodeGenerator: Code generated in 23.742359 ms
+-------------------+----------------------+
|GenerateTime       |GenerateTime_timestamp|
+-------------------+----------------------+
|2019/05/06 19:31:21|2019-05-06 19:31:21.0 |
+-------------------+----------------------+

root
 |-- GenerateTime: string (nullable = true)
 |-- GenerateTime_timestamp: timestamp (nullable = true)

19/05/14 11:33:10 INFO SparkContext: Invoking stop() from shutdown hook

Also note that my solution is in Scala. Let me know if it helps!


7 Comments

Thank you. I upvoted this because it was a fantastic answer that was very insightful but I continue to have problems. For example, when option("es.mapping.date.rich", false) the job fails with: Caused by: scala.MatchError: Buffer(beats_input_codec_plain_applied) (of class scala.collection.convert.Wrappers$JListWrapper)
Sure, let me check. I suspect this is because ES has a nested/array field, but I'm not sure. I will test it. Meanwhile, could you post a sample document on which this error occurs? You can run Spark for one particular id and see if you catch this error. Or alternatively you can share the mapping, just to be sure!
@user9074332 Apologies for the delay. Do you have any mapping info that I can use to test? I've come across some scenarios where I get some errors but want your mapping info to reproduce the error. Let me know if you have any queries.
@Appy22 Sure, give me a day and I will update you on that.
hey @Appy22 Apologies I was a bit held up on other tasks at work. I will surely look into this tomorrow and update you. You should see an update around this same time tomorrow. Once again, apologies!!
