
I have a PySpark DataFrame that contains 3 columns:

  • databricksPath
  • countryPartition
  • yearPartition

I'm creating this dataframe based on values coming from widgets via Data Factory: https://i.sstatic.net/8zIuO.png

pyspark dataframe: https://i.sstatic.net/ZcjZO.png

With this DataFrame I want to create an output containing all the combinations: for example, a JSON structure that I can return to ADF with a command like dbutils.notebook.exit({'message': 'Success', 'databricksPath': databricksPath, 'yearPartition': yearPartition, 'countryPartition': countryPartition}), so that I can use it in a ForEach activity.

Output example:

"output": {
                "value": [
                    {
                        "country": "PT",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_1"
                    },
                    {
                        "country": "ES",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_1"
                    },
                    {
                        "country": "IT",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_1"
                    },
                    {
                        "country": "BE",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_1"
                    },
                    {
                        "country": "PT",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_2"
                    },
                    {
                        "country": "ES",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_2"
                    },
                    {
                        "country": "IT",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_2"
                    },
                    {
                        "country": "BE",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_2"
                    }
                    ]
                    } 
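For intuition, the array above is simply the Cartesian product of the three widget lists. A minimal plain-Python sketch of that product (using the example values from the question; this is an illustration, not part of the notebook):

```python
from itertools import product

# Example widget values from the question
paths = ['/notebooks/1.Project/Notebook_1', '/notebooks/1.Project/Notebook_2']
countries = ['PT', 'ES', 'IT', 'BE']
years = ['2022']

# Every (path, country, year) combination as a list of dicts,
# matching the shape of the desired "value" array
value = [
    {'country': c, 'year': y, 'databricksPath': p}
    for p, c, y in product(paths, countries, years)
]

print(len(value))  # 8 combinations (2 paths x 4 countries x 1 year)
```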

Notebook that I'm using:

# Databricks notebook source
from pyspark.sql import functions as F 
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from datetime import datetime, timedelta
from pyspark.sql.functions import (
    col, lit, row_number, instr, expr, when, current_date, months_between,
    coalesce, concat_ws, sum as Sum, first, round, monotonically_increasing_id,
    date_format, concat, substring, count
)
from pyspark.sql.window import Window
from pathlib import Path
from functools import reduce
from pyspark.sql import DataFrame
import traceback
import pyodbc
import uuid
import sys


# COMMAND ----------

dbutils.widgets.text("databricksPath", "['/notebooks/1.Project/Notebook_1','/notebooks/1.Project/Notebook_2']", "databricksPath")
dbutils.widgets.text("countryPartition", "['PT','ES','IT','BE']", "countryPartition")
dbutils.widgets.text("yearPartition", "['2022']", "yearPartition")


databricksPath = dbutils.widgets.get('databricksPath')
countryPartition = dbutils.widgets.get('countryPartition')
yearPartition = dbutils.widgets.get('yearPartition')

# COMMAND ----------

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
  StructField('databricksPath', StringType(), True),
  StructField('countryPartition', StringType(), True),
  StructField('yearPartition', StringType(), True)
])

data2 = [(databricksPath, countryPartition, yearPartition)]
df = spark.createDataFrame(data=data2, schema=schema)

df2 = df.withColumn("databricksPath", concat_ws(",", col("databricksPath")))

display(df2)

# COMMAND ----------

dbutils.notebook.exit({'message': 'Success', 'databricksPath': databricksPath,'yearPartition': yearPartition,'countryPartition': countryPartition})

Can anyone please help me achieve this?

Thank you!

  • Can you please confirm: you want to build the output example in the Databricks notebook and pass it to ADF, so that the returned value can be used in a ForEach activity? Commented Sep 14, 2022 at 15:09
  • Exactly. I want to pick up that DataFrame, create a JSON with all the combinations, and pass it to ADF Commented Sep 14, 2022 at 15:12

1 Answer


You can use the following code to achieve this:

dbutils.widgets.text("databricksPath", "['/notebooks/1.Project/Notebook_1','/notebooks/1.Project/Notebook_2']", "databricksPath")
dbutils.widgets.text("countryPartition", "['PT','ES','IT','BE']", "countryPartition")
dbutils.widgets.text("yearPartition", "['2022']", "yearPartition")
#dbutils.widgets.text("partitionColumn", "['dbo.table1|country', 'dbo.table2|country_year']", "partitionColumn")

databricksPath = dbutils.widgets.get('databricksPath')
countryPartition = dbutils.widgets.get('countryPartition')
yearPartition = dbutils.widgets.get('yearPartition')
#partitionColumn = dbutils.widgets.get('partitionColumn')

#creating a separate dataframe for each of the widget values
path_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('databricksPath'))],schema=['path'])
cp_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('countryPartition'))],schema=['country'])
y_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('yearPartition'))],schema=['year'])
#p_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('partitionColumn'))],schema=['partition_col'])


#applying cross join to get all combination results.
from pyspark.sql.functions import broadcast
final_df= broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(path_df)
#final_df= broadcast(broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(path_df)).crossJoin(p_df)

#from pyspark.sql.functions import split
#fdf = final_df.select('country','year','path',split(final_df['partition_col'],'[|]').getItem(0).alias('table'),split(final_df['partition_col'],'[|]').getItem(1).alias('partition'))

#from pyspark.sql.functions import array
#fdf = fdf.withColumn('countryYear', array(col('country'),col('year')))

#collect the result dataframe as a list of dictionaries
#(toJSON returns valid JSON strings, so json.loads is the safe way to parse them)
import json
output = [json.loads(i) for i in final_df.toJSON().collect()]
#output = [json.loads(i) for i in fdf.toJSON().collect()]

#returning the above list as JSON to Data Factory
dbutils.notebook.exit(json.dumps(output))
  • Using this code, the value of output will be an array of objects, like the output example in the question.

  • When I run this notebook with a Notebook activity in Azure Data Factory, the activity output contains the returned array (screenshot omitted).

UPDATE: The commented-out lines in the code above implement the updated requirement from the comments (output image omitted).
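One note on the eval calls used to parse the widget values: the strings ADF passes (e.g. "['PT','ES','IT','BE']") are Python-style list literals with single quotes, so json.loads will not parse them, but ast.literal_eval is a safer drop-in replacement for eval since it only evaluates literals. A minimal sketch:

```python
import ast
import json

# A widget value as passed from ADF: a Python-style list literal, not valid JSON
raw = "['PT','ES','IT','BE']"

# literal_eval parses literals only; it cannot execute arbitrary code
countries = ast.literal_eval(raw)
print(countries)  # ['PT', 'ES', 'IT', 'BE']

# json.loads rejects the single-quoted string, which is why plain eval
# (or literal_eval) is used for the widget values in the answer
try:
    json.loads(raw)
    parsed_as_json = True
except json.JSONDecodeError:
    parsed_as_json = False
print(parsed_as_json)  # False
```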

Sign up to request clarification or add additional context in comments.

5 Comments

Super, thanks Saideep, this is exactly what I wanted, amazing :)! Just one question before marking it as solved: is it possible to have a parameter partitionColumn containing, for example, "partitionColumn": ["dbo.table1|country", "dbo.table2|country_year"], which gives rise to two columns, in this case synapseTable and partitionDesc (i.e. a split), and to have country and year joined in one? E.g. final output { "databricksTable": "dbo.table1", "country_year": ["PT", "2022"], "partitionDesc": "country", "country": "PT", "year": "2022", "path": "/notebooks/1.Project/Notebook_1" }, ...?
Yes, it can be done. You need to add additional code to the solution, such as an extra dataframe to include in the cross join (the split and other required operations can be done on the final dataframe after the cross join). The cross join is the key to getting the combinations. As commented, please do consider marking the solution as an answer if it helped.
Once again thanks a lot Saideep, I'll try to implement that with your solution, and once I've achieved it I'll mark your answer as the solution!
Added the code to achieve the above requirement (the additional code appears as comments alongside the previous code). Please do check if it helps.
thanks a lot Saideep for the quick solution and for helping me !
