
Perhaps there's someone out there who can help me. I'm trying to read data from Elasticsearch (ES) using PySpark. My Jupyter Notebook code is pretty simple:

import pyspark
conf = pyspark.SparkConf().setAppName('Test').setMaster('spark://spark-master:7077')
sc = pyspark.SparkContext(conf=conf)
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.resource": "some-log/doc",
        "es.nodes": "192.168.1.25",
        "es.port": "9200"
    })

I have Spark and Jupyter Notebook installed on the host running the notebook. The spark-defaults.conf file loads elasticsearch-hadoop-6.4.0.jar via:

    spark.jars /opt/maya/es-hadoop/elasticsearch-hadoop-6.4.0.jar

I can connect to the ES instance and read from it using other tools such as elasticsearch-py, and the Test app shows up in the Spark Master UI. However, when I execute the code above, I keep getting this error:

    ---------------------------------------------------------------------------
    Py4JJavaError                             Traceback (most recent call last)
    <ipython-input-5-c990f37c388b> in <module>
          6         "es.resource": "logs-dfir-winevent-security-*/doc",
          7         "es.nodes": "192.168.248.131",
    ----> 8         "es.port": "9200"
          9     })
         10 #es_rdd.first()

    /opt/anaconda/lib/python3.6/site-packages/pyspark/context.py in newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
        715         jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
        716                                                    valueClass, keyConverter, valueConverter,
    --> 717                                                    jconf, batchSize)
        718         return RDD(jrdd, self)
        719 

    /opt/anaconda/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
       1255         answer = self.gateway_client.send_command(command)
       1256         return_value = get_return_value(
    -> 1257             answer, self.gateway_client, self.target_id, self.name)
       1258 
       1259         for temp_arg in temp_args:

    /opt/anaconda/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
        326                 raise Py4JJavaError(
        327                     "An error occurred while calling {0}{1}{2}.\n".
    --> 328                     format(target_id, ".", name), value)
        329             else:
        330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.LinkedMapWritable
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDDFromClassNames(PythonRDD.scala:302)
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:286)
    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

I've searched and can't see that the error is in the code itself; I have a feeling this issue is more related to a bad Spark configuration on the host running the Jupyter Notebook. Any insight would be much appreciated!

2 Answers


Please refer to this question: pyspark: ship jar dependency with spark-submit.

What you need to do is pass the dependency's jar along with the configuration. If you're using a Jupyter notebook, you can add it via SparkConf(), for example:

conf = SparkConf().set('spark.driver.extraClassPath', 'full/path/to/jar')

Just change your code to:

conf = pyspark.SparkConf().setAppName('Test').setMaster('spark://spark-master:7077').set('spark.driver.extraClassPath', 'full/path/to/jar')

Note that SparkConf has no option() method; use set(). Also, spark.driver.extraClassPath only puts the jar on the driver's classpath; to make it available on the executors as well, also set spark.jars.



Another method is to set PYSPARK_SUBMIT_ARGS in the environment before the SparkContext is created:

import os

os.environ['PYSPARK_SUBMIT_ARGS'] = \
    '--jars /full/path/to/your/jar.jar pyspark-shell'

The jar can be downloaded from https://www.elastic.co/downloads/hadoop

This works on Spark 2.3 and Elasticsearch 6.4.
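Combined with the question's code, the flow would look roughly like this (the jar path is the one from the question; the environment variable must be set before the SparkContext, and hence its backing JVM, is created):

```python
import os

# Must be set before the SparkContext starts, or the extra jar
# will not be picked up by the launched JVM.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--jars /opt/maya/es-hadoop/elasticsearch-hadoop-6.4.0.jar pyspark-shell'
)

# From here on, the original code runs unchanged:
# import pyspark
# sc = pyspark.SparkContext(conf=conf)
# es_rdd = sc.newAPIHadoopRDD(...)
```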

