I am glad I didn't try to freehand this as there were a bunch of fine points I had forgotten.
A few bits here are worth noting. First, the two data structures, InData and OutData: you'll need to configure those to keep track of whatever is going to be in the input and output buffers. As the comments state, there might be a clever way to clone the Buffer objects' properties, but I didn't see how. Define these to match the data types in your data flow and, if you're lazy like me, use the same column names so you can copy/paste your way to success.
ApiCall is a dummy method that uses our cached values to ask the data cleansing service to do its thing. It needs to return the cleaned data so we can marry the inputs and outputs into a unified row. There's likely a better way to do this, but hopefully it's good enough to get your thought processes firing (a hedged sketch of what a real call might look like follows the code below).
I created an SSIS-level variable, @[User::ApiBatchSize], which you would initialize to 500. This approach lets you tune the size of the batches you send without changing the core code. I initialize our local member variable in the PreExecute method because that's the closest thing the script component has to a constructor.
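One defensive tweak worth considering here (my addition, not something the original code does): if ApiBatchSize somehow comes through as zero or negative, the batch-release test in the code below (Count >= mBatchSize) trips on every single row and you'd hammer the service with one-row batches. A couple of extra lines at the end of PreExecute guard against that:

// My addition: clamp a missing or nonsensical ApiBatchSize to a sane floor
// so a zero or negative value can't trigger a release on every row
if (this.mBatchSize < 1)
{
    this.mBatchSize = 500;  // assumed default; match your variable's value
}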
Normally, in an asynchronous script component, you work with the ProcessInputRow method, and that's what I originally did, but I ran into an issue with the final batch whenever the size of the list was an even multiple of the ApiBatchSize: EndOfRowset() never returned true from within that method. No worries, we simply need to work with the ProcessInput method instead. In the "normal" world, ProcessInput calls ProcessInputRow once per row, so we're going to skip the middle man and just work directly with the buffer in ProcessInput. I was lazy and didn't rename my Row references to Buffer, which is how the auto-generated code originally addressed the parameter.
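For context, here's roughly what the middle man we're skipping looks like. This is my reconstruction of the auto-generated wrapper from memory, so treat it as a sketch rather than the verbatim generated code:

// The plumbing the base class provides: walk the buffer and hand each row
// to ProcessInputRow. By overriding Input0_ProcessInput ourselves, we see
// the whole buffer, including where it ends, instead of one row at a time.
public virtual void Input0_ProcessInput(Input0Buffer Buffer)
{
    while (Buffer.NextRow())
    {
        Input0_ProcessInputRow(Buffer);
    }
}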
The pseudo logic here is

- While there's a data row
  - If we have hit our batch size, send our data collection off for processing
    - For each processed row, add a row to the output buffer and fill it with clean data
    - Empty our collection bucket (it's already been sent downstream)
  - Add the current row to our collection bucket
- After the loop, release the final, possibly partial, batch the same way
The C# itself
using System;
using System.Data;
using System.Collections.Generic;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer
/// definition but I don't know how to access it so I redefine it here
/// </summary>
public struct InData
{
    public string AddressLine1 { get; set; }
}

/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer
/// definition but I don't know how to access it so I redefine it here
/// </summary>
public struct OutData
{
    public string AddressLine1Clean { get; set; }
    public string AddressCityClean { get; set; }
    public string AddressStateClean { get; set; }
    public string AddressPostalCodeClean { get; set; }
}

/// <summary>
/// This is the class to which to add your code. Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    List<InData> mData;
    int mBatchSize;

    /// <summary>
    /// This method is called once, before rows begin to be processed in the data flow.
    /// It's the closest thing the script component has to a constructor, so the
    /// member variables get initialized here.
    /// </summary>
    public override void PreExecute()
    {
        base.PreExecute();
        this.mData = new List<InData>();
        // ApiBatchSize must be listed under ReadOnlyVariables on the Script page
        // (see the screenshots below) for this property to exist
        this.mBatchSize = this.Variables.ApiBatchSize;
    }

    /// <summary>
    /// This method is called after all the rows have passed through this component.
    ///
    /// You can delete this method if you don't need to do anything here.
    /// </summary>
    public override void PostExecute()
    {
        base.PostExecute();
    }

    /// <summary>
    /// We're going to work with ProcessInput versus ProcessInputRow as it is
    /// "closer to the bare metal" and we need that. Note the data flow calls
    /// this method once per incoming buffer (roughly 10k rows by default), so
    /// the trailing flush below may release a partial batch at each buffer
    /// boundary. That's harmless; every row is still processed exactly once.
    /// </summary>
    /// <param name="Row">The current input buffer</param>
    public override void Input0_ProcessInput(Input0Buffer Row)
    {
        //base.Input0_ProcessInput(Row);
        while (Row.NextRow())
        {
            if (this.mData.Count >= this.mBatchSize)
            {
                ReleaseBatch("Batch released.");
            }
            this.mData.Add(new InData() { AddressLine1 = Row.AddressLine1 });
        }

        // Handle the final possible partial batch
        if (this.mData.Count > 0)
        {
            ReleaseBatch("Final batch released.");
        }
    }

    /// <summary>
    /// Send the accumulated rows off for cleansing, copy the results into the
    /// output buffer, and empty our collection bucket for the next batch.
    /// </summary>
    /// <param name="label">Prefix for the information event so the batches are distinguishable</param>
    private void ReleaseBatch(string label)
    {
        foreach (var item in ApiCall())
        {
            Output0Buffer.AddRow();
            var inRow = item.Key;
            var outRow = item.Value;
            // fill columns with original data
            Output0Buffer.AddressLine1 = inRow.AddressLine1;
            // etc
            // fill columns with clean data
            Output0Buffer.AddressLine1Clean = outRow.AddressLine1Clean;
            Output0Buffer.AddressCityClean = outRow.AddressCityClean;
            Output0Buffer.AddressStateClean = outRow.AddressStateClean;
            Output0Buffer.AddressPostalCodeClean = outRow.AddressPostalCodeClean;
            // etc
        }

        // TODO Remove this for production, just ensuring batching is working as intended
        bool fireAgain = false;
        string status = label + " Conditions => mDataCount := " + this.mData.Count;
        this.ComponentMetaData.FireInformation(0, "ApiProcessing", status, "", 0, ref fireAgain);

        // Reset for the next iteration
        this.mData.Clear();
    }

    ///// <summary>
    ///// This method is called once for every row that passes through the component from Input0.
    ///// We need to preserve rows in our own memory allocation, but we weren't getting
    ///// the EndOfRowset signal in time to release the final batch, hence ProcessInput above.
    ///// </summary>
    ///// <param name="Row">The row that is currently passing through the component</param>
    //public override void Input0_ProcessInputRow(Input0Buffer Row)
    //{
    //}

    public override void CreateNewOutputRows()
    {
        // I don't think we need to do anything special here
        // but I'm leaving it in in case you have some weird case
    }

    /// <summary>
    /// Simulate data cleansing. Beware: a Dictionary keyed on InData will throw
    /// if a batch contains two identical input rows; a List of KeyValuePairs
    /// would sidestep that if your data allows duplicates.
    /// </summary>
    /// <returns>The original rows paired with their cleansed counterparts</returns>
    public Dictionary<InData, OutData> ApiCall()
    {
        Dictionary<InData, OutData> cleanData = new Dictionary<InData, OutData>();
        foreach (var item in this.mData)
        {
            cleanData.Add(item, new OutData() { AddressLine1Clean = "Clean" + item.AddressLine1, AddressCityClean = "Clean", AddressPostalCodeClean = "12345-1234", AddressStateClean = "CL" });
        }
        return cleanData;
    }
}
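Since ApiCall above only simulates the cleansing service, here's a rough sketch of what a real implementation might look like. Everything vendor-facing in it is an assumption on my part: the endpoint URL, the JSON payload shapes, and the idea that the service returns cleaned rows in the same order they were sent. It uses HttpClient (.NET 4.5 and later) and JavaScriptSerializer from System.Web.Extensions so there are no extra assemblies to deploy with the package; swap in whatever your vendor actually expects.

// Additional usings for the top of the file
using System.Net.Http;                  // reference System.Net.Http (.NET 4.5+)
using System.Text;
using System.Web.Script.Serialization;  // reference System.Web.Extensions

// Reuse one client for the whole execution instead of newing one up per batch
private static readonly HttpClient mClient = new HttpClient();

public Dictionary<InData, OutData> ApiCall()
{
    var serializer = new JavaScriptSerializer();

    // Hypothetical endpoint and payload; substitute your vendor's actual API
    var content = new StringContent(
        serializer.Serialize(this.mData), Encoding.UTF8, "application/json");
    HttpResponseMessage response =
        mClient.PostAsync("https://example.com/api/clean-address", content).Result;
    response.EnsureSuccessStatusCode();

    // Assumes the service echoes rows back in the order they were sent.
    // If the serializer balks at structs, change InData/OutData to classes.
    List<OutData> cleaned = serializer.Deserialize<List<OutData>>(
        response.Content.ReadAsStringAsync().Result);

    var cleanData = new Dictionary<InData, OutData>();
    for (int i = 0; i < this.mData.Count; i++)
    {
        cleanData.Add(this.mData[i], cleaned[i]);
    }
    return cleanData;
}

Calling .Result on the async methods is deliberate: the data flow expects ProcessInput to block until the rows are ready, and there's no UI synchronization context in an SSIS host to deadlock against. For production you'd also want a timeout and some retry logic, but that's beyond what this sketch needs to show.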
Screenshots of the Script Component
This is where we make SSIS-level variables available to the script component. I have selected ApiBatchSize.

In the Input Columns tab, I have selected all the columns that need to pass through and I mark them as ReadOnly usage type.

In the Inputs and Outputs tab, the first thing I do is navigate to Output 0 and change the SynchronousInputID from something like "Script Component.Inputs[Input 0]" to None. That's what makes this an asynchronous component: output rows are no longer tied one-for-one to input rows.

Define all the columns you'll need. I duplicate my original columns (AddressLine1) and then add all the new columns my processing will be able to fill (AddressLine1Clean, plus city/state/postal code). Under Output 0, select the Output Columns collection, repeatedly push "Add Column", and configure each one. Besides providing a name, I changed all the data types to string (DT_STR) here as that's what I'm working with; the default is the 32-bit integer type (DT_I4).

Note that this screenshot does not show the original column(s), but you'll need to add them for the code to work.
There may be newer books out there, but this out-of-print book, by the Program Manager from when SSIS was introduced, is what I still reference when I run into scripting questions.
The Rational Guide to Scripting SQL Server 2005 Integration Services Beta Preview (Rational Guides)
by Donald Farmer, Derek Farmer
Paperback, 192 pages, published 2005
ISBN-10: 1-932577-21-1
ISBN-13: 978-1-932577-21-1