I am learning about Tasks and also writing PowerShell cmdlets in C#. I found that connecting to remote machines was very slow, so I wrote this bit of code to speed it up using Tasks. I am hoping to get some feedback regarding the parallel processing and any other pointers in general.
The code is committed to a Github repo.
TaskCmdlet.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Management.Automation;
using System.Threading;
using System.Threading.Tasks;
namespace PoshTasks.Cmdlets
{
public abstract class TaskCmdlet<TIn, TOut> : Cmdlet where TIn : class
where TOut : class
{
#region Parameters
[Parameter(ValueFromPipeline = true)]
public TIn[] InputObject { get; set; }
#endregion
#region Abstract methods
/// <summary>
/// Performs an action on <paramref name="server"/>
/// </summary>
/// <param name="input">The <see cref="object"/> to be processed; null if not processing input</param>
/// <returns>A <see cref="T"/></returns>
protected abstract TOut ProcessTask(TIn input = null);
#endregion
#region Virtual methods
/// <summary>
/// Generates a collection of tasks to be processed
/// </summary>
/// <returns>A collection of tasks</returns>
protected virtual IEnumerable<Task<TOut>> GenerateTasks()
{
List<Task<TOut>> tasks = new List<Task<TOut>>();
if (InputObject != null)
foreach (TIn input in InputObject)
tasks.Add(Task.Run(() => ProcessTask(input)));
else
tasks.Add(Task.Run(() => ProcessTask()));
return tasks;
}
/// <summary>
/// Performs the pipeline output for this cmdlet
/// </summary>
/// <param name="result"></param>
protected virtual void PostProcessTask(TOut result)
{
WriteObject(result, true);
}
#endregion
#region Processing
/// <summary>
/// Processes cmdlet operation
/// </summary>
protected override void ProcessRecord()
{
IEnumerable<Task<TOut>> tasks = GenerateTasks();
foreach (Task<Task<TOut>> bucket in Interleaved(tasks))
{
try
{
Task<TOut> task = bucket.Result;
TOut result = task.Result;
PostProcessTask(result);
}
catch (Exception e) when (e is PipelineStoppedException || e is PipelineClosedException)
{
// do nothing if pipeline stops
}
catch (Exception e)
{
WriteError(new ErrorRecord(e, e.GetType().Name, ErrorCategory.NotSpecified, this));
}
}
}
/// <summary>
/// Interleaves the tasks
/// </summary>
/// <param name="tasks">The collection of <see cref="Task{TOut}"/></param>
/// <returns>An array of task tasks</returns>
protected Task<Task<TOut>>[] Interleaved(IEnumerable<Task<TOut>> tasks)
{
TaskCompletionSource<Task<TOut>>[] buckets = new TaskCompletionSource<Task<TOut>>[tasks.Count()];
Task<Task<TOut>>[] results = new Task<Task<TOut>>[buckets.Length];
for (int i = 0; i < buckets.Length; i++)
{
buckets[i] = new TaskCompletionSource<Task<TOut>>();
results[i] = buckets[i].Task;
}
int nextTaskIndex = -1;
foreach (Task<TOut> task in tasks)
task.ContinueWith(completed =>
{
TaskCompletionSource<Task<TOut>> bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
bucket.TrySetResult(completed);
},
CancellationToken.None,
TaskContinuationOptions.None,
TaskScheduler.Default);
return results;
}
#endregion
}
}
GetRemoteService.cs (a sample implementation)
using PoshTasks.Cmdlets;
using System.Collections.Generic;
using System.Linq;
using System.Management.Automation;
using System.ServiceProcess;
namespace PoshTasks.Sample
{
[Cmdlet(VerbsCommon.Get, "RemoteService")]
public class GetRemoteService : TaskCmdlet<string, ServiceController[]>
{
#region Parameters
/// <summary>
/// Gets or sets the collection of requested service names
/// </summary>
[Parameter]
public string[] Name { get; set; }
#endregion
#region Processing
/// <summary>
/// Processes a single remote service lookup
/// </summary>
/// <param name="server">The remote machine name</param>
/// <returns>A collection of <see cref="ServiceController"/>s from the remote machine</returns>
protected override ServiceController[] ProcessTask(string server)
{
ServiceController[] services = ServiceController.GetServices(server);
if (Name != null)
return services.Where(s => Name.Contains(s.DisplayName)).ToArray();
return services;
}
/// <summary>
/// Generates custom service object and outputs to pipeline
/// </summary>
/// <param name="result">The collection of remote services</param>
protected override void PostProcessTask(ServiceController[] result)
{
List<dynamic> services = new List<dynamic>();
foreach (ServiceController service in result)
services.Add(new
{
Name = service.DisplayName,
Status = service.Status,
ComputerName = service.MachineName,
CanPause = service.CanPauseAndContinue
});
WriteObject(services, true);
}
#endregion
}
}
If not cloning the repo you will need the Microsoft.PowerShell.5.ReferenceAssemblies Nuget package and to reference System.ServiceProcess.