2013-02-26 3 views
1

У меня есть следующая функция, в псевдокоде:Рекурсивный параллелизм

Result calc(Data data) { 
    if (data.isFinal()) { 
    return new Result(data); // This is the actual lengthy calculation 
    } else { 
    List<Result> results = new ArrayList<Result>(); 
    for (int i=0; i<data.numOfSubTasks(); ++i) { 
     results.add(calc(data.subTask(i)); 
    } 
    return new Result(results); // merge all results in to a single result 
    } 
} 

Я хочу, чтобы распараллелить его, используя фиксированное количество потоков.

Моя первая попытка была:

ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads); 

Result calc(Data data) { 
    if (data.isFinal()) { 
    return new Result(data); // This is the actual lengthy calculation 
    } else { 
    List<Result> results = new ArrayList<Result>(); 
    List<Callable<Void>> callables = new ArrayList<Callable<Void>>(); 
    for (int i=0; i<data.numOfSubTasks(); ++i) { 
     callables.add(new Callable<Void>() { 
     public Void call() { 
     results.add(calc(data.subTask(i)); 
     } 
     }); 
    } 
    executorService.invokeAll(callables); // wait for all sub-tasks to complete 
    return new Result(results); // merge all results in to a single result 
    } 
} 

Однако это быстро застрял в тупик, потому что, в то время как верхние ожидания уровня рекурсии для всех потоков, чтобы закончить внутренние уровни и ждать, пока потоки становятся доступными ...

Как я могу эффективно распараллеливать свою программу без взаимоблокировок?

+2

В чем вопрос? –

+0

Мы не видим синхронизации в вашем коде, поэтому мы не можем помочь вам найти тупик. Предоставленный код не имеет ничего общего с взаимоблокировками. – ATrubka

+0

Возможно, это просто, что numOfThreads меньше, чем callables.size()? Вы должны убедиться в своей реализации, что у вас больше потоков, чем вызывающих (вниз по дереву). – cybye

ответ

4

Ваша проблема - общая проблема проектирования при использовании ThreadPoolExecutor для задач с зависимостями.

Я вижу два варианта:

1) Убедитесь представить задачи в порядке снизу вверх, так что вы никогда не запущенное задание, которое зависит от задачи, которую еще не начать.

2) Используйте «прямой передачи обслуживания» стратегии (см ThreadPoolExecutor документацию):

ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); 
executor.setRejectedExecutionHandler(new CallerRunsPolicy()); 

Идея использования синхронной очереди, так что задачи никогда не ждать в реальной очереди. Обработчик отклонения выполняет задачи, которые не имеют доступного потока для запуска. С помощью этого конкретного обработчика поток отправителя выполняет отклоненные задачи.

Эта конфигурация исполнителя гарантирует, что задачи никогда не будут отклонены и что у вас никогда не будет взаимоблокировок из-за зависимостей между задачами.

+0

с 2) не сталкивайтесь с проблемой, что весь пул заполнен организацией сброса, и вся тяжелая работа будет выполняться вызывающим? – cybye

+2

@cybye: «вызывающий» в этом случае является нитью из пула или фактически многими из них. Поэтому вместо того, чтобы ждать их зависимостей, они фактически обрабатывают данные. –

+0

выглядит хорошо и легче понять, чем фьючерсы. – cybye

0

вы должны разделить свой подход в два этапа:

  1. не создают все дерево вниз до data.isFinal() == TRUE
  2. рекурсивно собирать результаты (возможно только, если слияние не производит другие операции/звонки)

Для этого вы можете использовать [Futures][1], чтобы выполнить результаты async. Значит, все результаты calc будут иметь тип Future [Result].

Сразу же возвращение Будущего освободит текущую нить и даст место для обработки других. С помощью коллекции результатов (новый результат (результаты)) вы должны ждать, пока все результаты будут готовы (ScatterGather-Pattern, вы можете использовать семафор, чтобы ждать всех результатов). Сама коллекция будет ходить по дереву, и проверка (или ожидание результатов будет достигнута) произойдет в одном потоке.

В целом вы строите дерево фьючерсов, которое используется для сбора результатов и выполнения только «дорогих» операций в threadpool.

Смежные вопросы