I'm currently working on a project where I'm trying to write Avro files to Azure Blob Storage using PySpark, without success. Writing CSV files fails as well. Using spark-submit and passing the paths to the jar files via the --jars argument yields the same error.

I am running the code sample in a Jupyter Notebook within VS Code.

Could someone please help me resolve this error?

Sample Code:

import multiprocessing  # needed for cpu_count() below

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f


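STORAGE_ACCOUNT_NAME = "mystorageaccount"
STORAGE_ACCOUNT_ACCESS_KEY = "<storage-account-access-key>"  # placeholder; used below but not defined in the original sample
CONTAINER_NAME = "mycontainer"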
SPARK_AVRO_JAR_PATH = "/path_to_jar/spark-avro_2.12-3.2.0.jar"
AZURE_STORAGE_BLOB_JAR_PATH = "/path_to_jar/azure-storage-blob-12.14.3.jar"
AZURE_STORAGE_JAR_PATH = "/path_to_jar/azure-storage-8.6.6.jar"
HADOOP_AZURE_JAR_PATH = "/path_to_jar/hadoop-azure-3.3.1.jar"
DESTINATION_CONTAINER_STR = f"wasbs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/"

cpu = multiprocessing.cpu_count() - 1
spark = SparkSession.builder \
    .master(f"local[{cpu}]") \
    .appName("MyApp") \
    .config('spark.jars', f"{SPARK_AVRO_JAR_PATH},{AZURE_STORAGE_BLOB_JAR_PATH},{AZURE_STORAGE_JAR_PATH},{HADOOP_AZURE_JAR_PATH}")\
    .config("spark.driver.memory", "15g")\
    .getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.sparkContext.setLogLevel("WARN")
spark.conf.set(
    f"fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net",
    STORAGE_ACCOUNT_ACCESS_KEY
)

file_name = "test"
df = spark.createDataFrame(["10","11","13"], "string").toDF("age")
df.write.format("avro").save(f"{DESTINATION_CONTAINER_STR}{file_name}")

Error messages:

Py4JJavaError                             Traceback (most recent call last)
/var/folders/93/_09pzbms1d9cyfm7mkt6hnhm0000gn/T/ipykernel_74366/438907801.py in <module>
     81 file_name = "test"
     82 df = spark.createDataFrame(["10","11","13"], "string").toDF("age")
---> 83 df.write.format("csv").save(f"{DESTINATION_CONTAINER_STR}{file_name}")

~/jens/.venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    738             self._jwrite.save()
    739         else:
--> 740             self._jwrite.save(path)
    741 
    742     @since(1.4)

~/jens/.venv/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1307 
   1308         answer = self.gateway_client.send_command(command)
-> 1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)
   1311 

~/jens/.venv/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

~/jens/.venv/lib/python3.9/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o152.save.
: java.lang.NoClassDefFoundError: org/eclipse/jetty/util/ajax/JSON$Convertor
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.createDefaultStore(NativeAzureFileSystem.java:1441)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1366)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:461)
    at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:556)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.eclipse.jetty.util.ajax.JSON$Convertor
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 25 more
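
A NoClassDefFoundError like the one above means the JVM could not find the named class on its classpath. Since a jar is a zip archive, one way to narrow this down is to check whether any of the jars passed via spark.jars actually contains the class. The following is a minimal diagnostic sketch, not a fix; the helper names class_entry_name and jars_containing are hypothetical and not part of PySpark.

```python
import zipfile

# Class named in the stack trace above.
MISSING_CLASS = "org.eclipse.jetty.util.ajax.JSON$Convertor"


def class_entry_name(class_name):
    """Map a JVM class name to the path of its .class file inside a jar."""
    return class_name.replace(".", "/") + ".class"


def jars_containing(class_name, jar_paths):
    """Return the jar paths whose archive contains the given class.

    Paths that do not exist or are not valid zip archives are skipped.
    """
    entry = class_entry_name(class_name)
    found = []
    for path in jar_paths:
        try:
            with zipfile.ZipFile(path) as jar:
                if entry in jar.namelist():
                    found.append(path)
        except (OSError, zipfile.BadZipFile):
            continue
    return found
```

Calling jars_containing(MISSING_CLASS, [...]) with the four jar paths from the sample code returns the subset that ships the class. An empty list would suggest that the class lives in a dependency that is not on the classpath at all.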

Versions:

(.venv) jens % pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 1.8.0_302
Branch HEAD
Compiled by user ubuntu on 2021-10-06T12:46:30Z
Revision 5d45a415f3a29898d92380380cfd82bfc7f579ea
Url https://github.com/apache/spark
Type --help for more information.
(.venv) jens % python --version
Python 3.9.7
  • Your error doesn't match your code: df.write.format("csv").save(f"{DESTINATION_CONTAINER_STR}{file_name}"). Please update your code and/or error info to accurately reflect what you are running. (Commented Aug 14, 2023 at 19:42)

1 Answer

This doesn't look like an access error to me; it seems more like an error in the code.
