2016-02-25 5 views
2

IdeaCompletableFuture против Весенних Сделок

У меня есть метод обработки, который принимает в списке элементов и обрабатывает их асинхронно с помощью внешних веб-службы. Этапы процесса также сохраняют данные во время обработки. В конце всего процесса я хочу сохранить весь процесс вместе с каждым обработанным результатом.

Проблема

преобразовать каждый элемент в списке в CompletableFuture и запустить задачу обработки на них, и положил их обратно в массив фьючерсов. Теперь, используя свой метод .ofAll (в методе последовательностей), чтобы завершить будущее, когда все предоставленные задачи будут завершены, и возвратите еще один CompletableFuture, который содержит результат.

Когда я хочу получить этот результат, я вызываю .whenComplete(..) и хотел бы установить возвращаемый результат в мой объект как данные, а затем сохранить его в базе данных, однако вызов сохранения хранилища просто ничего не делает и продолжается потоки просто продолжаются он не проходит через вызов сохранения репозитория.

@Transactional 
public void process(List<Item> items) { 
    List<Item> savedItems = itemRepository.save(items); 

    final Process process = createNewProcess(); 

    final List<CompletableFuture<ProcessData>> futures = savedItems.stream() 
    .map(item -> CompletableFuture.supplyAsync(() -> doProcess(item, process), executor)) 
    .collect(Collectors.toList()); 

    sequence(futures).whenComplete((data, throwable) -> { 
    process.setData(data); 
    processRepository.save(process); // <-- transaction lost? 
    log.debug("Process DONE"); // <-- never reached 
    }); 
    } 

метод Sequence

private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) { 
    CompletableFuture<Void> allDoneFuture = 
     CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); 
    return allDoneFuture.thenApply(v -> 
     futures.stream().map(CompletableFuture::join).collect(Collectors.toList()) 
    ); 
    } 

Что происходит? Почему вызов persist не проходит. Является ли поток, который запустил транзакцию, не способную совершить транзакцию или где она потерялась? Все обработанные данные верны и все хорошо. Я пробовал разные стратегии транзакций, но как можно контролировать, какой поток должен завершить транзакцию, если это так?

Любые советы?

+0

Как насчет метода 'sequence', вы можете его показать? – stjepano

+0

Это код клей-кода jus для выполнения allOf в списке фьючерсов. – Vaelyr

+1

Я не на это 100%, но похоже, что «Транзакция» начинает транзакцию при запуске метода и совершает по завершении метода, «Транзакция» не ждет, пока ваши обратные вызовы будут запущены. Вы можете проверить это с помощью простого потока. – stjepano

ответ

3

Причина вашей проблемы состоит в том, что, как сказано выше, транзакция завершается , когда достигается возврат процесса (..).

Что вы можете сделать, это создать транзакцию вручную, которая дает вам полный контроль над тем, когда он начинается и заканчивается.

Удалить @Transactional

Autowire TransactionManager затем в процессе (..):

TransactionDefinition txDef = new DefaultTransactionDefinition(); 
    TransactionStatus txStatus = transactionManager.getTransaction(txDef); 
    try { 
    //do your stuff here like 
     doWhateverAsync().then(transactionManager.commit(txStatus);) 
    } catch (Exception e) { 
     transactionManager.rollback(txStatus); 
     throw e; 
    } 
+1

Важно отметить, что doWhateverAsync должен работать в том же созданном транзакционном контексте. Следует соблюдать осторожность при выполнении операций в других потоках, поскольку такой же контекст может быть недоступен. – vishr

+0

@vishr просто добавляет, что любая работа, выполняемая в CompletableFuture, будет иметь транзакцию, даже если мы вручную закрываем транзакцию после завершения будущего, потому что, согласно моему пониманию, completablefuture будет работать на новом потоке (или исполняемом потоке пул), а не поток, на который подавался наш запрос. И новый поток может не иметь такого же контекста. – pannu

0

В случае Спринг загрузки приложения, вам необходимо следующими конфигураций.

Основной метод применения должен быть аннотирован с помощью @EnableAsync.

@ Аннотации @Async должны быть в верхней части метода с @Transactional аннотацией. Это необходимо для указания, что обработка будет происходить в дочерней цепочке.