4

We have a Java EE 5 based JSF application which runs on 2 WebLogic application servers, which share an Oracle database.

For some use cases it's crucial, that only one node does the operations within the database, which are typically permanent background jobs. Therefore the idea was, that one node (the 'master') obtains some sort of lock within the database, and the other node (the 'slave') recognizes the lock and does nothing for those use cases as long as the master is available. Only if the first node becomes unavailable, the second node should take over the work and thus from there on holds the lock itself.

My question now is, how would we implement this behaviour (remember, JPA 1.0) and will the lock be automatically released in the database if one node goes down? Or should the whole thing be better done in a different way?

8
  • You could use a table with a unique constraint over a column, both nodes inserting the same value in that column. When a node frees the lock, it deletes the entry. If you can guarantee that the lock is acquired for some max. time, that you could also insert the acquiring time, which will be considered when the lock is acquired (meaning deleting the row if it expired). Otherwise, there is no way to release the lock in DB, when a node goes down. Commented Nov 8, 2013 at 15:39
  • @AndreiI: Thanks for the hint - one clarification (also added above): The use cases in question are no single, short operations, but are doing background jobs permanently - so as long as the master is up, the slave never does anything regarding these use cases. It should only be assured, that not both nodes do the job. Commented Nov 8, 2013 at 15:46
  • In order to solve the problem with long-taking processes, you could make a TimerService that constantly writes (pings) in DB in a table a row [NODE-X, LAST_PING], so that the another node Y can check the LAST_PING of NODE-X if the lock was acquired by that node. Commented Nov 8, 2013 at 17:43
  • I was thinking about that too, so thanks for confirming! Commented Nov 12, 2013 at 21:29
  • It seems you have thought already about some solutions, but you haven't described them in your question. So what else have you thought about? Do you have any better/different ideas? Commented Nov 12, 2013 at 21:39

1 Answer 1

1

Here is a simple solution, similar to what ActiveMQ does to have only one master doing stuff while other running instances are waiting to become master.

package com.despegar.bookedia.message.broker.lock;

import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.jdbc.support.DatabaseType;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.support.MetaDataAccessException;
import org.springframework.jdbc.support.rowset.SqlRowSet;
import org.springframework.transaction.annotation.Transactional;

import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Map;

/**
 * Represents an exclusive lock on a database to avoid multiple brokers running
 * against the same logical database.
 * <p>
 * The Lease Database Locker lets the master broker acquire a lock that's valid for a fixed (usually short) duration after which it expires.
 * To retain the lock the master broker must periodically extend the lock's lease before it expires.
 * Simultaneously the slave broker also checks periodically to see if the lease has expired. If, for whatever reason, the master broker fails to update its
 * lease on the lock the slave will take ownership of the lock becoming the new master in the process. The leased lock can survive a DB replica failover.
 * </p>
 * Each broker in the master/slave pair must have a different leaseHolderId attribute, as it is this value that is used to reserve a lease.
 * <p>
 * In the simplest case, the clocks between master and slave must be in sync for this solution to work properly. If the clocks cannot be in sync, the
 * locker can use the system time from the database CURRENT TIME and adjust the timeouts in accordance with their local variance from the DB system time.
 * If maxAllowableDiffFromDBTime is greater than zero the local periods will be adjusted by any delta that exceeds maxAllowableDiffFromDBTime.
 * </p>
 */
public class LeaseDatabaseLocker implements Locker, AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
    private static final int IM_THE_MASTER_RESULT = 1;

    private int maxAllowableDiffFromDBTime;
    private long diffFromCurrentTime = Long.MAX_VALUE;
    private String leaseHolderId;
    private JdbcTemplate jdbcTemplate;
    private int queryTimeoutInSecs = -1;
    private long lockAcquireSleepInterval;
    private long lockHeldPeriod;

    public LeaseDatabaseLocker(String leaseHolderId, JdbcTemplate jdbcTemplate, int queryTimeout,
                               long lockAcquireSleepInterval, int maxAllowableDiffFromDBTime, long lockHeldPeriod) {
        this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime;
        this.jdbcTemplate = jdbcTemplate;
        this.queryTimeoutInSecs = queryTimeout;
        this.lockAcquireSleepInterval = lockAcquireSleepInterval;
        this.leaseHolderId = leaseHolderId;
        this.lockHeldPeriod = lockHeldPeriod;
    }

    @Transactional
    @Override
    public void acquireLock() {

        LOG.debug("Attempting to acquire the exclusive lock to become the Master broker '{}'", leaseHolderId);

        String sql = Statements.LEASE_OBTAIN_STATEMENT;

        initTimeDiff();

        long now = System.currentTimeMillis() + diffFromCurrentTime;
        long nextLockCheck = now + lockHeldPeriod;

        PreparedStatementSetter preparedStatementSetter = statement -> {
            setQueryTimeoutInSecs(statement);
            statement.setString(Statements.ACQUIRE_LOCK_BROKER_NAME_COL_POSITION, leaseHolderId);
            statement.setLong(Statements.ACQUIRE_LOCK_NEXT_CHECK_COL_POSITION, nextLockCheck);
            statement.setLong(Statements.ACQUIRE_LOCK_TIME_NOW_POSITION, now);
        };

        LOG.trace("executing: '{}' to acquire lock with values {}, {}, {}", Statements.LEASE_OBTAIN_STATEMENT, leaseHolderId, nextLockCheck, now);
        int result = jdbcTemplate.update(sql, preparedStatementSetter);
        LOG.trace("Locking query result: updated rows count {}", result);

        if (result == IM_THE_MASTER_RESULT) {
            // we got the lease, verify we still have it
            LOG.debug("Lock acquired for '{}'", leaseHolderId);
            if (keepLockAlive()) {
                LOG.info("Becoming the master on dataSource: {}", jdbcTemplate.getDataSource());
                return;
            }
        }
        reportLeaseOwnerShipAndDuration();

        LOG.debug("{} failed to acquire lease.  Sleeping for {} milli(s) before trying again...", leaseHolderId, lockAcquireSleepInterval);
        throw new BrokerException.LockNotAcquiredException(leaseHolderId);
    }

    private void reportLeaseOwnerShipAndDuration() {
        String sql = Statements.LEASE_OWNER_STATEMENT;

        SqlRowSet rowSet = jdbcTemplate.queryForRowSet(sql);
        while (rowSet.next()) {
            LOG.debug("{} -  Lease held by {} till {}", leaseHolderId, rowSet.getString(1),
                     Instant.ofEpochMilli(rowSet.getLong(2)));
        }
    }

    private void setQueryTimeoutInSecs(Statement statement) throws SQLException {
        if (queryTimeoutInSecs > 0) {
            statement.setQueryTimeout(queryTimeoutInSecs);
        }
    }

    private long initTimeDiff() {
        if (Long.MAX_VALUE == diffFromCurrentTime) {
            if (maxAllowableDiffFromDBTime > 0) {
                diffFromCurrentTime = determineTimeDifference();
            } else {
                diffFromCurrentTime = 0l;
            }
        }
        return diffFromCurrentTime;
    }

    protected long determineTimeDifference() {

        ResultSetExtractor<Timestamp> timestampExtractor = rs -> {
            rs.next();
            return rs.getTimestamp(1);
        };
        Timestamp timestamp = jdbcTemplate.query(Statements.utcTimestamp(jdbcTemplate), timestampExtractor);

        long result = 0L;
        long diff = System.currentTimeMillis() - timestamp.getTime();
        if (Math.abs(diff) > maxAllowableDiffFromDBTime) {
            // off by more than maxAllowableDiffFromDBTime so lets adjust
            result = -diff;
        }
        LOG.info("{} diff adjust from db: {}, db time: {}", leaseHolderId, result, timestamp);
        return result;
    }

    @Transactional
    public boolean keepLockAlive() {
        boolean result;
        final String sql = Statements.LEASE_UPDATE_STATEMENT;

        initTimeDiff();

        final long now = System.currentTimeMillis() + diffFromCurrentTime;
        final long nextLockCheck = now + lockHeldPeriod;

        PreparedStatementSetter statementSetter = statement -> {
            setQueryTimeoutInSecs(statement);
            statement.setString(Statements.KEEP_LOCK_NEW_BROKER_NAME_COL_POSITION, leaseHolderId);
            statement.setLong(Statements.KEEP_LOCK_NEXT_CHECK_COL_POSITION, nextLockCheck);
            statement.setString(Statements.KEEP_LOCK_BROKER_NAME_COL_POSITION, leaseHolderId);
        };

        LOG.trace("executing: '{}' to keep lock alive with values {}, {}", Statements.LEASE_UPDATE_STATEMENT, leaseHolderId, nextLockCheck);
        result = jdbcTemplate.update(sql, statementSetter) == IM_THE_MASTER_RESULT;

        if (!result) {
            reportLeaseOwnerShipAndDuration();
        }

        return result;
    }

    private void releaseLease() {
        String sql = Statements.LEASE_UPDATE_STATEMENT;

        final int lockReleaseTime = 1;
        PreparedStatementSetter statementSetter = statement -> {
            statement.setString(Statements.RELEASE_LOCK_NEW_BROKER_NAME_COL_POSITION, leaseHolderId);
            statement.setLong(Statements.RELEASE_LOCK_NEXT_CHECK_COL_POSITION, lockReleaseTime);
            statement.setString(Statements.RELEASE_LOCK_BROKER_NAME_COL_POSITION, leaseHolderId);
        };

        LOG.trace("executing: '{}' to release lock with values {}, {}, {}", sql, leaseHolderId, 1, leaseHolderId);
        if (jdbcTemplate.update(sql, statementSetter) == IM_THE_MASTER_RESULT) {
            LOG.info("{}, released lease", leaseHolderId);
        }
    }

    @Override
    public void close() throws Exception {
        releaseLease();
    }

    static class Statements {

        public static final String LOCK_TABLE_NAME = "MSG_BROKER_LOCK";

        public static final Map<DatabaseType, String> CURRENT_DATE_TIME_UTC = ImmutableMap.of(DatabaseType.MYSQL,   "SELECT UTC_TIMESTAMP",
                                                                                              DatabaseType.H2,      "SELECT CURRENT_TIMESTAMP");

        public static final String LEASE_UPDATE_STATEMENT =
                String.format("UPDATE %s SET BROKER_NAME=?, %s.TIME=?  WHERE BROKER_NAME=? AND ID = 1", LOCK_TABLE_NAME, LOCK_TABLE_NAME);

        public static final String LEASE_OWNER_STATEMENT =
                String.format("SELECT BROKER_NAME, %s.TIME FROM %s WHERE ID = 1", LOCK_TABLE_NAME, LOCK_TABLE_NAME);

        public static final String LEASE_OBTAIN_STATEMENT =
                String.format("UPDATE %s SET BROKER_NAME=?, %s.TIME=? WHERE (%s.TIME IS NULL OR %s.TIME < ?) AND ID = 1",
                              LOCK_TABLE_NAME, LOCK_TABLE_NAME, LOCK_TABLE_NAME, LOCK_TABLE_NAME);

        //Acquire constants
        public static final int ACQUIRE_LOCK_BROKER_NAME_COL_POSITION = 1;
        public static final int ACQUIRE_LOCK_NEXT_CHECK_COL_POSITION = 2;
        public static final int ACQUIRE_LOCK_TIME_NOW_POSITION = 3;

        //Keep lock alive constants
        public static final int KEEP_LOCK_NEW_BROKER_NAME_COL_POSITION = 1;
        public static final int KEEP_LOCK_NEXT_CHECK_COL_POSITION = 2;
        public static final int KEEP_LOCK_BROKER_NAME_COL_POSITION = 3;

        //Release lock constants
        public static final int RELEASE_LOCK_NEW_BROKER_NAME_COL_POSITION = 1;
        public static final int RELEASE_LOCK_NEXT_CHECK_COL_POSITION = 2;
        public static final int RELEASE_LOCK_BROKER_NAME_COL_POSITION = 3;

        private Statements() {}

        private static String utcTimestamp(JdbcTemplate jdbcTemplate) {
            DatabaseType dbType;
            try {
                dbType = DatabaseType.fromMetaData(jdbcTemplate.getDataSource());
            } catch (MetaDataAccessException e) {
                throw new DataAccessResourceFailureException("Unable to determine database type: ", e);
            }
            String query = CURRENT_DATE_TIME_UTC.get(dbType);
            if(query == null) {
                throw new RuntimeException("Unrecognized DatabaseType: " + dbType);
            }
            return query;
        }
    }
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.