I would like to find a good way to implement a job queue using PostgreSQL and PDO (PHP).

Basically, I have an events table where the app's events are logged, and some form of scheduled processor (say proc) that regularly retrieves one event at a time and executes certain routines in response to it (depending on the nature of the event itself).

Clearly, as soon as an instance of proc starts working on an event, I need to mark the row as ongoing, like this:

UPDATE events SET status = 'ongoing' WHERE id = 3; -- >> QUERY 1 <<

Fine! proc can now do its business according to the type of event and its payload and no other thread will deal with the event of id = 3 as it is now ongoing.

When proc is done with event 3 it marks it as 'resolved' so that, again, no other thread will, in the future, take care of event 3. Here we go:

UPDATE events SET status = 'resolved' WHERE id = 3; -- >> QUERY 2 <<

Now my concern is that this must be done inside a transaction, so I would have:

BEGIN; 
-- QUERY 1
-- VARIOUS OTHER QUERIES TAKING A LOT OF TIME
-- QUERY 2
COMMIT;

As far as I know, when inside a transaction, the change made by QUERY 1 only becomes visible to other threads once the whole transaction is committed. That implies that while proc (instance 1) is doing the time-consuming work (the code between QUERY 1 and QUERY 2), some other instance of it might read the events table, conclude that no one is taking care of event 3, and start processing it as well. Clearly that would corrupt the state of the queue.

So my question is: how do I preserve the transactional style of proc and, at the same time, make the change of state of event 3 (from free to ongoing) immediately visible from outside the transaction?

4 Answers

1

As written, another worker trying to claim the job would block at query 1. It can see the old version of the row, but cannot update it--it would block.

So don't do it in a single transaction. Claim and commit; do the work; then resolve and commit. Any workers coming along will see that the row is already claimed. Also, you can see that it is claimed, which will help you in debugging and monitoring.

When you claim the row, you should mark it with something distinctive (a pid, if there is only one worker machine, or a hostname and pid, if there are several) rather than simply with 'ongoing'. That way, if a worker dies, you can manually clean up after it.
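
The claim, work, resolve sequence described here might look like the following sketch. The `claimed_by` column, the `'pending'` status value, and the `'worker-host:12345'` identifier are assumptions made for illustration; none of them appear in the question.

```sql
-- Transaction 1: claim the event and commit right away so other workers see it.
BEGIN;
UPDATE events
   SET status = 'ongoing',
       claimed_by = 'worker-host:12345'   -- hypothetical hostname:pid marker
 WHERE id = 3
   AND status = 'pending'                 -- hypothetical "unclaimed" status
RETURNING id;                             -- no row back means another worker owns it
COMMIT;

-- ... do the time-consuming work here, outside the claim transaction ...

-- Transaction 2: mark the event as resolved.
BEGIN;
UPDATE events
   SET status = 'resolved'
 WHERE id = 3
   AND claimed_by = 'worker-host:12345';
COMMIT;
```

Under the default READ COMMITTED isolation level, a second worker running the same claim concurrently waits for the first to commit, then re-checks the `WHERE` clause against the updated row and changes nothing, so an empty `RETURNING` result tells it to move on.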

0

As presented, this is not possible. PostgreSQL doesn't have dirty reads, and QUERY 1 is pointless since its effect will be overwritten by QUERY 2 before ever becoming visible.

But even if QUERY 1 were committed independently and so became visible immediately, this still wouldn't be satisfactory. In a high-concurrency environment, the time between the SELECT of a row in the queue and its UPDATE to the ongoing state is enough for another worker to SELECT it too, creating exactly the confusion you want to avoid.

I think a close alternative to your design that should work can be achieved by replacing your QUERY1 with an advisory lock on the queue ID.

Pseudo-code:

BEGIN;
SELECT pg_try_advisory_xact_lock(3) INTO result;
IF result=true THEN
  -- grabbed the exclusive right to process this entry
  -- recheck the status now that the lock is taken
  SELECT status INTO var_status FROM events WHERE id=3;
  IF var_status='needs-to-be-done' THEN
     -- do the work...
     -- work is done
     UPDATE events SET status = 'resolved' WHERE id = 3;
  END IF;
ELSE
 -- nothing to do, another worker is on it
END IF;
COMMIT;

This kind of lock is automatically released at the end of the transaction. Contrary to the SELECT followed by UPDATE, the lock is guaranteed to be granted or denied atomically.
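
To see that behaviour, the following sketch can be run by hand in two separate psql sessions; the key `3` simply mirrors the event id used in the pseudo-code.

```sql
-- Session A
BEGIN;
SELECT pg_try_advisory_xact_lock(3);   -- returns true: A now holds the lock

-- Session B, while A's transaction is still open
BEGIN;
SELECT pg_try_advisory_xact_lock(3);   -- returns false immediately, without waiting
COMMIT;

-- Session A
COMMIT;                                -- the lock is released here, no explicit unlock needed
```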

2 Comments

This has a race condition. The release of the advisory lock can be seen by other transactions immediately when the locking transaction commits, but the update it did cannot be seen by others until the others start a new transaction.
@jjanes: yes, good point. I think this should be fixed by having the worker recheck the status from its own snapshot once the lock is taken. I've changed the code accordingly.
0

By definition, you cannot see uncommitted changes made in a transaction from outside that transaction.

Transactions are a fundamental concept of all database systems. The essential point of a transaction is that it bundles multiple steps into a single, all-or-nothing operation. The intermediate states between the steps are not visible to other concurrent transactions, and if some failure occurs that prevents the transaction from completing, then none of the steps affect the database at all.

For concurrency issues, I would recommend using the serializable transaction isolation level and / or row-level locking.
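
As a rough illustration of the row-level locking option, a worker could lock the row with `SELECT ... FOR UPDATE` before deciding whether to process it. This is only a sketch of one possible reading of that advice, and it keeps the transaction (and the row lock) open for the whole duration of the work.

```sql
BEGIN;

-- Take a row-level lock. A second worker running the same statement waits
-- here until this transaction ends; adding NOWAIT makes it fail with an
-- error instead, and SKIP LOCKED makes it skip the row.
SELECT status FROM events WHERE id = 3 FOR UPDATE;

-- If the returned status is not 'resolved', do the work, then:
UPDATE events SET status = 'resolved' WHERE id = 3;

COMMIT;
```

With the serializable level, the transaction would instead start with `BEGIN ISOLATION LEVEL SERIALIZABLE;`, and a worker that loses a conflict gets a serialization failure (SQLSTATE 40001) and has to retry or give up on that event.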

2 Comments

Fine. This is not an answer to my problem though. What strategy shall I adopt to solve my problem?
Fine. The answer to your question is: there is no way. You can, however, solve your problem with serializable transactions and/or row-level locking (using one of them or both together), but they are too involved to cover in a single answer; you should read about them in the PostgreSQL docs.
0

Postgres does not offer dirty reads, but FOR UPDATE SKIP LOCKED gives a simple way to get the effect you want:

Query 1: Claim up to 10 available records and mark them as ongoing

    UPDATE events SET status='ongoing', status_updated_at=clock_timestamp() WHERE id IN (
        SELECT id FROM events
        WHERE status!='ongoing'
        AND status!='resolved'
        ORDER BY id LIMIT 10
        FOR UPDATE SKIP LOCKED
    ) RETURNING *;

Process those events in application code, without holding a lock on these records or on the entire table while you do so. The assumption here is that events are immutable anyway, so no one else should be updating them.

Query 2: Mark those events as resolved once processed.

    UPDATE events SET status='resolved', status_updated_at=clock_timestamp()
      WHERE id IN (<eventIds captured from above>)
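
Since both queries stamp `status_updated_at`, events left in `ongoing` by a worker that died can be found by age and released. A minimal sketch, assuming a hypothetical `'pending'` status for unclaimed events and an arbitrary 15-minute cutoff (neither is part of this answer):

```sql
-- Release events whose claim is older than the cutoff so Query 1 can pick them up again.
UPDATE events
   SET status = 'pending',                      -- hypothetical "unclaimed" status
       status_updated_at = clock_timestamp()
 WHERE status = 'ongoing'
   AND status_updated_at < clock_timestamp() - interval '15 minutes'
RETURNING id;
```

The cutoff has to be comfortably longer than the slowest legitimate job, otherwise an event that is still being processed would be handed to a second worker.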

Let me know if I am missing anything here 🙏
