2016-04-19 2 views
1

У меня есть ExecutorService который направляет вычисленных данных в CompletableFuture:Отмена CompletableFuture контролируется ExecutorService

class DataRetriever { 
    private final ExecutorService service = ...; 

    public CompletableFuture<Data> retrieve() { 
     final CompletableFuture<Data> future = new CompletableFuture<>(); 

     service.execute(() -> { 
      final Data data = ... fetch data ... 
      future.complete(data); 
     }); 
     return future; 
    } 
} 

Я хочу клиент/пользователь, чтобы иметь возможность отменить задание:

final DataRetriever retriever = new DataRetriever(); 
final CompletableFuture<Data> future = retriever().retrieve(); 

future.cancel(true); 

Это не работает, так как это отменяет внешний CompletableFuture, но не внутреннее будущее, как запланировано в службе исполнителя.

Возможно ли распространять cancel() на внешнее будущее во внутреннее будущее?

+0

Почему вы используете 'CompletableFuture' в первую очередь? Почему бы просто не выполнить 'Callable ' и вернуть результирующее 'Future' из' retrieve() '? – erickson

ответ

3

CompletableFuture#cancel действительно предназначен только для маркировки CompletableFuture как отмененный. Он ничего не делает, чтобы уведомить исполняемую задачу, чтобы остановить ее, поскольку она не имеет отношения к какой-либо такой задаче.

В javadoc намеки на этом

mayInterruptIfRunning - это значение не имеет никакого эффекта в этом варианте осуществления, поскольку прерывания не используются для управления обработкой.

внутренний будущее в вашем примере является Future, не CompletableFuture, и она имеет отношение к исполняющему Runnable (или Callable). Внутренне, так как он знает, на что выполняет Thread, он может отправить его interrupt, чтобы попытаться остановить его.

Одним из вариантов является возвращение кортеж некоторых видов (например, некоторые POJO), который предоставляет ссылки на оба ваши CompletableFuture и к Future возвращенного ExecutorService#submit. Вы можете использовать Future до cancel, если вам нужно. Вы должны помнить cancel или completeCompletableFuture так, чтобы другие части вашего кода не оставались заблокированными/голоденными навсегда.

+0

Проблема с возвратом кортежа/pojo (содержащая внешнее и внутреннее будущее) заключается в том, что клиент узнает о внутренней реализации. Он не должен заботиться о своей перспективе. Было бы полезно, если бы мы каким-то образом связали внешнее и внутреннее будущее. –

+0

Отмена 'Future', возвращенная' ExecutorService # submit', кажется, стоит попробовать. – peco

+0

@DaveTeezo Существуют более сложные решения, такие как расширение 'CompletableFuture', чтобы обернуть связанное« будущее », но я считаю реализацию довольно ненадежной и неясной. – Savior

0

Добавив обработчик исключений в внешнее будущее, вы можете передать вызов cancel во внутреннее будущее. Это работает, потому что CompletableFuture.cancel заставляет будущее закончить исключительно с CancellationException.

private final ExecutorService service = ...; 

public CompletableFuture<Data> retrieve() { 
    final CompletableFuture<Data> outer = new CompletableFuture<>(); 

    final Future<?> inner = service.submit(() -> { 
     ... 
     future.complete(data); 
    }); 

    outer.exceptionally((error) -> { 
     if (error instanceof CancellationException) { 
      inner.cancel(true); 
     } 
     return null; // must return something, because 'exceptionally' expects a Function 
    }); 

    return outer; 
} 

Вызов outer.exceptionally создает новый CompletableFuture, так что это не влияет на отмену или исключения статуса из outer самих. Вы все равно можете добавить любые другие CompletionStage, которые вам нравятся outer, в том числе еще один этап exceptionally, и он будет работать как ожидалось.

0

Другим решением, которое затронул Столб, является расширение CompletableFuture. Вот один из методов, который очень похож на ваш существующий код. Он также обрабатывает исключения, что является хорошим бонусом.

class CancelableFuture<T> extends CompletableFuture<T> { 
    private Future<?> inner; 

    /** 
    * Creates a new CancelableFuture which will be completed by calling the 
    * given {@link Callable} via the provided {@link ExecutorService}. 
    */ 
    public CancelableFuture(Callable<T> task, ExecutorService executor) { 
     this.inner = executor.submit(() -> complete(task)); 
    } 

    /** 
    * Completes this future by executing a {@link Callable}. If the call throws 
    * an exception, the future will complete with that exception. Otherwise, 
    * the future will complete with the value returned from the callable. 
    */ 
    private void complete(Callable<T> callable) { 
     try { 
      T result = callable.call(); 
      complete(result); 
     } catch (Exception e) { 
      completeExceptionally(e); 
     } 
    } 

    @Override 
    public boolean cancel(boolean mayInterrupt) { 
     return inner.cancel(mayInterrupt) && super.cancel(true); 
    } 
} 

Затем в DataRetriever, вы можете просто сделать:

public CompletableFuture<Data> retrieve() { 
    return new CancelableFuture<>(() -> {... fetch data ...}, service); 
} 
0

С моей Tascalate Concurrent библиотеки код можно переписать следующим образом:

class DataRetriever { 
    private final ExecutorService service = ...; 

    public Promise<Data> retrieve() { 
    return CompletableTask.supplyAsync(() -> { 
     final Data data = ... fetch data ... 
     return data; 
    }, service); 
    } 
} 

Promise и CompletableTask классы от моего библиотеки, вы можете узнать больше в my blog

Смежные вопросы