I am trying to write a Spark DataFrame to MongoDB. One of its columns holds the string representation of an ObjectId, which is the _id of a document in another collection.

The problem is that PySpark has no support for ObjectId (the Scala and Java ObjectId support is explained here: https://github.com/mongodb/mongo-spark/blob/master/doc/1-sparkSQL.md). So how can I insert an ObjectId into MongoDB from PySpark using the Spark Connector?

4 Answers

2

The accepted answer seems to be outdated as of today. It still led me to a working version, thank you.

Here is my working version of the code:

import pyspark.sql.functions as sfunc
from pyspark.sql.types import StructType, StructField, StringType

# This user-defined function turns a str ID like "5b8f7fe430c49e04fdb91599"
# into the following struct: { "oid" : "5b8f7fe430c49e04fdb91599" }
# which will be recognized as an ObjectId by MongoDB
udf_struct_id = sfunc.udf(
    lambda x: (str(x),),  # one-element tuple = one-field struct
    StructType([StructField("oid", StringType(), True)])
)

df = df.withColumn('future_object_id_field', udf_struct_id('string_object_id_column'))

My setup: MongoDB 4.0, Docker image for Spark gettyimages/spark:2.3.1-hadoop-3.0, Python 3.6

The documentation for the PySpark MongoDB connector gave me the idea to name the field oid, which MongoDB needs in order to recognize the field as an ObjectId.
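The lambda above wraps whatever string it receives. As a minimal sketch, assuming you want malformed ids to become null instead of reaching MongoDB, the wrapping step could first check that the value is a 24-character hex string (the textual form of an ObjectId). The helper name to_oid_struct is hypothetical and not part of the connector or of PySpark.

```python
import re

# An ObjectId is 12 bytes, i.e. 24 hexadecimal characters in string form.
_OBJECT_ID_RE = re.compile(r"[0-9a-fA-F]{24}")


def to_oid_struct(value):
    """Hypothetical helper: wrap a valid id string in a one-element tuple.

    The tuple matches a struct with a single "oid" field.
    Returns None for missing or malformed ids.
    """
    if value is None:
        return None
    text = str(value).strip()
    if _OBJECT_ID_RE.fullmatch(text) is None:
        return None
    return (text,)
```

It could then be passed to sfunc.udf in place of the lambda, with the same StructType as the return type. Because the oid field is declared nullable, rows with a bad id would presumably end up with a null in that column.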

2 Comments

Your code doesn't work with the latest version. Could you look at this code and help correct it?
It's not working for me either. Do you guys have a working solution today?
0

I convert the column to a Spark struct with a single string field. When it is inserted into MongoDB, it is automatically converted to an ObjectId.

import pyspark.sql.functions as sfunc
import pyspark.sql.types as stypes

# Wrap the id string in a one-field struct. StructField requires a field name;
# "oid" is the name the other answer reports MongoDB recognizes.
udf_struct_id = sfunc.udf(
    lambda x: (str(x),),
    stypes.StructType([stypes.StructField("oid", stypes.StringType(), True)])
)

df = df.withColumn('future_object_id_field', udf_struct_id(df['string_object_id_column']))

And then you can perform the SparkSession write to Mongo of that DataFrame and future_object_id_field will become an ObjectId.

NOTE: the field has to be declared with nullable set to true. If it is false, the field becomes a plain Object with a string inside.

0

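If you create the DataFrame from a dict, then for some reason the ObjectId value has to be passed as a one-element tuple that matches the nested oid struct in the schema: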

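from pyspark.sql.types import StructType, StructField, StringType

mongo_schema = StructType([
    StructField("SomeField", StringType()),
    StructField("ObjectIdField", StructType([StructField('oid', StringType())]))
])

# The nested struct is given as a one-element tuple holding the id string
some_dict = {'SomeField': some_field,
             'ObjectIdField': (object_id,)}

# s is the SparkSession; createDataFrame expects a list of rows, not a bare dict
df = s.createDataFrame(data=[some_dict], schema=mongo_schema)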

0

Adding to this: on v10.1 of the MongoDB Spark Connector, you have to set the convertJson option to true in the write configuration (https://www.mongodb.com/docs/spark-connector/current/configuration/write/#std-label-spark-write-conf). Otherwise the value is written as an object with the property oid instead of an ObjectId.
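A minimal sketch of what such a write could look like, assuming the 10.x connector's "mongodb" format name and its connection.uri, database and collection options. The function name and its parameters are hypothetical, and the accepted values for convertJson may differ between connector versions, so check the write configuration page for yours.

```python
def write_with_convert_json(df, uri, database, collection):
    """Hypothetical helper: append df to MongoDB with convertJson enabled.

    Assumes MongoDB Spark Connector 10.x option names.
    """
    (
        df.write.format("mongodb")
        .mode("append")
        .option("connection.uri", uri)
        .option("database", database)
        .option("collection", collection)
        .option("convertJson", "true")  # the option this answer refers to
        .save()
    )
```

For example: write_with_convert_json(df, "mongodb://localhost:27017", "mydb", "mycollection"), where the URI, database and collection names are placeholders.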

1 Comment

Hey could you provide the code that worked for you? Or could you look at the following question and see what's wrong with the code?
