2017-01-13 2 views
0

В приведенном ниже коде показано, что простое использование рекурсивного соединения fork (find max), я знаю, что Java JIT может добиться этого быстрее в простой однопоточной петле, однако ее просто для демонстрации.Можно ли написать рекурсивное решение объединения fork с помощью Executors.newWorkStealingPool()?

Первоначально я реализовал макс find, используя фреймворк ForkJoin, который отлично работал для больших массивов удвоений (1024 * 1024).

Я чувствую, что смогу добиться того же без с использованием рамки ForkJoin, используя только Executor.workStealingPool() и Callables/Futures.

Возможно ли это?

Моя попытка ниже:

class MaxTask implements Callable<Double> { 

    private double[] array; 
    private ExecutorService executorService; 
    public MaxTask(double[] array, ExecutorService es){ 
     this.array = array; 
     this.executorService = es; 
    } 
    @Override 
    public Double call() throws Exception { 
     if (this.array.length!=2){ 
      double[] a = new double[(this.array.length/2)]; 
      double[] b = new double[(this.array.length/2)]; 
      for (int i=0;i<(this.array.length/2);i++){ 
       a[i] = array[i]; 
       b[i] = array[i+(this.array.length/2)]; 
      } 
      Future<Double> f1 = this.executorService.submit(new MaxTask(a,this.executorService)); 
      Future<Double> f2 = this.executorService.submit(new MaxTask(b,this.executorService)); 

      return Math.max(f1.get(), f2.get()); 
     } else { 
      return Math.max(this.array[0], this.array[1]); 
     } 
    } 

} 

ExecutorService es = Executors.newWorkStealingPool(); 

double[] x = new double[1024*1024]; 
for (int i=0;i<x.length;i++){ 
    x[i] = Math.random(); 
} 

MaxTask mt = new MaxTask(x,es); 

es.submit(mt).get(); 
+0

Надлежащим образом осуществленная перешеек и властвуй должны работать. –

+0

"Использование только Executor.workStealingPool()" - это фасад. В эталонной реализации это [просто 'ForkJoinPool'] (http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8u40-b25/java/util/concurrent/Executors.java # Executors.newWorkStealingPool% 28% 29). Документация 'workStealingPool()' не очень исчерпывающая в отношении того, что на самом деле представляет собой «пул потоков обработки». Например, ваш код полагается на то, что в пуле будет создан 'Future', метод 'get()' поможет выполнить другие ожидающие задачи, истинные для 'ForkJoinPool', но все ли« пулы для обработки работы »поддерживают это? – Holger

ответ

0

Кажется, что его можно написать вычисление типа «вилка/присоединиться к» без рамок ForkJoin (см использование отзывной ниже). Сам интерфейс ForkJoin, по-видимому, не имеет разницы в производительности, но, возможно, немного более аккуратный для кодирования, я предпочитаю использовать только Callables.

Я также исправил первоначальную попытку. Похоже, что порог был слишком мал по первоначальной попытке, поэтому он был медленным, я думаю, он должен быть как минимум размером с число ядер.

Я не уверен, что если использование ForkJoinPool будет быстрее для этого использования, мне нужно будет собрать больше статистики, я не думаю, что у меня нет никаких операций, которые блокируются в течение длительного времени.

public class Main { 

static class FindMaxTask extends RecursiveTask<Double> { 

    private int threshold; 
    private double[] data; 
    private int startIndex; 
    private int endIndex; 

    public FindMaxTask(double[] data, int startIndex, int endIndex, int threshold) { 
     super(); 
     this.data = data; 
     this.startIndex = startIndex; 
     this.endIndex = endIndex; 
     this.threshold = threshold; 
    } 


    @Override 
    protected Double compute() { 
     int diff = (endIndex-startIndex+1); 
     if (diff!=(this.data.length/threshold)){ 
      int aStartIndex = startIndex; 
      int aEndIndex = startIndex + (diff/2) - 1; 
      int bStartIndex = startIndex + (diff/2); 
      int bEndIndex = endIndex; 

      FindMaxTask f1 = new FindMaxTask(this.data,aStartIndex,aEndIndex,threshold); 
      f1.fork(); 
      FindMaxTask f2 = new FindMaxTask(this.data,bStartIndex,bEndIndex,threshold); 
      return Math.max(f1.join(),f2.compute()); 
     } else { 
      double max = Double.MIN_VALUE; 
      for (int i = startIndex; i <= endIndex; i++) { 
       double n = data[i]; 
       if (n > max) { 
        max = n; 
       } 
      } 
      return max; 
     } 
    } 

} 

static class FindMax implements Callable<Double> { 

    private double[] data; 
    private int startIndex; 
    private int endIndex; 
    private int threshold; 

    private ExecutorService executorService; 

    public FindMax(double[] data, int startIndex, int endIndex, int threshold, ExecutorService executorService) { 
     super(); 
     this.data = data; 
     this.startIndex = startIndex; 
     this.endIndex = endIndex; 
     this.executorService = executorService; 
     this.threshold = threshold; 
    } 



    @Override 
    public Double call() throws Exception { 
     int diff = (endIndex-startIndex+1); 
     if (diff!=(this.data.length/this.threshold)){ 
      int aStartIndex = startIndex; 
      int aEndIndex = startIndex + (diff/2) - 1; 
      int bStartIndex = startIndex + (diff/2); 
      int bEndIndex = endIndex; 

      Future<Double> f1 = this.executorService.submit(new FindMax(this.data,aStartIndex,aEndIndex,this.threshold,this.executorService)); 
      Future<Double> f2 = this.executorService.submit(new FindMax(this.data,bStartIndex,bEndIndex,this.threshold,this.executorService)); 
      return Math.max(f1.get(), f2.get()); 
     } else { 
      double max = Double.MIN_VALUE; 
      for (int i = startIndex; i <= endIndex; i++) { 
       double n = data[i]; 
       if (n > max) { 
        max = n; 
       } 
      } 
      return max; 
     } 
    } 

} 

public static void main(String[] args) throws InterruptedException, ExecutionException { 

    double[] data = new double[1024*1024*64]; 
    for (int i=0;i<data.length;i++){ 
     data[i] = Math.random(); 
    } 

    int p = Runtime.getRuntime().availableProcessors(); 
    int threshold = p; 
    int threads = p; 
    Instant start = null; 
    Instant end = null; 

    ExecutorService es = null; 
    es = Executors.newFixedThreadPool(threads); 
    System.out.println("1. started.."); 
    start = Instant.now(); 
    System.out.println("max = "+es.submit(new FindMax(data,0,data.length-1,threshold,es)).get()); 
    end = Instant.now(); 
    System.out.println("Callable (recrusive), with fixed pool, Find Max took ms = "+ Duration.between(start, end).toMillis()); 

    es = new ForkJoinPool(); 
    System.out.println("2. started.."); 
    start = Instant.now(); 
    System.out.println("max = "+es.submit(new FindMax(data,0,data.length-1,threshold,es)).get()); 
    end = Instant.now(); 
    System.out.println("Callable (recursive), with fork join pool, Find Max took ms = "+ Duration.between(start, end).toMillis()); 

    ForkJoinPool fj = new ForkJoinPool(threads); 
    System.out.println("3. started.."); 
    start = Instant.now(); 
    System.out.println("max = "+fj.invoke(new FindMaxTask(data,0,data.length-1,threshold))); 
    end = Instant.now(); 
    System.out.println("RecursiveTask (fork/join framework),with fork join pool, Find Max took ms = "+ Duration.between(start, end).toMillis()); 
} 

}

Смежные вопросы