I'm looking to implement a simple distributed work queue with StackExchange.Redis.

I understand why StackExchange.Redis doesn't offer BLPOP and the other blocking commands, but the interface I'm working to is based on repeated TryRead calls with a timeout.

I'm hesitant about the code below, since I'm unsubscribing inside the handler and setting a flag to cancel the timeout. Is there any chance a notification might be missed? Is there a different approach to achieve this?

    public string TryRead(string queueName, TimeSpan timeout)
    {
        string result = null;

        var chanName = $"qnot_{queueName}";
        var done = new ManualResetEvent(false);

        void Handler(RedisChannel chan, RedisValue val)
        {
            _sub.Unsubscribe(chanName, Handler);
            result = _database.ListRightPop($"qdata_{queueName}");
            done.Set();
        }

        _sub.Subscribe(chanName, Handler);
        done.WaitOne(timeout);

        return result;
    }

    public void Write(string queueName, string text)
    {
        _database.ListLeftPush($"qdata_{queueName}", text);
        _sub.Publish($"qnot_{queueName}", "");
    }

The version above always times out and returns null when an item is already on the queue and nothing new is added. The version below checks for existing data first, which fixes that case. But it has a race condition: if the first read comes back empty, THEN something is pushed and the notification is sent, and only THEN do we subscribe, the notification is missed and we wait for the full timeout even though an item is available.

    public string TryRead(string queueName, TimeSpan timeout)
    {
        var dataName = $"qdata_{queueName}";

        var result = (string)_database.ListRightPop(dataName);
        if (result != null)
        {
            return result;
        }

        var chanName = $"qnot_{queueName}";
        var done = new ManualResetEvent(false);

        void Handler(RedisChannel chan, RedisValue val)
        {
            _sub.Unsubscribe(chanName, Handler);
            result = _database.ListRightPop(dataName);
            done.Set();
        }

        _sub.Subscribe(chanName, Handler);
        done.WaitOne(timeout);

        return result;
    }
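
To make the race concrete, here is a minimal sketch that uses in-memory stand-ins instead of Redis. `FakeQueue`, `RaceDemo` and the `gap` callback are hypothetical names invented for this illustration; the `gap` action simulates a writer running at the worst possible moment. It only shows why the order of "check" and "subscribe" matters, not how StackExchange.Redis behaves internally.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical in-memory stand-in for the Redis list plus its notification channel.
public class FakeQueue
{
    private readonly ConcurrentQueue<string> _items = new ConcurrentQueue<string>();

    // Like pub/sub: only handlers attached at the moment of Write are called.
    public event Action Notified;

    public void Write(string text)
    {
        _items.Enqueue(text);
        Notified?.Invoke();
    }

    public string TryPop()
    {
        return _items.TryDequeue(out var item) ? item : null;
    }
}

public static class RaceDemo
{
    // Check first, subscribe second (the ordering of the version above).
    // A Write that lands in the gap is never signalled, so this waits out the timeout.
    public static string CheckThenSubscribe(FakeQueue queue, TimeSpan timeout, Action gap)
    {
        var result = queue.TryPop();
        if (result != null) return result;

        gap();  // simulates a writer running between the check and the subscribe

        var signal = new AutoResetEvent(false);
        Action handler = () => signal.Set();
        queue.Notified += handler;
        try
        {
            return signal.WaitOne(timeout) ? queue.TryPop() : null;
        }
        finally
        {
            queue.Notified -= handler;
        }
    }

    // Subscribe first, check second. A Write in the gap sets the signal,
    // so the wait returns immediately and the item is popped.
    public static string SubscribeThenCheck(FakeQueue queue, TimeSpan timeout, Action gap)
    {
        var signal = new AutoResetEvent(false);
        Action handler = () => signal.Set();
        queue.Notified += handler;
        try
        {
            var result = queue.TryPop();
            if (result != null) return result;

            gap();

            return signal.WaitOne(timeout) ? queue.TryPop() : null;
        }
        finally
        {
            queue.Notified -= handler;
        }
    }
}
```

With a `gap` action that writes one item, `CheckThenSubscribe` returns null once the timeout expires and leaves the item on the queue, while `SubscribeThenCheck` returns the item. The sketch waits only once for brevity; with several competing readers a signalled wait can still find the queue empty, so a real reader would need to retry until the timeout runs out.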

I could poll with RPOP in a loop, but that seems wasteful. Has anyone else done something similar?

  • I'm confused why you unsubscribe when handling - surely the reader should be essentially: reading everything? not just one? Commented Sep 29, 2017 at 13:50
  • I'm retrofitting this to a system which currently uses Azure and AWS queues, and to keep things consistent I'm looking to stick to the same signature for TryRead. It's self-contained and just blocks until timeout @MarcGravell Commented Sep 29, 2017 at 15:31

1 Answer

I ended up with this, which works, but I would still welcome other answers with a viable approach:

    public string TryRead(string queueName, TimeSpan timeout)
    {
        var timer = Stopwatch.StartNew();
        var dataName = $"{_keyPrefix}qdata_{queueName}";
        var chanName = $"{_keyPrefix}qnot_{queueName}";
        var done = new AutoResetEvent(false);
        string result;

        // subscribe - sets the 'done' flag when a new item is pushed
        void Handler(RedisChannel chan, RedisValue val)
        {
            done.Set();
        }

        _sub.Subscribe(chanName, Handler);

        try
        {
            do
            {
                // try to read right away (before waiting), in case there was data already there
                result = _database.ListRightPop(dataName);
                if (result != null)
                {
                    break;
                }

                // there wasn't an item right away, so wait for the timeout to expire
                // or the subscription to be fired.  if it fired, try the read again
                var remainingTime = timeout - timer.Elapsed;
                if (remainingTime.TotalMilliseconds <= 1.0)
                {
                    break;
                }
                if (done.WaitOne(remainingTime))
                {
                    result = _database.ListRightPop(dataName);
                }
            } while (result == null && timer.Elapsed < timeout);
        }
        finally
        {
            // always unsubscribe, even if a Redis call throws
            _sub.Unsubscribe(chanName, Handler);
        }

        return result;
    }

Edit: updated to use AutoResetEvent and removed Unsubscribe from the handler. Note to those who find this: it seems to work for me as a drop-in replacement for a single, blocking read, but it's not the recommended approach. I'm only using it because I want to stay consistent with other queue implementations and am working to this specific TryRead signature.
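
For context, this is roughly how a caller might drive a TryRead with this signature. It is only a sketch: `IQueueReader`, `QueueConsumer` and the `handle` callback are hypothetical names that are not part of my actual code, and the queue name and timeout are whatever the caller chooses.

```csharp
using System;
using System.Threading;

// Hypothetical interface capturing the TryRead signature from this answer.
public interface IQueueReader
{
    string TryRead(string queueName, TimeSpan timeout);
}

public static class QueueConsumer
{
    // Polls TryRead until cancelled and returns how many items were handled.
    // A null result only means the timeout elapsed with nothing to read.
    public static int Run(IQueueReader reader, string queueName, TimeSpan timeout,
                          Action<string> handle, CancellationToken token)
    {
        var handled = 0;
        while (!token.IsCancellationRequested)
        {
            var item = reader.TryRead(queueName, timeout);
            if (item == null)
            {
                continue;
            }

            handle(item);
            handled++;
        }
        return handled;
    }
}
```

Because each call blocks for at most the timeout, cancellation is only noticed between calls, so a shorter timeout makes shutdown more responsive at the cost of more round trips.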
