
I want to know how to load/import a CSV file into MongoDB using PySpark. I have a CSV file named cal.csv on my desktop. Can somebody share a code snippet?

  • You want to read the CSV from the desktop using PySpark and then save it in MongoDB, right? Commented Sep 28, 2018 at 14:13
  • Yes, absolutely correct. I want to import the CSV file and store it in MongoDB. Commented Sep 29, 2018 at 15:06

2 Answers


First, read the CSV as a PySpark DataFrame.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Build the Spark context; the app name is arbitrary
conf = SparkConf().setAppName("csvToMongo")
sc = SparkContext(conf=conf)
sql = SQLContext(sc)

# Read the CSV, using the first row as the header and dropping malformed rows
df = sql.read.csv("cal.csv", header=True, mode="DROPMALFORMED")

Then write it to MongoDB:

df.write.format('com.mongodb.spark.sql.DefaultSource').mode('append')\
        .option('database',NAME).option('collection',COLLECTION_MONGODB).save()

Specify NAME and COLLECTION_MONGODB according to the database and collection you created.
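For example, with hypothetical placeholder values (use the database and collection you actually created):

# Hypothetical names -- replace with your own
NAME = "testdb"
COLLECTION_MONGODB = "cal"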

Also, you need to pass the conf settings and packages along with spark-submit, matching your Spark and connector versions:

./bin/spark-submit --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/DATABASE.COLLECTION_NAME?readPreference=primaryPreferred" \
                   --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/DATABASE.COLLECTION_NAME" \
                   --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0 \
                   tester.py

Specify DATABASE and COLLECTION_NAME above; tester.py is assumed to be the name of your code file. For more information, refer to the MongoDB Spark connector documentation.
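Putting it together, a minimal tester.py could look like the sketch below. The app name, CSV path, and database/collection names are assumptions; adjust them to your setup.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# The Mongo URIs and connector package come from the spark-submit command above
conf = SparkConf().setAppName("csvToMongo")
sc = SparkContext(conf=conf)
sql = SQLContext(sc)

# Read the CSV from the desktop (path is an assumption)
df = sql.read.csv("file:///home/user/Desktop/cal.csv", header=True, mode="DROPMALFORMED")

# Append the rows to the MongoDB collection
df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append") \
    .option("database", "DATABASE").option("collection", "COLLECTION_NAME").save()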


1 Comment

  • Did you find the answer useful?

This worked for me. Database: people, collection: con.

pyspark --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/people.con?readPreference=primaryPreferred" \
    --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/people.con" \
    --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0


from pyspark.sql import SparkSession

# Build a SparkSession with the MongoDB input/output URIs (database "people", collection "con")
my_spark = SparkSession \
         .builder \
         .appName("myApp") \
         .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/people.con") \
         .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/people.con") \
         .getOrCreate()

df = my_spark.read.csv(path="file:///home/user/Desktop/people.csv", header=True, inferSchema=True)

df.printSchema()

df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append") \
    .option("database", "people").option("collection", "con").save()

Next, open the mongo shell and check whether the collection was written by following the steps below:

mongo
show dbs
use people
show collections
db.con.find().pretty()
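If you prefer to verify from Python instead of the mongo shell, here is a minimal sketch using pymongo (assuming pymongo is installed and MongoDB is running on the default localhost port):

from pymongo import MongoClient

# Connect to the local MongoDB instance and inspect the collection
client = MongoClient("mongodb://127.0.0.1:27017/")
print(client["people"]["con"].count_documents({}))   # number of rows written
print(client["people"]["con"].find_one())            # sample document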

