У меня есть приложение с 3 распределенными источниками данных (com.atomikos.jdbc.AtomikosDataSourceBean). Я использую диспетчер транзакций Atomikos как реализацию JTA. Каждый dataSource работает с базой данных PostgreSQL. Теперь я вызываю свои запросы последовательно каждому источнику данных, и все работает нормально.Вызов нескольких запросов к различным источникам данных одновременно с использованием JTA в рамках одной глобальной транзакции
Мне интересно, если это возможно, используя JTA для параллельного вызова моих запросов (многопоточность, параллельно)?
Я попытался просто вызвать запрос во вновь созданный поток, используя jdbcTemplate (Spring). Во-первых, я столкнулся с проблемой весны. Spring хранит транзакционный контекст в поле ThreadLocal, поэтому он не был правильно разрешен в моем новом потоке (Spring transaction manager and multithreading). Я решил эту проблему, установив один и тот же контекст транзакций в поток ThreadLocal из недавно созданного потока. Но та же проблема, с которой я столкнулся в коде Atomikos. Они также сохраняют CompositeTransactionImp на карте с привязкой к потоку (BaseTrancationManager # getCurrentTx). Но в случае Atomikos невозможно установить значения для нового потока. Поэтому я не могу выполнять свои запросы одновременно, потому что кажется, что Atomicos не поддерживает такой подход. Но я также просмотрел спецификацию JTA и нашел следующее: «Несколько потоков могут одновременно быть связаны с одной и той же глобальной транзакцией». («3,2 TransactionManager интерфейс», http://download.oracle.com/otndocs/jcp/jta-1.1-spec-oth-JSpec/?submit=Download)
ВОПРОС: Как я могу ссылаться на два или несколько запросов к различным источникам данных одновременно, используя JTA (2 фазы фиксации), в рамках одной глобальной транзакции?
DataSources конфигурации в TOMCAT контексте: менеджер
<Resource name="jdbc/db1" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
factory="com.company.package.AtomikosDataSourceBeanFactory"
xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
xaProperties.serverName="localhost"
xaProperties.portNumber="5451"
xaProperties.databaseName="db1"
uniqueResourceName="jdbc/db1"
xaProperties.user="secretpassword"
xaProperties.password="secretpassword"
minPoolSize="5"
maxPoolSize="10"
testQuery="SELECT 1" />
<Resource name="jdbc/db2" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
factory="com.company.package.AtomikosDataSourceBeanFactory"
xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
xaProperties.serverName="localhost"
xaProperties.portNumber="5451"
xaProperties.databaseName="db2"
uniqueResourceName="jdbc/db2"
xaProperties.user="secretpassword"
xaProperties.password="secretpassword"
minPoolSize="5"
maxPoolSize="10"
testQuery="SELECT 1" />
<Resource name="jdbc/db3" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
factory="com.company.package.AtomikosDataSourceBeanFactory"
xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
xaProperties.serverName="localhost"
xaProperties.portNumber="5451"
xaProperties.databaseName="db3"
uniqueResourceName="jdbc/db3"
xaProperties.user="secretpassword"
xaProperties.password="secretpassword"
minPoolSize="5"
maxPoolSize="10"
testQuery="SELECT 1" />
транзакции конфигурации в контексте пружинной:
<bean id="transactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close" lazy-init="true">
<property name="forceShutdown" value="false" />
</bean>
Код:
final SqlParameterSource parameters = getSqlParameterSourceCreator().convert(entity);
// Solving Spring's ThreadLocal issue: saving thread local params
final Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
final List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
final boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
final String currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<Throwable>();
// Running query in a separate thread.
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
// Solving Spring's ThreadLocal issue: setting thread local values to newly created thread.
for (Map.Entry<Object, Object> entry : resourceMap.entrySet()) {
TransactionSynchronizationManager.bindResource(entry.getKey(), entry.getValue());
}
if (synchronizations != null && !synchronizations.isEmpty()) {
TransactionSynchronizationManager.initSynchronization();
for (TransactionSynchronization synchronization : synchronizations) {
TransactionSynchronizationManager.registerSynchronization(synchronization);
}
}
TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
// Executing query.
final String query = "insert into ...";
NamedParameterJdbcTemplate template = new NamedParameterJdbcTemplate(dataSourceOne);
template.update(query, parameters);
} catch (final Throwable ex) {
exceptionHolder.set(ex);
}
}
});
thread.start();
// ... same code as above for other dataSources.
// allThreds.join(); - joining to all threads.