2016-07-31 2 views
5

У меня возникли проблемы с реализацией элегантного функционального стиля для программы, которая должна выполнять различные задачи. Вот чего я хочу достичь.Реализация задачи оркестровки

У меня есть три класса, чьи методы, которые я хочу, чтобы организовать (упрощены для краткости):

class TaskA { 
    public ResultA call() { 
     return new ResultA(); 
    } 
} 

class TaskB { 
    public ResultB call(ResultA a) { 
     return new ResultB(); 
    } 
} 

class TaskC { 
    public ResultC call(List<ResultB> resultBs) { 
     return new ResultC(); 
    } 
} 

Мне нужно выполнить TaskA «п» раз параллельно и для каждого исполнения TaskA, мне нужно выполнить TaskB 'n' раз, используя результат соответствующего TaskA. Наконец, мне нужно выполнить TaskC один раз, используя результаты всех вызовов TaskB.

Одним из способов достижения этой цели было бы создать Callable, который инкапсулирует вызов TaskA и TaskB и, наконец, в моем главном потоке, собирать List из Future х ResultB выполнить TaskC:

class TaskATaskBCallable implements Callable<ResultB> { 
    private TaskA taskA ...; 
    private TaskB taskB ...; 

    public ResultB call() { 
     return taskB.call(taskA.call()); 
    } 
} 

И в моей главной теме:

private ResultC orchestrate() { 
    ExecutorService service = ...; 
    List<Callable<ResultB>> callables = ...; 

    taskC.call(callables.map(callable -> 
     service.submit(callable)).map(Future::get).collect(Collectors.toList()); 
} 

Одна вещь, которую я не люблю об этом решении является TaskATaskBCallable. Это, вероятно, бесполезное соединение класса TaskA и TaskB. Более того, если мне нужно связать другую задачу с TaskA и TaskB, мне придется изменить TaskATaskBCallable, возможно, также изменить его имя. Я чувствую, что могу избавиться от него, используя интеллектуальные классы параллельной библиотеки Java, такие как CompletableFuture или Phaser.

Любые указатели?

ответ

0

Я нашел один способ сделать это с помощью CompletableFuture:

private ResultC orchestrate() { 
    ExecutorService service = ...; 
    int taskCount = ...; 

    List<CompletableFuture<ResultB>> resultBFutures = IntStream.rangeClosed(1, taskCount) 
        .mapToObj((i) -> CompletableFuture.supplyAsync(() -> new TaskA().call(), service)) 
        .map(resultAFuture -> resultAFuture.thenApplyAsync(resultA -> new TaskB().call(resultA), 
            service)) 
        .collect(Collectors.toList()); 

    return new TaskC().call(CompletableFuture.allOf(resultBFutures.toArray(new CompletableFuture[resultBFutures.size()])) 
        .thenApply(v -> resultBFutures.stream().map(CompletableFuture::join) 
            .collect(Collectors.toList())) 
        .join()); 
} 
-1

я думаю, что на самом деле CompletableFuture окажется самым элегантным:

int taskCount = 100; 

    List<ResultB> resultBs = IntStream.range(0, taskCount) 
      .mapToObj(i -> new TaskA()) 
      .map(taskA -> CompletableFuture.supplyAsync(taskA::call)) 
      .map(completableFutureA -> completableFutureA.thenApplyAsync(new TaskB()::call)) 
      .collect(Collectors.toList()) // collect, in order to kick off the async tasks 
      .stream() 
      .map(CompletableFuture::join) 
      .collect(Collectors.toList()); 
    return new TaskC().call(resultBs); 
+0

Не совсем, ваш 'CompletableFuture :: join' делает все это последовательный. Ваше решение выполняет следующий график в последовательном порядке: «TaskA-> TaskB-> TaskA-> TaskB ....-> TaskC' –

+0

@SwarangaSarma, вы правы, мне нужно собрать до присоединения или ленивую оценку Поток закручивает меня :) – bowmore

Смежные вопросы