
This is the terminal command I use to run my strm.py file:

$SPARK_HOME/bin/spark-submit --master local --driver-memory 4g --num-executors 2 --executor-memory 4g --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 org.apache.spark:spark-cassandra-connector_2.11:2.4.0 strm.py

Error:

Cannot load main class from JAR org.apache.spark:spark-cassandra-connector_2.11:2.4.0 with URI org.apache.spark. Please specify a class through --class.
    at org.apache.spark.deploy.SparkSubmitArguments.error(SparkSubmitArguments.scala:657)
    at org.apache.spark.deploy.SparkSubmitArguments.loadEnvironmentArguments(SparkSubmitArguments.scala:224)
    at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:116)
    at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$1.<init>(SparkSubmit.scala:907)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:907)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:81)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Can anyone help me figure out what the issue is and why it can't load?

  • This is the code I wrote for storing streaming data to a Cassandra table:

    query1 = query.writeStream\
        .option("checkpointLocation", '/tmp/check_point/')\
        .format("org.apache.spark.sql.cassandra")\
        .option("keyspace", "test")\
        .option("table", "my_tables")\
        .start()\
        .awaitTermination()

    Commented Mar 4, 2020 at 9:03

1 Answer


You have 2 problems:

  • you're submitting your application incorrectly - there is no comma between org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 and org.apache.spark:spark-cassandra-connector_2.11:2.4.0, so spark-submit treats the Cassandra connector coordinate as the application JAR instead of your Python file (a corrected command is shown after the code sketch below).

  • the current version of the Spark Cassandra Connector doesn't support direct writes of Spark Structured Streaming data - this functionality is available only in DSE Analytics. But you can work around this by using foreachBatch, something like this (not tested; the working Scala code is available here):

def foreach_batch_function(df, epoch_id):
    # write each micro-batch out to Cassandra via the DataFrame writer
    df.write.format("org.apache.spark.sql.cassandra")\
        .option("keyspace", "test")\
        .option("table", "my_tables")\
        .mode('append')\
        .save()

query.writeStream.foreachBatch(foreach_batch_function).start()
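
For the first problem, the corrected submit command just adds the missing comma between the two coordinates (note also that the Cassandra connector is published under the com.datastax.spark groupId, not org.apache.spark):

$SPARK_HOME/bin/spark-submit --master local --driver-memory 4g --num-executors 2 --executor-memory 4g --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 strm.py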

6 Comments

After writing the above function I am getting an error: pyspark.sql.utils.AnalysisException: 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;'
Also, what is epoch_id, and what is it used for in def foreach_batch_function(df, epoch_id)?
Regarding mode - adjust it to your needs... As the error says, you may need to set a watermark, but that depends on your logic (see the sketch below for an example). The epoch ID is a number that can be used for tracking; you can ignore it for now, but it has to be in the function signature.
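For illustration, a minimal sketch of setting a watermark before a windowed aggregation; the column name ts and the time thresholds here are assumptions, not from the original question:

from pyspark.sql import functions as F

# Assumed: df is a streaming DataFrame with an event-time column `ts`.
# The watermark bounds how late data may arrive, which lets Spark finalize
# aggregation windows and makes append output mode valid.
windowed = df.withWatermark("ts", "10 minutes")\
    .groupBy(F.window("ts", "5 minutes"))\
    .count()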
I tried this query to store streaming data to a Cassandra table:

def writeToCassandra(writeDF, epochId):
    writeDF.write\
        .format("org.apache.spark.sql.cassandra")\
        .options(table="my_tables", keyspace="test")\
        .mode("append")\
        .save()

query = df.writeStream\
    .trigger(processingTime="10 seconds")\
    .outputMode("update")\
    .foreachBatch(writeToCassandra)\
    .start()\
    .awaitTermination()

But I got an error: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at spark.apache.org/third-party-projects.html
It looks like that package isn't loaded... Before running the streaming job, just check whether you can access Cassandra from pyspark at all: github.com/datastax/spark-cassandra-connector/blob/master/doc/…
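A minimal connectivity check might look like this, assuming pyspark was started with the connector package and spark.cassandra.connection.host is configured (the keyspace/table names are the ones from the question):

# If the connector is on the classpath, this batch read should succeed
# (or fail with a Cassandra error, not "Failed to find data source").
df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="my_tables", keyspace="test")\
    .load()
df.show()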
