1

I am trying to create azure data factory pipelines and resources using python. I was successful with certain ADF activities like Lookup, Copy .. but the problem I am facing here is I am trying to copy few tables from SQL to blob using FOR Each activity and it is throwing the below error

How would you create activities inside for each activity? Any inputs is greatly appreciated. Thanks!

Ref: https://learn.microsoft.com/en-us/python/api/azure-mgmt-datafactory/azure.mgmt.datafactory.models.foreachactivity?view=azure-python

Error Message

TypeError: 'CopyActivity' object is not iterable

Code Block

## Lookup Activity
ls_sql_name = 'ls_'+project_name+'_'+src_svr_type+'_dev'
linked_service_name =LinkedServiceReference(reference_name=ls_sql_name)
lkp_act_name ='Get Table Names'
sql_reader_query = "SELECT top 3 name from sys.tables where name like '%dim'"
source = SqlSource(sql_reader_query= sql_reader_query)
dataset= {"referenceName": "ds_sql_Dim_input","type": "DatasetReference"}
LookupActivity_ = LookupActivity(name =lkp_act_name, linked_service_name= linked_service_name, source = source, dataset = dataset
                                ,first_row_only =False)


#create copy activity
ds_name = 'ds_sql_dim_input' #these datasets already created
dsOut_name ='ds_blob_dim_output' #these datasets already created

copy_act_name = 'Copy SQL to Blob(parquet)'
sql_reader_query =  {"value": "@item().name","type": "Expression"}
sql_source = SqlSource(sql_reader_query=sql_reader_query)
blob_sink = ParquetSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=copy_act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=sql_source, sink=blob_sink)

## For Each Activity
pl_name ='pl_Test'
items= {"value": "@activity('Get Table Names').output.value","type": "Expression"}
dependsOn = [{"activity": "Get Table Names","dependencyConditions": ["Succeeded"]}]
ForEachActivity_= ForEachActivity(name = 'Copy tables in loop',items=items,depends_on=dependsOn ,activities =copy_activity)

params_for_pipeline = {}
p_obj = PipelineResource(activities=[LookupActivity_,ForEachActivity_], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, pl_name, p_obj)

1 Answer 1

2

Activities needs to be a list of Activity, and you are passing a single one. Try creating a list and add the copy activity to it, and the pass that list in the activities parameter.

Sign up to request clarification or add additional context in comments.

1 Comment

I know this (Activities as list) but not sure how did I missed that. Thanks so much for your inputs! Much appreciated.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.