2

I am still learning how to create custom components in SSIS. Lets say I have an input row count of 500 rows, I need to read/batch/process 100 rows at a time from the input pipeline buffer and then send them to a 3rd party app, once I get the results I need to update the pipeline buffer columns with the new data and then read/batch/process the next 100 rows and so on until I have processed all 500 rows.

My question is, can I loop/read through the input pipeline buffer more than once, so I can update the buffer with the returned data from the 3rd party app?

I thought I read that you can read in all the data and store it into cache and then for example sort the data, but I am not sure how to get that data back to the output from cache. I am also not sure where this should be done and how to access the input pipeline buffer, PrimeOutput or ProcessInput or another override method I don't know about?

I am trying to create a custom asynchronous data flow component to solve this issue.

Any help or ideas would be greatly appreciated and/or point me in the right direction!

Thanks!

7
  • Going back and updating rows runs counter to the idea of the pipeline of SSIS. Why not break these stages up into a pipeline so SSIS can apply its own buffering logic? Your component can process 100 rows at a time if it wants, but then just output the row plus new/updated data. Commented Sep 11, 2020 at 13:34
  • @JeroenMostert thanks for the reply! I am not sure I totally understand. So I can create a new pipeline buffer? Or just loop through the input pipeline buffer in ProcessInput() 100 rows at a time and then send that batch to the 3rd party and then put the results returned into a separate output buffer so I don't have to loop back through the input buffer? Sorry, I am still new and trying to grasp all of this. Commented Sep 11, 2020 at 13:52
  • Right, never mind. I was confused about SSIS' programming model (it's been a while since I've seen it). Of course you can't just stream rows to an output as needed, that would be simple and intuitive. :-P Commented Sep 11, 2020 at 14:12
  • An sync component in SSIS is one that has a 1:1 mapping between source rows and output. An async is whatever you define. The default Sort component has 1:1 but the output data is in a different buffer than the input buffer. It sounds like your process is batch up to 100 rows, do work, add those 100 rows + work enrichment on to the output buffer. What I don't see is where you'd be "looping" on the source rows. Can you expand a bit on that idea? Commented Sep 11, 2020 at 16:55
  • @billinkc Yes, I have a Ole DB source that returns 500 rows. I need to batch a 100 rows at a time and send them in a batch to a Smarty Streets address verification API and then I will get the return results back. From here I am stumped, the looping idea was to go back through the input pipeline buffer and update with return data. Sounds like I don't need to reuse the input pipeline buffer, so where or what do I need to do to get this data to an output buffer? Where or how can I access this buffer? As to where I would be looping would be in the ProcessInput() method. Commented Sep 11, 2020 at 17:27

1 Answer 1

1

I am glad I didn't try to freehand this as there were a bunch of fine points I had forgotten.

A few bits here that are worth noting: my two data structures InData and OutData and you'll need to configure those to keep track of whatever is going to be in the input/output buffers. As the comments state, there might be a clever way to clone the Buffer objects' properties but I didn't see how to. Define these to match the data types in your data flow and if you're lazy like me, use the same column names and you can copy/paste your way to success.

ApiCall is a dummy method that uses our cached values to ask the data cleansing service to do its thing. It needs to return the cleaned data so we can marry the inputs and outputs into a unified row. There's likely a better way to do this but hopefully it's good enough for getting your thought processes firing.

I created an SSIS level variable, @[User::ApiBatchSize] which you would initialize to 500. Using this approach will allow you to optimize the sent batch size without changing the core code. I initialize our local member variable in the PreExecute method because that's the constructor-ish for the script component.

Normally, in an asynchronous script component, you're working with the ProcessInputRow method and that's what I originally did but ran into an issue with the final batch if the size of the list an even multiple of the apiBatchSize. Turns out, the EndOfRowset() was never getting set to True in the method. No worries, we simply need to work with the ProcessInput method. In the "normal" world, process input method causes the process input row to process a row so we're going to skip the middle man and just work directly with the buffer in ProcessInput. I was lazy and didn't rename my Row references to Buffer as the auto-generated code originally addressed the parameter.

The pseudo logic here is

  • While there's data row
    • if we have hit our batch size, send our data collection off for processing
      • For each processed row, add a row to the output buffer and fill it with clean data
    • Empty our collection bucket (it's already been sent downstream)
  • Add the current row to our collection bucket

The C# itself

using System;
using System.Data;
using System.Collections.Generic;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer 
/// definition but  I don't know how to access it so I redefine it here
/// </summary>
public struct InData
{
    public string AddressLine1 { get; set; }
}

/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer 
/// definition but  I don't know how to access it so I redefine it here
/// </summary>
public struct OutData
{
    public string AddressLine1Clean { get; set; }
    public string AddressCityClean { get; set; }
    public string AddressStateClean { get; set; }
    public string AddressPostalCodeClean { get; set; }
}

/// <summary>
/// This is the class to which to add your code.  Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    List<InData> mData;
    int mBatchSize;

    /// <summary>
    /// This method is called once, before rows begin to be processed in the data flow.
    ///
    /// You can remove this method if you don't need to do anything here.
    /// </summary>
    public override void PreExecute()
    {
        base.PreExecute();

        this.mData = new List<InData>();
        this.mBatchSize = this.Variables.ApiBatchSize;
    }

    /// <summary>
    /// This method is called after all the rows have passed through this component.
    ///
    /// You can delete this method if you don't need to do anything here.
    /// </summary>
    public override void PostExecute()
    {
        base.PostExecute();

    }

    /// <summary>
    /// We're going to work with ProcessInput versus PorcessInputRow as it is
    /// "closer to the bare metal" and we need that
    /// </summary>
    /// <param name="Buffer"></param>
    public override void Input0_ProcessInput(Input0Buffer Row)
    {
        //base.Input0_ProcessInput(Buffer);

        while (Row.NextRow())
        {
            if (this.mData.Count >= this.mBatchSize)
            {
                foreach (var item in ApiCall())
                {
                    Output0Buffer.AddRow();
                    var inRow = item.Key;
                    var outRow = item.Value;

                    // fill columns with original data
                    Output0Buffer.AddressLine1 = inRow.AddressLine1;
                    // etc

                    // fill columns with clean data
                    Output0Buffer.AddressLine1Clean = outRow.AddressLine1Clean;
                    Output0Buffer.AddressCityClean = outRow.AddressCityClean;
                    Output0Buffer.AddressStateClean = outRow.AddressStateClean;
                    Output0Buffer.AddressPostalCodeClean = outRow.AddressPostalCodeClean;
                    // etc
                }

                // TODO Remove this for production, just ensuring batching is working as intended
                bool fireAgain = false;
                string status = "Batch released. Conditions => mDataCount := " + this.mData.Count;
                this.ComponentMetaData.FireInformation(0, "ApiProcessing", status, "", 0, ref fireAgain);

                // Reset for next iteration
                this.mData.Clear();
            }

            this.mData.Add(new InData() { AddressLine1 = Row.AddressLine1 });
        }

        // Handle the final possible partial batch
        if (this.mData.Count > 0)
        {
            foreach (var item in ApiCall())
            {
                Output0Buffer.AddRow();
                var inRow = item.Key;
                var outRow = item.Value;

                // fill columns with original data
                Output0Buffer.AddressLine1 = inRow.AddressLine1;
                // etc

                // fill columns with clean data
                Output0Buffer.AddressLine1Clean = outRow.AddressLine1Clean;
                Output0Buffer.AddressCityClean = outRow.AddressCityClean;
                Output0Buffer.AddressStateClean = outRow.AddressStateClean;
                Output0Buffer.AddressPostalCodeClean = outRow.AddressPostalCodeClean;
                // etc
            }

            // TODO Remove this for production, just ensuring batching is working as intended
            bool fireAgain = false;
            string status = "Final batch released. Conditions => mDataCount := " + this.mData.Count;
            this.ComponentMetaData.FireInformation(0, "ApiProcessing", status, "", 0, ref fireAgain);

            // Reset for next iteration
            this.mData.Clear();

        }
    }

    ///// <summary>
    ///// This method is called once for every row that passes through the component from Input0.
    ///// We need to preserve rows in our own memory allocation
    ///// We're not getting the EndOfRowset call in time to release the final
    ///// </summary>
    ///// <param name="Row">The row that is currently passing through the component</param>
    //public override void Input0_ProcessInputRow(Input0Buffer Row)
    //{
    //}

    public override void CreateNewOutputRows()
    {
        // I don't think we need to do anything special here
        // but I'm leaving it in in case you have some weird case
    }

    /// <summary>
    /// Simulate data cleaning
    /// </summary>
    /// <returns></returns>
    public Dictionary<InData, OutData> ApiCall()
    {
        int macGuffin = 0;
        Dictionary<InData, OutData> cleanData = new Dictionary<InData, OutData>();
        foreach (var item in this.mData)
        {
            cleanData.Add(item, new OutData() { AddressLine1Clean = "Clean" + item.AddressLine1, AddressCityClean = "Clean", AddressPostalCodeClean = "12345-1234", AddressStateClean = "CL"  });
            macGuffin = macGuffin % this.mBatchSize;
        }

        return cleanData;
    }

}

Screenshots of the Script Component

This is where we make SSIS level variables available to the script component. I have selected ApiBatchSize The Script Tab lists the SSIS Variable User::ApiBatchSize is a member of the ReadOnlyVariables collection

In the Input Columns tab, I have selected all the columns that need to pass through and I mark them as ReadOnly usage type. Input Columns tab lists the column AddressLine1 as ReadOnly

In the Inputs and Outputs tab, the first thing I do is navigate to Output 0 and change the SynchronousInputID from something like "Script Component.Inputs[Input 0]" to None

A screenshot setting the Properties of SynchronousInputID to None on the Output buffer definition

Define all the columns you'll need. I duplicate my original columns (AddressLine1) and then add all the new columns my processing will be able to fill (AddressLine1Clean, city/state/postal code). Under Output 0, select the Output Columns collection and repeatedly push "Add Column" and configure. Besides providing a name, I changed all the data types to string (DT_STR) here as that's what I'm working with. The default is the 32 bit integer type (DT_I4)

Screenshot of having added the Clean columns. AddressLine1Clean is shown with a data type of DT_STR and a non-default length of 70

Note that this screenshot does not have the original column(s) in there but you'll need to add it for the code to work.

There may be newer books out there, but this out of print book by the Program Manager for when SSIS was introduced is what I still reference when I run into scripting questions.

The Rational Guide to Scripting SQL Server 2005 Integration Services Beta Preview (Rational Guides) by Donald Farmer, Derek Farmer Paperback, 192 Pages, Published 2005 ISBN-10: 1-932577-21-1 / 1932577211 ISBN-13: 978-1-932577-21-1 / 9781932577211

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.