2013-11-08 4 views
2

У нас есть приложение JSF на основе Java EE 5, которое работает на двух серверах приложений WebLogic, которые совместно используют базу данных Oracle.Блокировка базы данных для синхронизации приложений

В некоторых случаях важно, чтобы только один узел выполнял операции в базе данных, которые обычно являются постоянными фоновыми заданиями. Поэтому идея заключалась в том, что один узел («мастер») получает какую-то блокировку в базе данных, а другой узел («подчиненный») распознает блокировку и ничего не делает для тех случаев использования, пока мастер доступен , Только если первый узел становится недоступным, второй узел должен взять на себя работу и, следовательно, оттуда удерживает сам замок.

Вопрос теперь в том, как мы будем реализовывать это поведение (помните, JPA 1.0), и будет ли автоматическая освобождение блокировки в базе данных, если один узел опустится? Или все должно быть лучше сделано по-другому?

+0

Вы можете использовать таблицу с уникальным ограничением над столбцом, причем оба узла вставляют одно и то же значение в этот столбец. Когда узел освобождает блокировку, он удаляет запись. Если вы можете гарантировать, что блокировка будет получена для некоторого макс. что вы также можете вставить время получения, которое будет учитываться при приобретении блокировки (что означает удаление строки, если она истекло). В противном случае невозможно освободить блокировку в БД, когда узел опустится. –

+0

@ AndreiI: Спасибо за подсказку - одно разъяснение (также добавлено выше): рассматриваемые варианты использования - это не одиночные короткие операции, но постоянно выполняются фоновые задания - до тех пор, пока хозяин встает, ведомый никогда ничего не делает в отношении этих случаев использования. Следует только быть уверенным, что не оба узла выполняют эту работу. –

+0

Чтобы решить проблему с длительными процессами, вы можете сделать TimerService, который постоянно пишет (пинги) в БД в таблице строки [NODE-X, LAST_PING], так что другой узел Y может проверить LAST_PING NODE-X, если блокировка была получена этим узлом. –

ответ

0

Это простое решение, похожее на то, что ActiveMQ делает, чтобы иметь только одного мастера, в то время как другие запущенные экземпляры ждут, чтобы стать мастером.

package com.despegar.bookedia.message.broker.lock; 

import com.google.common.collect.ImmutableMap; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.dao.DataAccessResourceFailureException; 
import org.springframework.data.jdbc.support.DatabaseType; 
import org.springframework.jdbc.core.JdbcTemplate; 
import org.springframework.jdbc.core.PreparedStatementSetter; 
import org.springframework.jdbc.core.ResultSetExtractor; 
import org.springframework.jdbc.support.MetaDataAccessException; 
import org.springframework.jdbc.support.rowset.SqlRowSet; 
import org.springframework.transaction.annotation.Transactional; 

import java.sql.SQLException; 
import java.sql.Statement; 
import java.sql.Timestamp; 
import java.time.Instant; 
import java.util.Map; 

/** 
* Represents an exclusive lock on a database to avoid multiple brokers running 
* against the same logical database. 
* <p> 
* The Lease Database Locker lets the master broker acquire a lock that's valid for a fixed (usually short) duration after which it expires. 
* To retain the lock the master broker must periodically extend the lock's lease before it expires. 
* Simultaneously the slave broker also checks periodically to see if the lease has expired. If, for whatever reason, the master broker fails to update its 
* lease on the lock the slave will take ownership of the lock becoming the new master in the process. The leased lock can survive a DB replica failover. 
* </p> 
* Each broker in the master/slave pair must have a different leaseHolderId attribute, as it is this value that is used to reserve a lease. 
* <p> 
* In the simplest case, the clocks between master and slave must be in sync for this solution to work properly. If the clocks cannot be in sync, the 
* locker can use the system time from the database CURRENT TIME and adjust the timeouts in accordance with their local variance from the DB system time. 
* If maxAllowableDiffFromDBTime is greater than zero the local periods will be adjusted by any delta that exceeds maxAllowableDiffFromDBTime. 
* </p> 
*/ 
public class LeaseDatabaseLocker implements Locker, AutoCloseable { 

    private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class); 
    private static final int IM_THE_MASTER_RESULT = 1; 

    private int maxAllowableDiffFromDBTime; 
    private long diffFromCurrentTime = Long.MAX_VALUE; 
    private String leaseHolderId; 
    private JdbcTemplate jdbcTemplate; 
    private int queryTimeoutInSecs = -1; 
    private long lockAcquireSleepInterval; 
    private long lockHeldPeriod; 

    public LeaseDatabaseLocker(String leaseHolderId, JdbcTemplate jdbcTemplate, int queryTimeout, 
           long lockAcquireSleepInterval, int maxAllowableDiffFromDBTime, long lockHeldPeriod) { 
     this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime; 
     this.jdbcTemplate = jdbcTemplate; 
     this.queryTimeoutInSecs = queryTimeout; 
     this.lockAcquireSleepInterval = lockAcquireSleepInterval; 
     this.leaseHolderId = leaseHolderId; 
     this.lockHeldPeriod = lockHeldPeriod; 
    } 

    @Transactional 
    @Override 
    public void acquireLock() { 

     LOG.debug("Attempting to acquire the exclusive lock to become the Master broker '{}'", leaseHolderId); 

     String sql = Statements.LEASE_OBTAIN_STATEMENT; 

     initTimeDiff(); 

     long now = System.currentTimeMillis() + diffFromCurrentTime; 
     long nextLockCheck = now + lockHeldPeriod; 

     PreparedStatementSetter preparedStatementSetter = statement -> { 
      setQueryTimeoutInSecs(statement); 
      statement.setString(Statements.ACQUIRE_LOCK_BROKER_NAME_COL_POSITION, leaseHolderId); 
      statement.setLong(Statements.ACQUIRE_LOCK_NEXT_CHECK_COL_POSITION, nextLockCheck); 
      statement.setLong(Statements.ACQUIRE_LOCK_TIME_NOW_POSITION, now); 
     }; 

     LOG.trace("executing: '{}' to acquire lock with values {}, {}, {}", Statements.LEASE_OBTAIN_STATEMENT, leaseHolderId, nextLockCheck, now); 
     int result = jdbcTemplate.update(sql, preparedStatementSetter); 
     LOG.trace("Locking query result: updated rows count {}", result); 

     if (result == IM_THE_MASTER_RESULT) { 
      // we got the lease, verify we still have it 
      LOG.debug("Lock acquired for '{}'", leaseHolderId); 
      if (keepLockAlive()) { 
       LOG.info("Becoming the master on dataSource: {}", jdbcTemplate.getDataSource()); 
       return; 
      } 
     } 
     reportLeaseOwnerShipAndDuration(); 

     LOG.debug("{} failed to acquire lease. Sleeping for {} milli(s) before trying again...", leaseHolderId, lockAcquireSleepInterval); 
     throw new BrokerException.LockNotAcquiredException(leaseHolderId); 
    } 

    private void reportLeaseOwnerShipAndDuration() { 
     String sql = Statements.LEASE_OWNER_STATEMENT; 

     SqlRowSet rowSet = jdbcTemplate.queryForRowSet(sql); 
     while (rowSet.next()) { 
      LOG.debug("{} - Lease held by {} till {}", leaseHolderId, rowSet.getString(1), 
        Instant.ofEpochMilli(rowSet.getLong(2))); 
     } 
    } 

    private void setQueryTimeoutInSecs(Statement statement) throws SQLException { 
     if (queryTimeoutInSecs > 0) { 
      statement.setQueryTimeout(queryTimeoutInSecs); 
     } 
    } 

    private long initTimeDiff() { 
     if (Long.MAX_VALUE == diffFromCurrentTime) { 
      if (maxAllowableDiffFromDBTime > 0) { 
       diffFromCurrentTime = determineTimeDifference(); 
      } else { 
       diffFromCurrentTime = 0l; 
      } 
     } 
     return diffFromCurrentTime; 
    } 

    protected long determineTimeDifference() { 

     ResultSetExtractor<Timestamp> timestampExtractor = rs -> { 
      rs.next(); 
      return rs.getTimestamp(1); 
     }; 
     Timestamp timestamp = jdbcTemplate.query(Statements.utcTimestamp(jdbcTemplate), timestampExtractor); 

     long result = 0L; 
     long diff = System.currentTimeMillis() - timestamp.getTime(); 
     if (Math.abs(diff) > maxAllowableDiffFromDBTime) { 
      // off by more than maxAllowableDiffFromDBTime so lets adjust 
      result = -diff; 
     } 
     LOG.info("{} diff adjust from db: {}, db time: {}", leaseHolderId, result, timestamp); 
     return result; 
    } 

    @Transactional 
    public boolean keepLockAlive() { 
     boolean result; 
     final String sql = Statements.LEASE_UPDATE_STATEMENT; 

     initTimeDiff(); 

     final long now = System.currentTimeMillis() + diffFromCurrentTime; 
     final long nextLockCheck = now + lockHeldPeriod; 

     PreparedStatementSetter statementSetter = statement -> { 
      setQueryTimeoutInSecs(statement); 
      statement.setString(Statements.KEEP_LOCK_NEW_BROKER_NAME_COL_POSITION, leaseHolderId); 
      statement.setLong(Statements.KEEP_LOCK_NEXT_CHECK_COL_POSITION, nextLockCheck); 
      statement.setString(Statements.KEEP_LOCK_BROKER_NAME_COL_POSITION, leaseHolderId); 
     }; 

     LOG.trace("executing: '{}' to keep lock alive with values {}, {}", Statements.LEASE_UPDATE_STATEMENT, leaseHolderId, nextLockCheck); 
     result = jdbcTemplate.update(sql, statementSetter) == IM_THE_MASTER_RESULT; 

     if (!result) { 
      reportLeaseOwnerShipAndDuration(); 
     } 

     return result; 
    } 

    private void releaseLease() { 
     String sql = Statements.LEASE_UPDATE_STATEMENT; 

     final int lockReleaseTime = 1; 
     PreparedStatementSetter statementSetter = statement -> { 
      statement.setString(Statements.RELEASE_LOCK_NEW_BROKER_NAME_COL_POSITION, leaseHolderId); 
      statement.setLong(Statements.RELEASE_LOCK_NEXT_CHECK_COL_POSITION, lockReleaseTime); 
      statement.setString(Statements.RELEASE_LOCK_BROKER_NAME_COL_POSITION, leaseHolderId); 
     }; 

     LOG.trace("executing: '{}' to release lock with values {}, {}, {}", sql, leaseHolderId, 1, leaseHolderId); 
     if (jdbcTemplate.update(sql, statementSetter) == IM_THE_MASTER_RESULT) { 
      LOG.info("{}, released lease", leaseHolderId); 
     } 
    } 

    @Override 
    public void close() throws Exception { 
     releaseLease(); 
    } 

    static class Statements { 

     public static final String LOCK_TABLE_NAME = "MSG_BROKER_LOCK"; 

     public static final Map<DatabaseType, String> CURRENT_DATE_TIME_UTC = ImmutableMap.of(DatabaseType.MYSQL, "SELECT UTC_TIMESTAMP", 
                           DatabaseType.H2,  "SELECT CURRENT_TIMESTAMP"); 

     public static final String LEASE_UPDATE_STATEMENT = 
       String.format("UPDATE %s SET BROKER_NAME=?, %s.TIME=? WHERE BROKER_NAME=? AND ID = 1", LOCK_TABLE_NAME, LOCK_TABLE_NAME); 

     public static final String LEASE_OWNER_STATEMENT = 
       String.format("SELECT BROKER_NAME, %s.TIME FROM %s WHERE ID = 1", LOCK_TABLE_NAME, LOCK_TABLE_NAME); 

     public static final String LEASE_OBTAIN_STATEMENT = 
       String.format("UPDATE %s SET BROKER_NAME=?, %s.TIME=? WHERE (%s.TIME IS NULL OR %s.TIME < ?) AND ID = 1", 
           LOCK_TABLE_NAME, LOCK_TABLE_NAME, LOCK_TABLE_NAME, LOCK_TABLE_NAME); 

     //Acquire constants 
     public static final int ACQUIRE_LOCK_BROKER_NAME_COL_POSITION = 1; 
     public static final int ACQUIRE_LOCK_NEXT_CHECK_COL_POSITION = 2; 
     public static final int ACQUIRE_LOCK_TIME_NOW_POSITION = 3; 

     //Keep lock alive constants 
     public static final int KEEP_LOCK_NEW_BROKER_NAME_COL_POSITION = 1; 
     public static final int KEEP_LOCK_NEXT_CHECK_COL_POSITION = 2; 
     public static final int KEEP_LOCK_BROKER_NAME_COL_POSITION = 3; 

     //Release lock constants 
     public static final int RELEASE_LOCK_NEW_BROKER_NAME_COL_POSITION = 1; 
     public static final int RELEASE_LOCK_NEXT_CHECK_COL_POSITION = 2; 
     public static final int RELEASE_LOCK_BROKER_NAME_COL_POSITION = 3; 

     private Statements() {} 

     private static String utcTimestamp(JdbcTemplate jdbcTemplate) { 
      DatabaseType dbType; 
      try { 
       dbType = DatabaseType.fromMetaData(jdbcTemplate.getDataSource()); 
      } catch (MetaDataAccessException e) { 
       throw new DataAccessResourceFailureException("Unable to determine database type: ", e); 
      } 
      String query = CURRENT_DATE_TIME_UTC.get(dbType); 
      if(query == null) { 
       throw new RuntimeException("Unrecognized DatabaseType: " + dbType); 
      } 
      return query; 
     } 
    } 
} 
Смежные вопросы