2013-09-17 2 views
8

У меня есть приложение с 3 распределенными источниками данных (com.atomikos.jdbc.AtomikosDataSourceBean). Я использую диспетчер транзакций Atomikos как реализацию JTA. Каждый dataSource работает с базой данных PostgreSQL. Теперь я вызываю свои запросы последовательно каждому источнику данных, и все работает нормально.Вызов нескольких запросов к различным источникам данных одновременно с использованием JTA в рамках одной глобальной транзакции

Мне интересно, если это возможно, используя JTA для параллельного вызова моих запросов (многопоточность, параллельно)?

Я попытался просто вызвать запрос во вновь созданный поток, используя jdbcTemplate (Spring). Во-первых, я столкнулся с проблемой весны. Spring хранит транзакционный контекст в поле ThreadLocal, поэтому он не был правильно разрешен в моем новом потоке (Spring transaction manager and multithreading). Я решил эту проблему, установив один и тот же контекст транзакций в поток ThreadLocal из недавно созданного потока. Но та же проблема, с которой я столкнулся в коде Atomikos. Они также сохраняют CompositeTransactionImp на карте с привязкой к потоку (BaseTrancationManager # getCurrentTx). Но в случае Atomikos невозможно установить значения для нового потока. Поэтому я не могу выполнять свои запросы одновременно, потому что кажется, что Atomicos не поддерживает такой подход. Но я также просмотрел спецификацию JTA и нашел следующее: «Несколько потоков могут одновременно быть связаны с одной и той же глобальной транзакцией». («3,2 TransactionManager интерфейс», http://download.oracle.com/otndocs/jcp/jta-1.1-spec-oth-JSpec/?submit=Download)

ВОПРОС: Как я могу ссылаться на два или несколько запросов к различным источникам данных одновременно, используя JTA (2 фазы фиксации), в рамках одной глобальной транзакции?

DataSources конфигурации в TOMCAT контексте: менеджер

<Resource name="jdbc/db1" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean" 
      factory="com.company.package.AtomikosDataSourceBeanFactory" 
      xaDataSourceClassName="org.postgresql.xa.PGXADataSource" 
      xaProperties.serverName="localhost" 
      xaProperties.portNumber="5451" 
      xaProperties.databaseName="db1" 
      uniqueResourceName="jdbc/db1" 
      xaProperties.user="secretpassword" 
      xaProperties.password="secretpassword" 
      minPoolSize="5" 
      maxPoolSize="10" 
      testQuery="SELECT 1" /> 

<Resource name="jdbc/db2" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean" 
      factory="com.company.package.AtomikosDataSourceBeanFactory" 
      xaDataSourceClassName="org.postgresql.xa.PGXADataSource" 
      xaProperties.serverName="localhost" 
      xaProperties.portNumber="5451" 
      xaProperties.databaseName="db2" 
      uniqueResourceName="jdbc/db2" 
      xaProperties.user="secretpassword" 
      xaProperties.password="secretpassword" 
      minPoolSize="5" 
      maxPoolSize="10" 
      testQuery="SELECT 1" /> 

<Resource name="jdbc/db3" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean" 
      factory="com.company.package.AtomikosDataSourceBeanFactory" 
      xaDataSourceClassName="org.postgresql.xa.PGXADataSource" 
      xaProperties.serverName="localhost" 
      xaProperties.portNumber="5451" 
      xaProperties.databaseName="db3" 
      uniqueResourceName="jdbc/db3" 
      xaProperties.user="secretpassword" 
      xaProperties.password="secretpassword" 
      minPoolSize="5" 
      maxPoolSize="10" 
      testQuery="SELECT 1" /> 

транзакции конфигурации в контексте пружинной:

<bean id="transactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" 
    init-method="init" destroy-method="close" lazy-init="true"> 
    <property name="forceShutdown" value="false" /> 
</bean> 

Код:

final SqlParameterSource parameters = getSqlParameterSourceCreator().convert(entity); 

    // Solving Spring's ThreadLocal issue: saving thread local params 
    final Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap(); 
    final List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations(); 
    final boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive(); 
    final String currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName(); 
    final AtomicReference<Throwable> exceptionHolder = new AtomicReference<Throwable>(); 

    // Running query in a separate thread. 
    final Thread thread = new Thread(new Runnable() { 
     @Override 
     public void run() { 
      try { 
       // Solving Spring's ThreadLocal issue: setting thread local values to newly created thread. 
       for (Map.Entry<Object, Object> entry : resourceMap.entrySet()) { 
        TransactionSynchronizationManager.bindResource(entry.getKey(), entry.getValue()); 
       } 
       if (synchronizations != null && !synchronizations.isEmpty()) { 
        TransactionSynchronizationManager.initSynchronization(); 
        for (TransactionSynchronization synchronization : synchronizations) { 
         TransactionSynchronizationManager.registerSynchronization(synchronization); 
        } 
       } 
       TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive); 
       TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName); 

       // Executing query. 
       final String query = "insert into ..."; 
       NamedParameterJdbcTemplate template = new NamedParameterJdbcTemplate(dataSourceOne); 

       template.update(query, parameters); 
      } catch (final Throwable ex) { 
       exceptionHolder.set(ex); 
      } 
     } 
    }); 
    thread.start(); 

    // ... same code as above for other dataSources. 

    // allThreds.join(); - joining to all threads. 

ответ

1

Я думаю, что ваша идея обходиться с тем, что TransactionSynchronizationManager должен играть с однопоточной транзакцией, интересен, но, вероятно, опасен.

В TransactionSynchronizationManager транзакционные ресурсы хранятся на карте ThreadLocal, где ключ является фабрикой ресурсов, и мне интересно, что будет добавлено, когда вы будете выполнять это обходное решение с несколькими потоками с использованием той же фабрики ресурсов - возможно, т применяется в вашем случае, так как у вас есть 3 источника данных. (На первый взгляд я бы сказал, что один из ваших транзакционных ресурсов будет заменен другим, но, может быть, я что-то пропустил ...).

В любом случае, я думаю, вы можете попробовать использовать javax.transaction.TransactionManager.resume() для достижения того, что вы пытаетесь сделать.

Идея состоит в том, чтобы использовать JTA api напрямую и, таким образом, обходить однопоточную поддержку Spring-транзакций.

Вот код, чтобы проиллюстрировать, что я имею в виду:

@Autowired 
JtaTransactionManager txManager; //from Spring 

javax.transaction.TransactionManager jtaTransactionManager; 

public void parallelInserts() { 
    jtaTransactionManager = txManager.getTransactionManager(); //we are getting the underlying implementation 
    jtaTransactionManager.begin(); 
    final Transaction jtaTransaction = jtaTransactionManager.getTransaction(); 
    try { 
     Thread t1 = new Thread(){ 
     @Override 
     public void run() { 
      try { 
       jtaTransactionManager.resume(jtaTransaction); 
       //... do the insert 
      } catch (InvalidTransactionException e) { 
       try { 
        jtaTransaction.setRollbackOnly(); 
       } catch (SystemException e1) { 
        e1.printStackTrace(); 
       } 
       e.printStackTrace(); 
      } catch (SystemException e) { 
       e.printStackTrace(); 
      } 
     } 
     }; 
     t1.start(); 
     //same with t2 and t3 
    } catch (Exception ex) { 
     jtaTransactionManager.setRollbackOnly(); 
     throw ex; 
    } 
    //join threads and commit 
    jtaTransactionManager.commit(); 
} 

Я думаю, что это решение может работать (я должен сказать, что я не пробовал сам).Единственное ограничение, которое я вижу сейчас, заключается в том, что вы не можете повторно использовать потоки, потому что нет контр-части вызова resume(), а во второй раз, когда вы вызовеете resume(), у вас, вероятно, будет исключение IllegalStateException.

Смежные вопросы