
I'm trying to convert an RDD where each element is a Pandas DataFrame (one per worker task) into a single Spark DataFrame distributed across all worker nodes.

Example:

import pandas as pd

def read_file_and_process_with_pandas(filename):
    data = pd.read_csv(filename)
    # some additional operations using pandas functionality;
    # here the data is a pandas dataframe, and I am using some datetime
    # indexing which isn't available for spark dataframes
    return data

filelist = ['file1.csv', 'file2.csv', 'file3.csv']
rdd = sc.parallelize(filelist)
rdd = rdd.map(read_file_and_process_with_pandas)

The previous operations work, so I have an RDD of Pandas DataFrames. How can I then convert this into a Spark DataFrame once the Pandas processing is done?

I tried rdd = rdd.map(spark.createDataFrame), but when I call something like rdd.take(5), I get the following error:

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o103.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

Is there a way to convert Pandas DataFrames in each worker node into a distributed DataFrame?

  • You want an rdd of spark DataFrames? I don't think that's possible. Why are you trying to do this? What's the end goal? Commented Jun 5, 2019 at 3:17
  • Which version of your spark ? Commented Jun 5, 2019 at 3:37
  • I need to use pandas' datetime indexing, which isn't possible with Spark RDDs or DataFrames. Commented Jun 5, 2019 at 17:00

2 Answers


See this answer: https://stackoverflow.com/a/51231046/7964197

I've had to deal with the same problem, which seems quite common: reading many files using pandas (e.g. Excel, pickle, or any other non-Spark format) and converting the resulting RDD into a Spark DataFrame.

The supplied code adds a new method on the SparkSession that uses pyarrow to convert the pd.DataFrame objects into Arrow record batches, which are then converted directly into a pyspark.sql.DataFrame.

spark_df = spark.createFromPandasDataframesRDD(prdd) # prdd is an RDD of pd.DataFrame objects

For large amounts of data, this is orders of magnitude faster than converting to an RDD of Row() objects.
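For comparison, the slower Row-based route mentioned above can be sketched as follows (assuming `spark` is an active SparkSession and `prdd` is the RDD of pandas DataFrames; `df_to_records` is a helper name made up here):

```python
import pandas as pd

def df_to_records(df):
    # Flatten one pandas DataFrame into a list of plain dicts,
    # which PySpark can turn into Row objects one at a time.
    return df.to_dict(orient="records")

# On the driver (requires an active SparkSession named `spark`):
# row_rdd = prdd.flatMap(df_to_records)
# spark_df = spark.createDataFrame(row_rdd)

# Standalone illustration of the flattening step:
records = df_to_records(pd.DataFrame({"a": [1, 2], "b": ["x", "y"]}))
```

Because every row crosses the Python/JVM boundary individually, this scales poorly next to the Arrow batch approach.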


1 Comment

Thanks for your response! I think this is a pretty common problem when you need to use the datetime indexing powers of pandas. It would be great if spark could natively handle the conversions.

A Pandas DataFrame cannot be converted to an RDD directly, but you can create a Spark DataFrame from a single Pandas DataFrame on the driver:

spark_df = context.createDataFrame(pandas_df)

Reference: Introducing DataFrames in Apache Spark for Large Scale Data Science

2 Comments

I already tried this, as you can see in my question. It isn't really what I'm looking for: I'm trying to map a Pandas DataFrame that lives in an RDD on each worker node into a Spark DataFrame distributed across all nodes.
Ok~ In that case, I think you'd better write a UDF and put the pandas logic inside it.
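For readers landing here later: one way to follow that suggestion and keep the pandas work on the executors is a grouped-map pandas UDF (Spark 2.3+). A minimal sketch, where the pandas-side function is real but the Spark DataFrame `sdf`, its columns `key`/`ts`/`value`, and the function names are hypothetical:

```python
import pandas as pd

def resample_group(pdf):
    # Pandas-side logic: datetime-index one group and resample it daily.
    return pdf.set_index("ts").resample("D").sum().reset_index()

# On Spark 2.3+ this can run per group on the executors:
# from pyspark.sql.functions import pandas_udf, PandasUDFType
# @pandas_udf("ts timestamp, value double", PandasUDFType.GROUPED_MAP)
# def resample_udf(pdf):
#     return resample_group(pdf)
# out = sdf.groupby("key").apply(resample_udf)

# Standalone illustration on a tiny frame:
sample = pd.DataFrame({
    "ts": pd.to_datetime(["2019-01-01 00:00", "2019-01-01 12:00", "2019-01-02 00:00"]),
    "value": [1.0, 2.0, 3.0],
})
daily = resample_group(sample)  # one row per day: 3.0, then 3.0
```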
