2015-08-12 1 views
1

Я играю с идеей, которую у меня есть для языка потока данных, и я пытаюсь использовать службу-исполнитель, чтобы ограничить количество доступных потоков количеством ядер в моей системе и создать общий пул потоков.Почему этот CompletableFuture не выполняется совместно с ExecutorService?

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); 
Function a = new Function("A", service); 
Function b = new Function("B", service); 
Function c = new Function("C", service); 
a.addObserver(b); 
a.addObserver(c); 
a.change() 

А изменилось, заставляя его называть это evaluate(), то B и C уведомляются им необходимо обновить, и они делают.

В Function.java у меня есть:

public class Function implements Func{ 
    private boolean isComplete; 
    private Set<Func> dependsOn; 
    private Set<Func> observedBy; 
    private ExecutorService service; 

    public Function(String name, ExecutorService service){ 
     observedBy = new HashSet<>(); 
     dependsOn = new HashSet<>(); 
     this.isComplete = false; 
     this.service = service; 
    } 

    public void addObserver(Func f){ 
     observedBy.add(f); 
     f.addDependency(this); 
    } 

    private void evaluate() { 
     boolean match = dependsOn.stream().allMatch(Func::isComplete); 

     if (match) { 
      this.isComplete = false; 

      // inform each observer that I'm about to start to evaluate 
      observedBy.forEach(func -> func.onDependentEvaluate(this)); 

      CompletableFuture.runAsync(() -> { 
       try { 
        System.out.println("Doing some work"); 
        int sleepTime = (int) (Math.random() * 5000); 
        System.out.println("Working for " + sleepTime); 
        Thread.sleep(sleepTime); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      }, service).thenRun(this::onComplete); 
     } 
    } 

    public void addDependency(Func f){ 
     dependsOn.add(f); 
    } 

    @Override 
    public void change() {; 
     evaluate(); 
    } 

    @Override 
    public void onComplete() { 
     this.isComplete = true; 
     observedBy.forEach((f) -> f.onDependentComplete(this)); 
    } 

    @Override 
    public void onDependentEvaluate(Func f) { 
    } 

    @Override 
    public void onDependentComplete(Func func){ 
     evaluate(); 
    } 

    @Override 
    public boolean isComplete() { 
     return this.isComplete; 
    } 
} 

Когда я прохожу service в runAsync, только выполняется. Если я не пройду service, а CompletableFuture использует ForkJoin.commonPool, график будет выполнен, как я ожидаю. Есть ли другой способ поделиться ExecutorService? Любые идеи, почему следующие фьючерсы не выполняются?

+0

Что такое 'Функция'? –

+0

Это класс под названием 'Function' –

+0

Я получаю это, но мы действительно не можем помочь, если мы не знаем, что делает класс. –

ответ

1

Ваша проблема намного проще, чем вы думаете. tl; dr Ваша единственная проблема в том, что вы не закрываете ExecutorService, поэтому ваша программа не заканчивается.

Длинная версия: В моей среде, эта программа работает, как ожидалось, и задачи B и C делают, на самом деле, бегут. Я отредактировал ваш код, чтобы разрешить мне создать AbstractFunction, что позволяет мне переопределить поведение того, что делает Function, например.

public abstract class AbstractFunction implements Func { 
    protected boolean isComplete; 
    protected Set<Func> dependsOn; 
    protected Set<Func> observedBy; 
    protected ExecutorService service; 
    protected String name; 

    public AbstractFunction(String name, ExecutorService service) { 
    observedBy = new HashSet<>(); 
    dependsOn = new HashSet<>(); 
    this.isComplete = false; 
    this.service = service; 
    this.name = name; 
    } 

    public void addObserver(Func f) { 
    observedBy.add(f); 
    f.addDependency(this); 
    } 

    public void addDependency(Func f) { 
    dependsOn.add(f); 
    } 

    @Override 
    public void onComplete() { 
    this.isComplete = true; 
    observedBy.forEach((f) -> f.onDependentComplete(this)); 
    } 

    @Override 
    public boolean isComplete() { 
    return this.isComplete; 
    } 

    @Override 
    public void change() { } 

    @Override 
    public void onDependentEvaluate(Func f) { } 

    @Override 
    public void onDependentComplete(Func func) { } 
} 

Итак, оригинал (фактически без изменений) Function теперь это:

public class Function extends AbstractFunction implements Func { 
    public Function(String name, ExecutorService service) { 
    super(name, service); 
    } 

    protected void evaluate() { 
    boolean match = dependsOn.stream().allMatch(Func::isComplete); 

    if (match) { 
     this.isComplete = false; 

     // inform each observer that I'm about to start to evaluate 
     observedBy.forEach(func -> func.onDependentEvaluate(this)); 

     CompletableFuture.runAsync(() -> { 
     try { 
      System.out.println("Doing some work"); 
      int sleepTime = (int) (Math.random() * 5000); 
      System.out.println("Working for " + sleepTime); 
      Thread.sleep(sleepTime); 
     } catch (InterruptedException e) { 
      System.out.println("I was interrupted, my name is: " + name); 
      e.printStackTrace(); 
     } 
     }, service).thenRun(this::onComplete); 
    } 
    } 

    @Override 
    public void change() { 
    evaluate(); 
    } 

    @Override 
    public void onDependentComplete(Func f) { 
    evaluate(); 
    } 
} 

Учитывая это новая структура, я создаю ShutdownFunc слушать и закрыть службу ИСПОЛНИТЕЛЬ когда "C" заканчивается, т.е.

public class ShutdownFunc extends AbstractFunction { 
    public ShutdownFunc(String name, ExecutorService service) { 
    super(name, service); 
    } 

    @Override 
    public void onDependentComplete(Func f) { 
    System.out.println("Shutting down the system"); 
    try { 
     service.shutdown(); 
     service.awaitTermination(5, TimeUnit.SECONDS); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    } 
} 

И, главное, конечно:

public class Main { 
    public static void main(String... args) { 
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime() 
     .availableProcessors()); 
    Function a = new Function("A", service); 
    Function b = new Function("B", service); 
    Function c = new Function("C", service); 
    a.addObserver(b); 
    a.addObserver(c); 

    Func d = new ShutdownFunc("D", service); 
    c.addObserver(d); 

    a.change(); 
    } 
} 

Выход:

Doing some work 
Working for 4315 
Doing some work 
Working for 2074 
Doing some work 
Working for 2884 
Shutting down the system 

И программа правильно ожидает завершения, а затем завершается корректно.

+1

Nice sol'n! На самом деле у меня есть 'service.shutdown()' как последняя строка моего основного метода, но пропустил его в моей копии. Ваше решение указало мне в правильном направлении. Если я вызываю 'service.shutdown()' в моем основном методе, он отключает службу, даже если новая задача была просто отправлена, что заставляет B никогда не выполнять. –

Смежные вопросы