To be honest I believe that your approach outlined above needs a considerable amount of work before releasing to production. Imagine the situation where 1000 users click on your site - you now have 1000 background tasks all trying to send messages at the same time. This will probably bottleneck your system for disk and network IO.
While there are a number of ways to approach this problem, one of the most common is to use a producer-consumer queue, preferably using a thread safe collection such as ConcurrentQueue with the number of active long running, e.g. emailing threads in process at any time controlled by a synchronization mechanism such as SemaphoreSlim
I've created a very simple application to demonstrate this approach as shown below. The key classes in this are
- The MessageProcessor class which maintains the queue and controls access for both adding items AddToQueue method and sending messages ReadFromQueue. The class itself implements the Singleton pattern to ensure only one instance is ever present in the application (you don't want multiple queues). The ReadFromQueue method also implements a timer (set for 2 seconds) which specifies how often a task should be spawned to send a message.
- The SingletonBase class is just an abstract class for implementing the Singleton pattern
- The MessageSender class is used for the actual work of sending the message
- The CreateMessagesForTest class simply simulates creating test messages for the purpose of this answer
Hope this helps
using System;
using System.Collections.Concurrent;
using System.Globalization;
using System.Reactive.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication9
{
internal class Program
{
private static void Main(string[] args)
{
MessagingProcessor.Instance.ReadFromQueue(); // starts the message sending tasks
var createMessages = new CreateMessagesForTest();
createMessages.CreateTestMessages(); // creates sample test messages for processing
Console.ReadLine();
}
}
/// <summary>
/// Simply creates test messages every second for sending
/// </summary>
public class CreateMessagesForTest
{
public void CreateTestMessages()
{
IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(1));
// Token for cancelation
var source = new CancellationTokenSource();
// Create task to execute.
Action action = (CreateMessage);
// Subscribe the obserable to the task on execution.
observable.Subscribe(x =>
{
var task = new Task(action);
task.Start();
}, source.Token);
}
private static void CreateMessage()
{
var message = new Message {EMailAddress = "[email protected]", MessageBody = "abcdefg"};
MessagingProcessor.Instance.AddToQueue(message);
}
}
/// <summary>
/// The conents of the email to send
/// </summary>
public class Message
{
public string EMailAddress { get; set; }
public string MessageBody { get; set; }
}
/// <summary>
/// Handles all aspects of processing the messages, only one instance of this class is allowed
/// at any time
/// </summary>
public class MessagingProcessor : SingletonBase<MessagingProcessor>
{
private MessagingProcessor()
{
}
private ConcurrentQueue<Message> _messagesQueue = new ConcurrentQueue<Message>();
// create a semaphore to limit the number of threads which can send an email at any given time
// In this case only allow 2 to be processed at any given time
private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(2, 2);
public void AddToQueue(Message message)
{
_messagesQueue.Enqueue(message);
}
/// <summary>
/// Used to start the process for sending emails
/// </summary>
public void ReadFromQueue()
{
IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(2));
// Token for cancelation
var source = new CancellationTokenSource();
// Create task to execute.
Action action = (SendMessages);
// Subscribe the obserable to the task on execution.
observable.Subscribe(x =>
{
var task = new Task(action);
task.Start();
}, source.Token);
}
/// <summary>
/// Handles dequeing and syncronisation to the queue
/// </summary>
public void SendMessages()
{
try
{
Semaphore.Wait();
Message message;
while (_messagesQueue.TryDequeue(out message)) // if we have a message to send
{
var messageSender = new MessageSender();
messageSender.SendMessage(message);
}
}
finally
{
Semaphore.Release();
}
}
}
/// <summary>
/// Sends the emails
/// </summary>
public class MessageSender
{
public void SendMessage(Message message)
{
// do some long running task
}
}
/// <summary>
/// Implements singleton pattern on all classes which derive from it
/// </summary>
/// <typeparam name="T">Derived class</typeparam>
public abstract class SingletonBase<T> where T : class
{
public static T Instance
{
get { return SingletonFactory.Instance; }
}
/// <summary>
/// The singleton class factory to create the singleton instance.
/// </summary>
private class SingletonFactory
{
static SingletonFactory()
{
}
private SingletonFactory()
{
}
internal static readonly T Instance = GetInstance();
private static T GetInstance()
{
var theType = typeof (T);
T inst;
try
{
inst = (T) theType
.InvokeMember(theType.Name,
BindingFlags.CreateInstance | BindingFlags.Instance
| BindingFlags.NonPublic,
null, null, null,
CultureInfo.InvariantCulture);
}
catch (MissingMethodException ex)
{
var exception = new TypeLoadException(string.Format(
CultureInfo.CurrentCulture,
"The type '{0}' must have a private constructor to " +
"be used in the Singleton pattern.", theType.FullName)
, ex);
//LogManager.LogException(LogManager.EventIdInternal, exception, "error in instantiating the singleton");
throw exception;
}
return inst;
}
}
}
}