2014-10-26 1 views
6

Дано:Как использовать CompletableFuture.thenComposeAsync()?

public class Test 
{ 
    public static void main(String[] args) 
    { 
     int nThreads = 1; 
     Executor e = Executors.newFixedThreadPool(nThreads); 

     CompletableFuture.runAsync(() -> 
     { 
      System.out.println("Task 1. Thread: " + Thread.currentThread().getId()); 
     }, e).thenComposeAsync((Void unused) -> 
     { 
      return CompletableFuture.runAsync(() -> 
      { 
       System.out.println("Task 2. Thread: " + Thread.currentThread().getId()); 
      }, e); 
     }, e).join(); 
     System.out.println("finished"); 
    } 
} 

Я ожидаю одного ИСПОЛНИТЕЛЬ нить для выполнения задачи 1, затем задачи 2. Вместо этого, код висит, если nThreads меньше 2.

  1. Пожалуйста, объясните, почему код висит. Я вижу, что он заблокирован в CompletableFuture:616, ожидая завершения Future, но непонятно, почему.
  2. Если я разрешаю использовать 2 потока, для чего используется каждая нить?

Короче говоря, пожалуйста, помогите мне понять, как работает thenComposeAsync(). Javadoc выглядит как она была написана для роботов, а не людей :)

+0

Как правило, не используйте блокирующие операции в задачах (здесь join()) в задачах, выполняемых в пулах потоков, особенно в пулах фиксированных потоков. Разрешить блокировку только для независимых потоков. –

+0

@AlexeiKaigorodov, это хороший момент, но в приведенном выше примере 'join()' выполняется вне задачи, не так ли? – Gili

+0

есть, мой ошибка. –

ответ

8
  1. Метод thenComposeAsync ставит новую задачу для своего исполнителя, который захватывает один поток и ждет ваш Task 2 завершить. Но у этого нет больше потоков для запуска. Вместо этого вы можете использовать метод , который выполняется в том же потоке, что и Task 1, чтобы избежать тупика.

  2. Один поток выполняет Task 1 и Task 2, а второй занимается составлением результатов двух.

Примечание: CompletableFuture (ы) лучше всего работают с ForkJoinPool, что является более эффективным в задачах обработки, которые порождают новые задачи. По умолчанию ForkJoinPool был добавлен в Java 8 для этой цели и используется по умолчанию, если вы не укажете исполнителя для выполнения своих задач.

Вот хорошая презентация о том, как эти новые функции светятся и как они работают: Reactive Programming Patterns with Java 8 Futures.

+0

Ссылка на презентацию нарушена. Является ли [это] (https://wn.com/reactive_programming_patterns_with_java_8_futures) заменой? –

+0

Да, я обновил ссылку – dcernahoschi

4

Он блокирует runAsync, что находится внутри thenComposeAsync. thenComposeAsync запускает поставляемую функцию в потоке внутри исполнителя e. Но функция, которую вы ей дали, пытается сама выполнить тело runAsync внутри одного и того же исполнителя.

Вы можете лучше видеть, что происходит, добавив еще один вывод трассировки:

CompletableFuture.runAsync(() -> { 
    System.out.println("Task 1. Thread: " + Thread.currentThread().getId()); 
}, e).thenComposeAsync((Void unused) -> { 
    System.out.println("Task 1 1/2. Thread: " + Thread.currentThread().getId()); 
    return CompletableFuture.runAsync(() -> { 
     System.out.println("Task 2. Thread: " + Thread.currentThread().getId()); 
    }, e); 
}, e).join(); 

Теперь, если вы запустите его с 2-жильный исполнителя, вы увидите, что Задача 1 1/2 и 2 задачи работают на разные потоки.

Способ исправить это заменить thenComposeAsync только с регулярными thenCompose. Я не уверен, почему вы когда-либо использовали thenComposeAsync. Если у вас есть метод, который возвращает CompletableFuture, предположительно, этот метод не блокируется и не должен выполняться асинхронно.

+1

'Я не уверен, почему вы когда-либо использовали thenComposeAsync. Если у вас есть метод, который возвращает CompletableFuture, предположительно, этот метод не блокируется и не должен запускаться асинхронно. «Это очень хороший момент. – Gili

Смежные вопросы