5

У меня есть следующий код (в результате my previous question), который планирует задачу на удаленном сервере, а затем опроса для завершения с использованием ScheduledExecutorService#scheduleAtFixedRate. По завершении задачи он загружает результат. Я хочу вернуть вызывающему абоненту Future, чтобы они могли решить, когда и как долго блокировать, и дать им возможность отменить задачу.CompletedFuture # whenComplete не вызывается, если thenApply используется

Моя проблема заключается в том, что если клиент отменяет Future, возвращенный методом download, блок whenComplete не выполняет. Если я удалю thenApply, то делает. Очевидно, я что-то не понимаю о композиции Future ... Что я должен изменить?

public Future<Object> download(Something something) { 
    String jobId = schedule(something); 
    CompletableFuture<String> job = pollForCompletion(jobId); 
    return job.thenApply(this::downloadResult); 
} 

private CompletableFuture<String> pollForCompletion(String jobId) { 
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); 
    CompletableFuture<String> completionFuture = new CompletableFuture<>(); 

    ScheduledFuture<?> checkFuture = executor.scheduleAtFixedRate(() -> {   
      if (pollRemoteServer(jobId).equals("COMPLETE")) { 
       completionFuture.complete(jobId); 
      } 
    }, 0, 10, TimeUnit.SECONDS); 
    completionFuture     
      .whenComplete((result, thrown) -> { 
       System.out.println("XXXXXXXXXXX"); //Never happens unless thenApply is removed 
       checkFuture.cancel(true); 
       executor.shutdown(); 
      }); 
    return completionFuture; 
} 

На той же ноте, если я:

return completionFuture.whenComplete(...) 

вместо

completionFuture.whenComplete(...); 
return completionFuture; 

whenComplete также никогда не выполняется. Это кажется мне очень противоречивым. Не следует ли логически возвращать FuturewhenComplete, к которому я должен придерживаться?

EDIT:

Я изменил код явно резервного размножать отмены. Это отвратительное и нечитаемым, но это работает, и я не мог найти лучший способ:

public Future<Object> download(Something something) throws ChartDataGenException, Exception { 
     String jobId = schedule(report); 
     CompletableFuture<String> job = pollForCompletion(jobId); 
     CompletableFuture<Object> resulting = job.thenApply(this::download); 
     resulting.whenComplete((result, thrown) -> { 
      if (resulting.isCancelled()) { //the check is not necessary, but communicates the intent better 
       job.cancel(true); 
      } 
     }); 
     return resulting; 
} 
+0

Он даже не входит в блок 'whenComplete'. Я помещаю точку останова и 'System.out.print' внутри, и ни одна точка останова не попадает, и линия не печатается. Это произойдет, если я удалю бит 'thenApply'. – kaqqao

+0

По моему мнению, это должно быть обратное тому, что вы сообщаете. 'completeFuture.whenComplete()' является чистой функцией и не должен ничего менять в том, как работает 'completeFuture'. Если вы не вернете результат 'whenComplete', он должен стать недоступным и подчиняться GC. –

+0

Абсолютно согласен. Но это то, что я вижу ... Если вернуть результат 'whenComplete' и' cancel' его немедленно, я не получаю XXXXX в консоли. С другой стороны, если я верну оригинал 'completeFuture' и' cancel' _that_, я это сделаю. – kaqqao

ответ

4

Ваша структура выглядит следующим образом:

  ┌──────────────────┐ 
      │ completionFuture | 
      └──────────────────┘ 
      ↓    ↓ 
    ┌──────────────┐  ┌───────────┐ 
    │ whenComplete |  │ thenApply | 
    └──────────────┘  └───────────┘ 

Итак, когда вы отмените thenApply будущее, оригинальный completionFuture объект остается незатронутым, поскольку он не зависит от этапа thenApply. Если, однако, вы не связываете этап thenApply, вы возвращаете исходный экземпляр completionFuture, и отмена этого этапа вызывает отмену всех зависимых этапов, в результате чего действие whenComplete должно быть выполнено немедленно.

Но когда этап thenApply отменен, completionFuture все еще может быть завершен, если выполнено условие pollRemoteServer(jobId).equals("COMPLETE"), поскольку этот опрос не прекращается. Но мы не знаем отношения jobId = schedule(something) и pollRemoteServer(jobId). Если ваши изменения состояния приложения таким образом, что это условие никогда не может быть выполнено после отмены загрузки, это будущее никогда не будет завершена ...


Что касается вашего последнего вопроса, что будущее есть «один я должен держаться? «Нет необходимости иметь линейную цепочку фьючерсов, на самом деле, в то время как удобные методы CompletableFuture позволяют легко создавать такую ​​цепочку, чаще всего это наименее полезная вещь, поскольку вы могли бы просто написать блок кода, если у вас есть линейная зависимость. Ваша модель цепочки двух независимых этапов правильная, но отмена не проходит через нее, но она не будет работать и через линейную цепочку.

Если вы хотите отменить исходный этап, вам нужна ссылка на него, но если вы хотите получить результат зависимого этапа, вам понадобится ссылка на этот этап.

+0

OP не нуждается в результате 'whenComplete'. Но я понимаю, что этот этап должен быть явно сохранен, иначе он недостижим и может существовать или не существовать в момент отмены. –

+0

@MarkoTopolnik Я предполагаю, что первоначальное будущее, которое вы называете 'whenComplete', сохраняет ссылку на цепочку ... В противном случае это было бы серьезно отвратительно. – kaqqao

+0

@kaqqao Вероятно, это правильно, потому что вы ожидаете, что это будет реализовано, но это все еще неопределенное поведение и нездоровое, чтобы полагаться. –

Смежные вопросы