
I've got a DataFrame in Azure Databricks using PySpark. I need to serialize it as JSON into one or more files. Those files will eventually be uploaded to Cosmos, so it's vital that the JSON is well-formed.

I know how to connect to Cosmos and serialize the data directly, but I'm required to create JSON files that will be uploaded to Cosmos at a later time.


I can't share data from my actual DataFrame, but the structure is complex: each row has embedded objects, and some of those have their own embedded objects and arrays of objects.

I assume the problem is with how I'm trying to serialize the data, not how I've transformed it. I've created this simple DataFrame, df, which I think will suffice as an example.

+---------+-------------+
|property1|       array1|
+---------+-------------+
|   value1|["a","b","c"]|
|   value2|["x","y","z"]|
+---------+-------------+
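
For anyone reproducing this, a DataFrame like that can be built with something like the following (a minimal sketch; spark is the SparkSession Databricks provides):

df = spark.createDataFrame(
    [("value1", ["a", "b", "c"]), ("value2", ["x", "y", "z"])],
    ["property1", "array1"],
)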

I serialize it to Azure Data Lake Storage Gen2 like so.

df.coalesce(1).write.json(outpath, lineSep=",")

The file contains the following. The rows are not elements of an array, and the last row has a trailing comma, so this isn't valid JSON and Cosmos won't accept it.

{"property1":"value1","array1":["a","b","c"]},
{"property1":"value2","array1":["x","y","z"]},

By contrast, this JSON uploads as expected.

[{"property1":"value1","array1":["a","b","c"]},
{"property1":"value2","array1":["x","y","z"]}]

I've successfully uploaded single JSON objects (i.e. without enclosing []), so any solution that writes each DataFrame row to its own JSON file is a potential winner.

I've tried that by repartitioning, but there are always files with multiple rows in them.
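
The attempt looked roughly like this (a sketch of the idea: one partition, and therefore one file, per row; in practice the round-robin distribution still left several rows in some part files):

df.repartition(df.count()).write.json(outpath + "repartitioned/")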

1 Answer


I came up with two methods.

The first creates a list of JSON strings, one per row, using df.toJSON().collect(), slices the list into batches, then writes each batch as a JSON array.

def batchWriteDataFrame(dataframe):
  # batch_size and outpath are assumed to be defined elsewhere in the notebook
  rows = dataframe.toJSON().collect()
  # slice the rows into batches of batch_size
  batches = [rows[i * batch_size:(i + 1) * batch_size] for i in range((len(rows) + batch_size - 1) // batch_size)]

  for batch_num, batch in enumerate(batches, start=1):
    # join each batch's JSON strings into a JSON array and write it as one file
    dbutils.fs.put(outpath + "batch/" + str(batch_num) + ".json", "[" + ",".join(batch) + "]")

The second writes each row to its own file.

def writeDataFrameRows(dataframe):
  # write each row's JSON string to its own file, named by row index
  for i, row in enumerate(dataframe.toJSON().collect()):
    dbutils.fs.put(outpath + "single/" + str(i) + ".json", row)
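
Both helpers assume batch_size and outpath are already defined in the notebook, and both call collect(), so every row comes back to the driver; that's fine for modest DataFrames but worth keeping in mind for very large ones. A call might look like this (the values are purely illustrative):

batch_size = 1000
outpath = "abfss://mycontainer@myaccount.dfs.core.windows.net/json/"

batchWriteDataFrame(df)  # writes batch/1.json, batch/2.json, ... each a JSON array of up to 1000 rows
writeDataFrameRows(df)   # writes single/0.json, single/1.json, ... one JSON object per file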

