my_spark = SparkSession \
    .builder \
    .appName("DGLE") \
    config("spark.mongodb.input.uri", "mongodb://127.0.0.1/local.DGLE") \
    getOrCreate()
JDBC.write.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/local.DGLE").save()

This is my code; I don't know where the error is or what it is. Please help.


  • Could you please explain your problem in more detail, not just the code. Commented Jan 12, 2019 at 8:01
  • Also, please format your code. I tried to edit it, but I don't know exactly how it should read. Please edit your question, or I will have to flag it. Commented Jan 12, 2019 at 8:04

2 Answers


An efficient way to write to MongoDB from PySpark is to use the MongoDB Spark Connector. The connector converts the data into BSON format and saves it to MongoDB. Say you have a Spark DataFrame named df that you want to save to MongoDB. You can try:

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sc)

# df is the DataFrame you want to persist; the connector serializes it to BSON.
df.write.format("com.mongodb.spark.sql.DefaultSource") \
    .mode("append") \
    .option("spark.mongodb.output.uri",
            "mongodb://username:password@server_details:27017/db_name.collection_name?authSource=admin") \
    .save()

If you are using a notebook, write this at the top:

%%configure
{"conf": {"spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2"}}

If you are using the spark-submit command:

spark-submit --conf spark.pyspark.python=/usr/bin/anaconda/envs/py35/bin/python3.5 --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.1 file_name.py
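To confirm the write landed, the same connector can read the collection back into a DataFrame. This is a sketch, assuming the placeholder URI from the write example above and a live SparkSession passed in as `spark`:

```python
# Placeholder URI copied from the write example above.
input_uri = ("mongodb://username:password@server_details:27017/"
             "db_name.collection_name?authSource=admin")

def read_back(spark):
    # Load the MongoDB collection into a DataFrame; the connector
    # infers the schema by sampling documents.
    return (spark.read
            .format("com.mongodb.spark.sql.DefaultSource")
            .option("spark.mongodb.input.uri", input_uri)
            .load())
```

Calling `read_back(spark).count()` after the write is a quick sanity check that the rows arrived.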


from pyspark.sql import DataFrame

def save(message: DataFrame):
    # Write the DataFrame to MongoDB through the connector's "mongo" source.
    message.write \
        .format("mongo") \
        .mode("append") \
        .option("database", "db_name") \
        .option("collection", "collection_name") \
        .save()

And this is what the SparkSession will look like:

spark: SparkSession = SparkSession \
    .builder \
    .appName("MyApp") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .master("local") \
    .getOrCreate()
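A minimal usage sketch for the save() helper above; the rows and column names are invented for illustration, and a live SparkSession plus a reachable MongoDB are assumed:

```python
# Toy data, purely illustrative.
rows = [(1, "hello"), (2, "world")]
columns = ["id", "text"]

def write_rows(spark):
    # Build a small DataFrame and hand it to the save() helper above;
    # the connector picks up spark.mongodb.output.uri from the session config.
    message = spark.createDataFrame(rows, columns)
    save(message)
```

Because the database and collection are set per-write via `.option(...)`, the same session can target several collections without rebuilding it.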

