0

The issue that my inner copy activity from foreach activity isn't published to ADF. I don't see that activity in ADF UI and even after I add it manually I not able not debug, publish and even view activity code =(

When I click on publish I see error 'Publishing: Got error while publishing' When I'm trying to debug:

Failed
{"__zone_symbol__currentTask":{"type":"microTask","state":"notScheduled","source":"Promise.then","zone":"angular","cancelFn":null,"runCount":0}}

Clicking on 'Code' button doesn't do any actions.

Anything I missing? Should I add copy activity somewhere else?

Foreach/copy activity in c# code is following :

private Activity CreateDocumentIteratorActivity(
        string lookupDocumentsActivityName,
        string sourceDatasetName,
        string destinationDatasetName,
        string logsServiceName)
    {
        return new ForEachActivity()
        {
            Name = "ForEachDocument",
            IsSequential = false,
            BatchCount = BackupSettings.Value.ParallelThreads,
            Items = new Expression { Value = "@activity('" + lookupDocumentsActivityName + "').output.value" },
            DependsOn = new List<ActivityDependency>() {
                new ActivityDependency() {
                    Activity = lookupDocumentsActivityName,
                    DependencyConditions = new List<string>() {
                        DependencyCondition.Succeeded
                    }
                }
            },
            Activities = new[] {
                CreateCopyDocToFileActivity(sourceDatasetName, destinationDatasetName, logsServiceName, BackupSettings.Value.BackupLogsFolder)
            }
        };
    }

private CopyActivity CreateCopyDocToFileActivity(string sourceDatasetName, string destinationDatasetName, string logsServiceName, string logsPath)
    {
        return new CopyActivity()
        {
            Name = "CopyDocToFile",
            Policy = new ActivityPolicy()
            {
                Retry = BackupSettings.Value.RetryPolicyConfiguration.RetryCount,
                RetryIntervalInSeconds = (int)BackupSettings.Value.RetryPolicyConfiguration.DeltaBackOff.TotalSeconds,
                Timeout = BackupSettings.Value.RetryPolicyConfiguration.MaxBackOff
            },
            Source = new DocumentDbCollectionSource()
            {
                Query = new Expression(@"select value c from c where c.id = '@{item().id}'")
            },
            Inputs = new[] { new DatasetReference() {
                ReferenceName = sourceDatasetName,
                Parameters = new Dictionary<string, object>
                {
                    { "collectionName", new Expression("@pipeline().parameters.collectionName") }
                }
            }},
            Outputs = new[] { new DatasetReference() {
                ReferenceName = destinationDatasetName,
                Parameters = new Dictionary<string, object> {
                     { "fileName", new Expression("@concat(pipeline().parameters.collectionName,  '/', item().PartitionKey, '/', item().id)") },
                     { "backupDateStr", new Expression("@pipeline().TriggerTime") }
                }
            }},
            Sink = new BlobSink()
            {
                CopyBehavior = CopyBehaviorType.PreserveHierarchy,
            },

            ParallelCopies = BackupSettings.Value.ParallelThreads,
            EnableSkipIncompatibleRow = true,
            RedirectIncompatibleRowSettings = new RedirectIncompatibleRowSettings()
            {
                LinkedServiceName = logsServiceName,
                Path = logsPath
            }
        };
    }

Response from /pipelines api call:

{
"value": [
{
    "id": "/subscriptions/d2259601-012b-4253-895b-02916ef0f7f7/resourceGroups/Test-Data/providers/Microsoft.DataFactory/factories/backup-data-factory/pipelines/cosmosBackup",
    "name": "cosmosBackup",
    "type": "Microsoft.DataFactory/factories/pipelines",
    "properties":
    {
        "activities": [
        {
            "type": "Lookup",
            "typeProperties":
            {
                "source":
                {
                    "type": "DocumentDbCollectionSource",
                    "query":
                    {
                        "value": "select root.id, root.PartitionKey from root",
                        "type": "Expression"
                    }
                },
                "dataset":
                {
                    "referenceName": "source_cosmosdb_collection",
                    "parameters":
                    {
                        "collectionName": "@pipeline().parameters.collectionName"
                    },
                    "type": "DatasetReference"
                },
                "firstRowOnly": false
            },
            "policy":
            {
                "timeout": "02:00:00",
                "retry": 3,
                "retryIntervalInSeconds": 30
            },
            "name": "GetDocumentsIds"
        },
        {
            "type": "ForEach",
            "typeProperties":
            {
                "isSequential": false,
                "batchCount": 4,
                "items":
                {
                    "value": "@activity('GetDocumentsIds').output.value",
                    "type": "Expression"
                },
                "activities": [
                {
                    "type": "Copy",
                    "typeProperties":
                    {
                        "source":
                        {
                            "type": "DocumentDbCollectionSource",
                            "query":
                            {
                                "value": "select value c from c where c.id = '@{item().id}'",
                                "type": "Expression"
                            }
                        },
                        "sink":
                        {
                            "type": "BlobSink",
                            "copyBehavior": "PreserveHierarchy"
                        },
                        "parallelCopies": 4,
                        "enableSkipIncompatibleRow": true,
                        "redirectIncompatibleRowSettings":
                        {
                            "linkedServiceName": "destination_cosmosdb_collection_service",
                            "path": "backup/logs"
                        }
                    },
                    "inputs": [
                    {
                        "referenceName": "source_cosmosdb_collection",
                        "parameters":
                        {
                            "collectionName":
                            {
                                "value": "@pipeline().parameters.collectionName",
                                "type": "Expression"
                            }
                        },
                        "type": "DatasetReference"
                    }],
                    "outputs": [
                    {
                        "referenceName": "destination_cosmosdb_collection",
                        "parameters":
                        {
                            "fileName":
                            {
                                "value": "@concat(pipeline().parameters.collectionName,  '/', item().PartitionKey, '/', item().id)",
                                "type": "Expression"
                            },
                            "backupDateStr":
                            {
                                "value": "@pipeline().TriggerTime",
                                "type": "Expression"
                            }
                        },
                        "type": "DatasetReference"
                    }],
                    "policy":
                    {
                        "timeout": "02:00:00",
                        "retry": 3,
                        "retryIntervalInSeconds": 30
                    },
                    "name": "CopyDocToFile"
                }]
            },
            "name": "ForEachDocument",
            "dependsOn": [
            {
                "activity": "GetDocumentsIds",
                "dependencyConditions": ["Succeeded"]
            }]
        }],
        "parameters":
        {
            "collectionName":
            {
                "type": "String"
            }
        }
    },
    "etag": "01006ed4-0000-0000-0000-5b3b38450000"
}]
}
7
  • Could you press F2, and then click refresh button, and then get the response of list pipelines api in network tab? Could you share the json of that pipeline in the response? Looks like there are something which is not recognized by UI. Commented Jul 3, 2018 at 10:37
  • Updated question. Thanks for helping trick, didn't though about that way of viewing pipeline code =) Commented Jul 3, 2018 at 11:08
  • Your pipeline json looks good. You mean, when double click the foreach acivity, you didn’t see the copy activity? Is there any error in console when double click the foreach activity? Commented Jul 3, 2018 at 11:52
  • And does your dataset shown out successfully? Commented Jul 3, 2018 at 11:53
  • click on foreach activity works fine and opens actually this activity but it's empty. Also I see zero inner activities when looking on description of foreach activity. here's sceenshot: screencast.com/t/uLoojzKz5zN Commented Jul 3, 2018 at 12:00

1 Answer 1

1

Found out the issue. This part is not right.

 "enableSkipIncompatibleRow": true,
  "redirectIncompatibleRowSettings": {
      "linkedServiceName": "destination_cosmosdb_collection_service",
          "path": "backup/logs"
}

It shoud be

 "enableSkipIncompatibleRow": true,
   "redirectIncompatibleRowSettings": {
                 "linkedServiceName": {
                   "referenceName": "destination_cosmosdb_collection_service",
                    "type": "LinkedServiceReference"
         },
   "path": "backup/logs"
                            },
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.