Я играю с идеей, которую у меня есть для языка потока данных, и я пытаюсь использовать службу-исполнитель, чтобы ограничить количество доступных потоков количеством ядер в моей системе и создать общий пул потоков.Почему этот CompletableFuture не выполняется совместно с ExecutorService?
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Function a = new Function("A", service);
Function b = new Function("B", service);
Function c = new Function("C", service);
a.addObserver(b);
a.addObserver(c);
a.change()
А изменилось, заставляя его называть это evaluate()
, то B и C уведомляются им необходимо обновить, и они делают.
В Function.java
у меня есть:
public class Function implements Func{
private boolean isComplete;
private Set<Func> dependsOn;
private Set<Func> observedBy;
private ExecutorService service;
public Function(String name, ExecutorService service){
observedBy = new HashSet<>();
dependsOn = new HashSet<>();
this.isComplete = false;
this.service = service;
}
public void addObserver(Func f){
observedBy.add(f);
f.addDependency(this);
}
private void evaluate() {
boolean match = dependsOn.stream().allMatch(Func::isComplete);
if (match) {
this.isComplete = false;
// inform each observer that I'm about to start to evaluate
observedBy.forEach(func -> func.onDependentEvaluate(this));
CompletableFuture.runAsync(() -> {
try {
System.out.println("Doing some work");
int sleepTime = (int) (Math.random() * 5000);
System.out.println("Working for " + sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, service).thenRun(this::onComplete);
}
}
public void addDependency(Func f){
dependsOn.add(f);
}
@Override
public void change() {;
evaluate();
}
@Override
public void onComplete() {
this.isComplete = true;
observedBy.forEach((f) -> f.onDependentComplete(this));
}
@Override
public void onDependentEvaluate(Func f) {
}
@Override
public void onDependentComplete(Func func){
evaluate();
}
@Override
public boolean isComplete() {
return this.isComplete;
}
}
Когда я прохожу service
в runAsync
, только выполняется. Если я не пройду service
, а CompletableFuture использует ForkJoin.commonPool, график будет выполнен, как я ожидаю. Есть ли другой способ поделиться ExecutorService
? Любые идеи, почему следующие фьючерсы не выполняются?
Что такое 'Функция'? –
Это класс под названием 'Function' –
Я получаю это, но мы действительно не можем помочь, если мы не знаем, что делает класс. –