
There are methods to save the data of an org.apache.spark.sql.DataFrame to the file system or to Hive. But how do I save data from a DataFrame that was created from MongoDB data back to MongoDB?

Edit: I created the DataFrame using

SparkContext sc = new SparkContext();
Configuration config = new Configuration();
config.set("mongo.input.uri", "mongodb://localhost:27017/testDB.testCollection");
JavaRDD<Tuple2<Object, BSONObject>> mongoJavaRDD = sc.newAPIHadoopRDD(config, MongoInputFormat.class, Object.class,
            BSONObject.class).toJavaRDD();
JavaRDD<Object> mongoRDD = mongoJavaRDD.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, Object>()
    {
        @Override
        public Iterable<Object> call(Tuple2<Object, BSONObject> arg)
        {
            BSONObject obj = arg._2();
            Object javaObject = generateJavaObjectFromBSON(obj, clazz);
            return Arrays.asList(javaObject);
        }
    });

SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.createDataFrame(mongoRDD, Person.class);
df.registerTempTable("Person");
  • OK, my Java is rusty at best, but I really don't understand why you'd create a single-element list just to flatMap. A simple map should be enough. Also, what is going on inside generateJavaObjectFromBSON? Commented Jul 20, 2015 at 13:17

1 Answer


Using PySpark and assuming you have a local MongoDB instance:

import pymongo
from toolz import dissoc

# First, let's create a dummy collection
client = pymongo.MongoClient()
client["foo"]["bar"].insert_many([{"k": "foo", "v": 1}, {"k": "bar", "v": 2}])
client.close()

config = {
    "mongo.input.uri": "mongodb://localhost:27017/foo.bar",
    "mongo.output.uri": "mongodb://localhost:27017/foo.barplus"
}

# Read data from MongoDB
rdd = sc.newAPIHadoopRDD(
    "com.mongodb.hadoop.MongoInputFormat",
    "org.apache.hadoop.io.Text",
    "org.apache.hadoop.io.MapWritable",
    None, None, config)

# Drop the _id field and create a data frame
dt = sqlContext.createDataFrame(rdd.map(lambda kv: dissoc(kv[1], "_id")))
dt_plus_one = dt.select(dt["k"], (dt["v"] + 1).alias("v"))

(dt_plus_one.
    rdd.  # Extract the rdd
    map(lambda row: (None, row.asDict())).  # Map to (None, dict) pairs
    saveAsNewAPIHadoopFile(
        "file:///placeholder",  # Ignored
        # From org.mongodb.mongo-hadoop:mongo-hadoop-core
        "com.mongodb.hadoop.MongoOutputFormat",
        None, None, None, None, config))
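The `dissoc` call above drops the MongoDB `_id` field (an ObjectId Spark cannot infer a usable schema for) by returning a copy of the dict without that key, leaving the original untouched. If you'd rather not pull in `toolz` just for this, a stdlib-only equivalent is a one-liner (a sketch; the sample document below is illustrative):

```python
def dissoc(d, key):
    """Return a copy of d without key; the original dict is not mutated."""
    return {k: v for k, v in d.items() if k != key}

doc = {"_id": "placeholder-object-id", "k": "foo", "v": 1}
cleaned = dissoc(doc, "_id")
# cleaned == {"k": "foo", "v": 1}, and doc still contains "_id"
```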

See also: Getting Spark, Python, and MongoDB to work together
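On the write side, MongoOutputFormat consumes key/value pairs where the value is the document to insert; mapping each Row to `(None, row.asDict())` lets MongoDB assign its own `_id`. The shaping itself is plain Python, which you can see outside Spark (a sketch using literal dicts in place of `Row.asDict()` output; the sample rows are illustrative):

```python
# Stand-ins for what dt_plus_one's rows would look like after asDict()
rows = [{"k": "foo", "v": 2}, {"k": "bar", "v": 3}]

# Shape each document into the (key, value) pair MongoOutputFormat expects;
# a None key means MongoDB generates the _id on insert.
pairs = [(None, row) for row in rows]
# pairs == [(None, {"k": "foo", "v": 2}), (None, {"k": "bar", "v": 3})]
```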


2 Comments

I am using and did not find any createDataFrame() method for the list of JSON objects...
So, is the problem with loading into the data frame as well?
