
I'm trying to save a Spark DataFrame to MongoDB using the PyMongo connector, but every time I run the code I get an error:

java.io.IOException: No FileSystem for scheme: mongodb

Following is my code:

import pymongo
import pymongo_spark
pymongo_spark.activate()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext=SQLContext(sc)
from pyspark.sql import SparkSession
path = "hdfs://localhost:9000/home/hadoop/h_data/sales_ord_univ.csv"
df=sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(path)
collections=df.collect()
df.write.format('mongodb://localhost:27017/test.sales_order_2').save()

I have pretty naive code since I'm a newbie to this, but any help with this would be greatly appreciated. I'm using Spark 2.0.0, Python 2.7.6, and MongoDB 3.2.9.

1 Answer


I'm trying to save a Spark-DataFrame using PyMongo connector

You can try the MongoDB Connector for Spark instead. With your environment of Apache Spark v2.0.x, Python v2.7.x, and MongoDB v3.2.x, you can do something like the following:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Application Name").getOrCreate()

# Read the CSV with Spark's built-in reader (available since Spark 2.0);
# DROPMALFORMED silently discards rows that fail to parse.
dataframe = spark.read.csv("path/to/file.csv", header=True, mode="DROPMALFORMED")

# Write via the MongoDB Spark Connector: the MongoDB URI goes in an
# option, not in format() -- format() takes the data source class name.
dataframe.write.format("com.mongodb.spark.sql.DefaultSource")\
               .option("spark.mongodb.output.uri", "mongodb://localhost:27017/database.collection")\
               .save()
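One caveat worth noting (an assumption about the setup, not something stated above): the `com.mongodb.spark.sql.DefaultSource` class is not bundled with Spark itself, so the job typically has to be launched with the connector package on the classpath. A sketch of such a launch, where `your_script.py` is a placeholder and the package version should match your Spark/Scala build:

```shell
# Launch the job with the MongoDB Spark Connector on the classpath.
# The coordinates below assume the Scala 2.11 build matching Spark 2.0.x;
# adjust the version for your environment.
spark-submit \
  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 \
  --conf "spark.mongodb.output.uri=mongodb://localhost:27017/database.collection" \
  your_script.py
```

Setting the output URI via `--conf` makes it a default for the session, so the per-write `.option(...)` call can override it when needed.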

The full Python example file can be found in the MongoDB PySpark Docker project: examples.py. It includes examples of using MongoDB Aggregation in Spark as well as Spark SQL.

If you are familiar with Docker, you can run the MongoDB PySpark Docker project with docker-compose and execute some of the PySpark examples.


2 Comments

That's a good solution. However, can we do exception handling with this Spark connector in PySpark? Because chances are that the dataframe can easily exceed MongoDB's document size limit of 16 MB.
You can always enclose it in a try/except statement. Note that each CSV row becomes a single document; the whole CSV does not become one document. See also the definition of a MongoDB Document. If a single CSV row's value is larger than 16 MB, you may want to rethink the schema/model.
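Following up on the try/except suggestion, here is a minimal sketch of wrapping a write call so a failure (e.g. an oversized document rejected by MongoDB) is caught and reported instead of crashing the job. The `write_with_handling` helper name and callback style are illustrative, not part of the connector API:

```python
def write_with_handling(write_fn, on_error=None):
    """Run a DataFrame write callable, catching any failure.

    write_fn : zero-argument callable that performs the write,
               e.g. lambda: df.write.format(...).save()
    on_error : optional callback that receives the raised exception.
    Returns True on success, False on failure.
    """
    try:
        write_fn()
        return True
    except Exception as exc:  # Spark surfaces JVM errors as py4j exceptions
        if on_error is not None:
            on_error(exc)
        return False
```

With this, the save from the answer would be invoked as `write_with_handling(lambda: dataframe.write.format(...).save(), on_error=log_failure)`, leaving the caller free to retry, skip, or re-model the offending data.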
