2016-06-26 2 views
2

У меня есть следующие компоненты:Объединение нескольких CompletableFutures

private JobInfo aggregateJobInfo() { 
    final JobsResult jobsResult = restClient().getJobs(); 
    final List<String> jobIds = extractJobIds(jobsResult); 

    //fetch details, exceptions and config for each job 
    final List<JobDetails> jobDetails = jobIds.stream().map(jobId -> { 
     final JobDetailResult jobDetailResult = restClient().getJobDetails(jobId); 
     final JobExceptionsResult jobExceptionsResult = restClient().getJobExceptions(jobId); 
     final JobConfigResult jobConfigResult = restClient().getJobConfig(jobId); 
     return new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult); 
    }).collect(Collectors.toList()); 
    return new JobInfo(jobsResult, jobDetails); 
} 

private static List<String> extractJobIds(final JobsResult jobsResult) { 
    final ArrayList<String> jobIds = new ArrayList<>(); 
    jobIds.addAll(jobsResult.getRunning()); 
    jobIds.addAll(jobsResult.getFinished()); 
    jobIds.addAll(jobsResult.getCanceled()); 
    jobIds.addAll(jobsResult.getFailed()); 
    return jobIds; 
} 

Это просто вызывает некоторые конечные точки и aggergates некоторые данные. Теперь я пытаюсь сделать это без блокировки с помощью CompletableFutures, что я действительно не использовал раньше ..

private CompletableFuture<JobInfo> aggregateJobInfo() { 
    final CompletableFuture<JobsResult> jobsResultFuture = restClient().getJobs(); 
    final CompletableFuture<List<String>> jobIdsFuture = jobsResultFuture.thenApply(JobInfoCollector::extractJobIds); 

    //fetch details, exceptions and config for each job 
    final CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture = jobIdsFuture.thenApply(jobIds -> { 
     return jobIds.stream().map(jobId -> { 
      final CompletableFuture<JobDetailResult> jobDetailsResultFuture = restClient().getJobDetails(jobId); 
      final CompletableFuture<JobExceptionsResult> jobExceptionsFuture = restClient().getJobExceptions(jobId); 
      final CompletableFuture<JobConfigResult> jobConfigFuture = restClient().getJobConfig(jobId); 
      return jobDetailsResultFuture.thenCompose(jobDetailResult -> { 
       return jobExceptionsFuture.thenCombine(jobConfigFuture, (jobExceptionsResult, jobConfigResult) -> { 
        return new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult); 
       }); 
      }); 

     }).collect(Collectors.toList()); 
    }); 
    return null; 

Моя проблема заключается в том, чтобы создать CompletableFuture здесь, когда JobInfo является `новый JobInfo (jobsResult, jobDetails) ?!

Как я уже сказал, я новичок в этом, может быть, мой подход плох и есть лучшие решения?

Любые идеи оценили, спасибо

Первая рабочая версия:

private CompletableFuture<JobInfo> aggregateJobInfo() { 

    final CompletableFuture<JobsResult> jobsResultFuture = restClient().getJobs(); 
    final CompletableFuture<List<String>> jobIdsFuture = jobsResultFuture.thenApply(JobInfoCollector::extractJobIds); 

    final CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFutureListFuture = 
      jobIdsFuture.thenApply(jobIds -> jobIds.stream().map(jobId -> { 
       final CompletableFuture<JobDetailResult> jobDetailsResultFuture = restClient().getJobDetails(jobId); 
       final CompletableFuture<JobExceptionsResult> jobExceptionsFuture = restClient().getJobExceptions(jobId); 
       final CompletableFuture<JobConfigResult> jobConfigFuture = restClient().getJobConfig(jobId); 
       return jobDetailsResultFuture.thenCompose(jobDetailResult -> 
         jobExceptionsFuture.thenCombine(jobConfigFuture, (jobExceptionsResult, jobConfigResult) -> 
           new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult))); 
      }).collect(Collectors.toList())); 

    return jobDetailsFutureListFuture.thenCompose(jobDetailsFutures -> 
      CompletableFuture.allOf(jobDetailsFutures.toArray(
        new CompletableFuture[jobDetailsFutures.size()])).thenApply(aVoid -> 
        jobDetailsFutures.stream() 
          .map(CompletableFuture::join) 
          .collect(Collectors.toList()))) 
      .thenApply(jobDetails -> jobsResultFuture.thenApply(jobsResult -> 
        new JobInfo(jobsResult, jobDetails))) 
      .join(); 
} 
+1

Не похоже, что большая часть кода, который вы вставили, имеет отношение к вопросу. Можете ли вы уменьшить его до минимального примера того, что вам нужно? – the8472

+0

Вопрос состоял в том, как «отобразить» будущие данные выше, так что возвращается CompletableFuture . –

ответ

4

У вас есть:

  • CompletableFuture<JobsResult> jobsResultFuture
  • CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture
  • JobInfo(JobsResult a, List<JobDetails> b)

Вы хотите

CompletableFuture<JobInfo>

дополнительное наблюдение: jobDetailsFuture может быть завершена только тогда, когда jobsResultFuture завершена.

Таким образом, вы можете осуществить следующие действия:

  1. List<CompletableFuture<JobDetails>> ->Void через allOf в thenCompose
  2. Void + List<CompletableFuture<JobDetails>> (в захваченном вар) ->List<JobDetails> через thenApply
  3. List<JobDetails> + CompletableFuture<JobsResult> (как захваченный var) ->JobInfo via thenApply

Вы можете просто развернуть фьючерсы через get() внутри этих функций картографа, поскольку фьючерсы, как гарантируется, будут завершены в этой точке из-за зависимостей фьючерсов их предков в этой точке.

Другие подходы с использованием thenCombine и сокращение потока возможны, но более подробные и создавать более промежуточные фьючерсы.

+0

Извините, я не понимаю. Что значит «awaitAll» и Void? –

+0

означает [allOf] (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#allOf-java.util.concurrent.CompletableFuture...-).Он возвращает 'CompletableFuture ', поэтому следующий шаг, чтобы вернуть данные – the8472

+0

Все еще не понимаю. Пытались jobDetailsListFuture.thenCompose (jobDetailsFutures -> { возврата CompletableFuture.allOf (jobDetailsFutures.toArray ( новый CompletableFuture [jobDetailsFutures.size()])) .thenApply (следует избегать -> {???}); }); , но не знаю, что делать с пустотой. –

Смежные вопросы