I'm looking to implement a simplistic distributed work queue system with StackExchange.Redis.
I understand the reason for not having BLPOP etc, but as it stands the interface I'm working to is based on repeated TryRead calls with a timeout.
I'm hesistant with the below, since I'm unsubscribing in the handler, and setting a flag to cancel the timeout. Is there any chance something might be missed? Is there a different approach to achieve this?
public string TryRead(string queueName, TimeSpan timeout)
{
string result = null;
var chanName = $"qnot_{queueName}";
var done = new ManualResetEvent(false);
void Handler(RedisChannel chan, RedisValue val)
{
_sub.Unsubscribe(chanName, Handler);
result = _database.ListRightPop($"qdata_{queueName}");
done.Set();
}
_sub.Subscribe(chanName, Handler);
done.WaitOne(timeout);
return result;
}
public void Write(string queueName, string text)
{
_database.ListLeftPush($"qdata_{queueName}", text);
_sub.Publish($"qnot_{queueName}", "");
}
The above version will always timeout and return null in the case that there's an existing item on the queue (and nothing new is added). The below version now checks for existing data first, which works. But it has a bug, a race condition: if the first read check comes back negative, THEN something is pushed and a notification sent, THEN we subscribe and wait for the timeout.
public string TryRead(string queueName, TimeSpan timeout)
{
var dataName = $"qdata_{queueName}";
var result = (string)_database.ListRightPop(dataName);
if (result != null)
{
return result;
}
var chanName = $"qnot_{queueName}";
var done = new ManualResetEvent(false);
void Handler(RedisChannel chan, RedisValue val)
{
_sub.Unsubscribe(chanName, Handler);
result = _database.ListRightPop(dataName);
done.Set();
}
_sub.Subscribe(chanName, Handler);
done.WaitOne(timeout);
return result;
}
I can do RPOPs in a loop, but that would seem to absolutely suck. Anyone else done something similar?
TryRead. It's self-contained and just blocks until timeout @MarcGravell