2009-10-30 2 views
15

Я пытаюсь выяснить, как правильно использовать исполнителей Java. Я понимаю, что выполнение задач на ExecutorService имеет свои накладные расходы. Тем не менее, я удивлен, увидев, что он такой высокий, насколько он есть.Исключительная точка безубыточности ExecutorService - эмпирические правила?

Моя программа должна обрабатывать огромное количество данных (данные на фондовом рынке) с минимальной задержкой, насколько это возможно. Большинство вычислений - довольно простые арифметические операции.

Я пытался проверить что-то очень простое: «Math.random() * Math.random()»

Самый простой тест выполняется это вычисление в простом цикле. Второй тест выполняет одно и то же вычисление внутри анонимного Runnable (это должно измерять стоимость создания новых объектов). Третий тест проходит Runnable до ExecutorService (это измеряет стоимость введения исполнителей).

Я побежал испытания на мой изящный ноутбук (2 процессора, 1,5 гиг ​​ОЗУ):

(in milliseconds) 
simpleCompuation:47 
computationWithObjCreation:62 
computationWithObjCreationAndExecutors:422 

(примерно один раз из четырех трасс, первые две цифры в конечном итоге равны)

Примечание что исполнители занимают гораздо больше времени, чем выполнение в одном потоке. Цифры были примерно одинаковыми для размеров пулов потоков от 1 до 8.

Вопрос: Я пропустил что-то очевидное или ожидаются ли эти результаты? Эти результаты говорят мне, что любая задача, которую я передаю исполнителю, должна выполнять некоторые нетривиальные вычисления. Если я обрабатываю миллионы сообщений, и мне нужно выполнить очень простые (и дешевые) преобразования для каждого сообщения, я все равно не смогу использовать исполнителей ... попытка распространения вычислений на нескольких процессорах может оказаться дороже, чем просто делая их в одном потоке. Конструктивное решение становится намного сложнее, чем я думал изначально. Есть предположения?


import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

public class ExecServicePerformance { 

private static int count = 100000; 

public static void main(String[] args) throws InterruptedException { 

    //warmup 
    simpleCompuation(); 
    computationWithObjCreation(); 
    computationWithObjCreationAndExecutors(); 

    long start = System.currentTimeMillis(); 
    simpleCompuation(); 
    long stop = System.currentTimeMillis(); 
    System.out.println("simpleCompuation:"+(stop-start)); 

    start = System.currentTimeMillis(); 
    computationWithObjCreation(); 
    stop = System.currentTimeMillis(); 
    System.out.println("computationWithObjCreation:"+(stop-start)); 

    start = System.currentTimeMillis(); 
    computationWithObjCreationAndExecutors(); 
    stop = System.currentTimeMillis(); 
    System.out.println("computationWithObjCreationAndExecutors:"+(stop-start)); 


} 

private static void computationWithObjCreation() { 
    for(int i=0;i<count;i++){ 
    new Runnable(){ 

    @Override 
    public void run() { 
    double x = Math.random()*Math.random(); 
    } 

    }.run(); 
    } 

} 

private static void simpleCompuation() { 
    for(int i=0;i<count;i++){ 
    double x = Math.random()*Math.random(); 
    } 

} 

private static void computationWithObjCreationAndExecutors() 
    throws InterruptedException { 

    ExecutorService es = Executors.newFixedThreadPool(1); 
    for(int i=0;i<count;i++){ 
    es.submit(new Runnable() { 
    @Override 
    public void run() { 
    double x = Math.random()*Math.random();  
    } 
    }); 
    } 
    es.shutdown(); 
    es.awaitTermination(10, TimeUnit.SECONDS); 
} 
} 
+0

Ничего себе, превью форматировал код намного лучше конечного результата. Как я могу это исправить? – Shahbaz

+1

Я просто переформатировал его, посмотрел лучше? –

+0

Спасибо ZZ Coder, код теперь выглядит так, как должно – Shahbaz

ответ

19
  1. Использование исполнителей о использовании процессоров и/или ядрах процессора, так что если вы создаете пул потоков, который использует количество процессоров, в лучшем случае, вы должны иметь стольких потоков, как ЦП/ядра.
  2. Вы правы, создание новых объектов слишком дорого. Таким образом, одним из способов сокращения расходов является использование партий. Если вы знаете объем и количество вычислений, вы создаете партии. Поэтому подумайте о тысячах вычислений, выполненных в одной выполненной задаче. Вы создаете партии для каждого потока. Как только вычисление будет выполнено (java.util.concurrent.Future), вы создадите следующую партию. Даже создание новых партий может быть выполнено в parralel (4 процессора -> 3 потока для вычисления, 1 поток для пакетного обеспечения). В итоге вы можете получить большую пропускную способность, но с более высокими требованиями к памяти (партии, подготовка).

Редактировать: Я изменил ваш пример, и я разрешил ему работать на моем маленьком двухъядерном ноутбуке x200.

provisioned 2 batches to be executed 
simpleCompuation:14 
computationWithObjCreation:17 
computationWithObjCreationAndExecutors:9 

Как вы видите в исходном коде, я взял пакетную инициализацию и исполнитель жизненный цикл из измерения, тоже. Это более справедливо по сравнению с двумя другими методами.

Смотрите результаты сами ...

import java.util.List; 
import java.util.Vector; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

public class ExecServicePerformance { 

    private static int count = 100000; 

    public static void main(String[] args) throws InterruptedException { 

     final int cpus = Runtime.getRuntime().availableProcessors(); 

     final ExecutorService es = Executors.newFixedThreadPool(cpus); 

     final Vector<Batch> batches = new Vector<Batch>(cpus); 

     final int batchComputations = count/cpus; 

     for (int i = 0; i < cpus; i++) { 
      batches.add(new Batch(batchComputations)); 
     } 

     System.out.println("provisioned " + cpus + " batches to be executed"); 

     // warmup 
     simpleCompuation(); 
     computationWithObjCreation(); 
     computationWithObjCreationAndExecutors(es, batches); 

     long start = System.currentTimeMillis(); 
     simpleCompuation(); 
     long stop = System.currentTimeMillis(); 
     System.out.println("simpleCompuation:" + (stop - start)); 

     start = System.currentTimeMillis(); 
     computationWithObjCreation(); 
     stop = System.currentTimeMillis(); 
     System.out.println("computationWithObjCreation:" + (stop - start)); 

     // Executor 

     start = System.currentTimeMillis(); 
     computationWithObjCreationAndExecutors(es, batches);  
     es.shutdown(); 
     es.awaitTermination(10, TimeUnit.SECONDS); 
     // Note: Executor#shutdown() and Executor#awaitTermination() requires 
     // some extra time. But the result should still be clear. 
     stop = System.currentTimeMillis(); 
     System.out.println("computationWithObjCreationAndExecutors:" 
       + (stop - start)); 
    } 

    private static void computationWithObjCreation() { 

     for (int i = 0; i < count; i++) { 
      new Runnable() { 

       @Override 
       public void run() { 

        double x = Math.random() * Math.random(); 
       } 

      }.run(); 
     } 

    } 

    private static void simpleCompuation() { 

     for (int i = 0; i < count; i++) { 
      double x = Math.random() * Math.random(); 
     } 

    } 

    private static void computationWithObjCreationAndExecutors(
      ExecutorService es, List<Batch> batches) 
      throws InterruptedException { 

     for (Batch batch : batches) { 
      es.submit(batch); 
     } 

    } 

    private static class Batch implements Runnable { 

     private final int computations; 

     public Batch(final int computations) { 

      this.computations = computations; 
     } 

     @Override 
     public void run() { 

      int countdown = computations; 
      while (countdown-- > -1) { 
       double x = Math.random() * Math.random(); 
      } 
     } 
    } 
} 
+0

Интересное решение. Дает мне некоторые идеи о том, как изменить мое использование исполнителей. – Shahbaz

+0

+1, очень хороший пример. –

+0

привет, если я запускаю этот пример на MacOSX двухъядерный, я получил: simpleComputation: 268 computationWithObjCreation: 155 computation2: 0, , так как результат computationWithObjCreationAndExecutors не получен? Если я переместил es.shutdown() и es.awaitTermination, прежде чем взять время остановки, то результат: предоставлен: 2 порции должны быть выполнены simpleComputation: 261 computationWithObjCreation: 92 computationWithObjCreationAndExecutors: 126 где computationWithObjCreationAndExecutors последовательно выполняет хуже, чем вычислениеWithObjCreation. Почему это происходит? – portoalet

6

Это не справедливо испытание для пула потоков для следующих причин,

  1. Вы не пользуясь аккумулирования на всех, потому что у вас есть только 1 поток.
  2. Работа слишком проста, что накладные расходы пула не могут быть оправданы. Умножение на CPU с FPP занимает всего несколько циклов.

Учитывая следующие дополнительные шаги, пул потоков должен сделать, кроме создания объекта и выполнения задания,

  1. Поместите работу в очереди
  2. Удалить задание из очереди
  3. Получить нить из бассейна и выполнить задание
  4. Вернуть нитку в бассейн

Когда у вас есть реальная работа и несколько потоков, преимущество пула потоков будет очевидным.

+1

I второй ZZ Coder; по моему опыту, преимущества станут более очевидными, когда ваш пул потоков больше. – Everyone

+0

Исполнитель не должен «получать» и «возвращать» поток. Он создает внутренний рабочий поток, который poll() представляет собой очередь задач. Кроме того, учитывая низкую сложность задачи, вероятно, преимущество заключается в использовании только одного потока, в противном случае существует вероятность блокировки в BlockingQueue, которая может быть вызвана и вызывает проблемы с перемещением рабочих потоков в и из Бегущее состояние. Реальная стоимость? Переход к ядру для создания потока, а также вызов операции блокировки в ожидании завершения потока. 100 000 - это не так много. Но извлеченные уроки, настройка производительности требует тестирования. –

+0

Я попробовал размер пула потоков между 1 и 8, все они возвращались примерно с одинаковыми номерами.Я сосредоточился на размере пула 1, потому что я хотел измерить накладные расходы на структуру исполнителя. Ваш комментарий подтверждает, что мне нужно дополнительно изучить внутренние рамки. – Shahbaz

0

Во-первых, есть несколько проблем с микрообъективом. Вы делаете разминки, что хорошо.Тем не менее, лучше запустить тест несколько раз, что должно дать представление о том, действительно ли оно прогрелось и дисперсия результатов. Также, как правило, лучше выполнять проверку каждого алгоритма в отдельных прогонах, иначе вы можете вызвать деоптимизацию при изменении алгоритма.

Задача очень маленькая, хотя я не совсем уверен, насколько малы. Таким образом, количество раз быстрее довольно бессмысленно. В многопоточных ситуациях он будет касаться тех же самых изменчивых местоположений, чтобы потоки могли вызывать очень плохую производительность (используйте экземпляр Random для каждого потока). Также пробег в 47 миллисекунд немного короток.

Конечно, переход к другой теме для крошечной операции не будет быстрым. Разделите задачи на большие размеры, если это возможно. JDK7 выглядит так, как будто он будет иметь инфраструктуру fork-join, которая пытается поддерживать прекрасные задачи из алгоритмов разделения и покорения, предпочитая выполнять задачи в одном потоке в порядке, при этом большие задачи вытягиваются простыми потоками.

+0

Хороший вопрос о запуске теста несколько раз. Я на самом деле выполнял его много раз, я просто вложил один результат. Я понимаю, как улучшить бенчмарк. – Shahbaz

4

Я не думаю, что это совсем реалистично, так как вы создаете новую услугу ИСПОЛНИТЕЛЬ каждый раз, когда вы делаете вызов метода. Если у вас нет очень странных требований, которые кажутся нереальными - обычно вы должны создать службу, когда ваше приложение запустится, а затем отправьте ему задания.

Если вы попробуете повторный бенчмаркинг, но инициализируйте сервис как поле, один раз, вне цикла синхронизации; то вы увидите фактические накладные расходы на отправку Runnables в службу и их запуск самостоятельно.

Но я не думаю, что вы полностью поняли суть вопроса - Исполнители не предназначены для повышения эффективности, они там, чтобы упростить координацию и передачу работы в пул потоков. Они всегда будут менее эффективны, чем просто вызывать Runnable.run() (так как в конце дня служба-исполнитель все равно должна это сделать, после предварительной предварительной уборки). Это когда вы используете их из нескольких потоков, требующих асинхронной обработки, что они действительно блестят.

Также считайте, что вы смотрите на относительную разницу во времени в основном фиксированной стоимости (накладные расходы Executor одинаковы, если ваши задачи занимают 1 мс или 1 час для запуска) по сравнению с очень небольшой переменной величиной (ваш тривиальный запуск).Если служба-исполнитель занимает 5 мс, чтобы выполнить задание 1 мс, это не очень выгодная цифра. Если для выполнения 5-секундной задачи (например, нетривиального SQL-запроса) требуется 5 мс, то это совершенно ничтожно и полностью стоит.

В какой-то степени это зависит от вашей ситуации - если у вас есть чрезвычайно критически важный раздел, выполняющий множество небольших задач, которые не нужно выполнять параллельно или асинхронно, вы ничего не получите от Палач. Если вы обрабатываете более сложные задачи параллельно и хотите реагировать асинхронно (например, webapp), то Executors великолепны.

Независимо от того, являются ли они лучшим выбором для вас, это зависит от вашей ситуации, но на самом деле вам нужно попробовать тесты с реалистичными репрезентативными данными. Я не думаю, что было бы целесообразно делать какие-либо выводы из проведенных вами тестов, если ваши задачи действительно тривиальны (и вы не хотите повторно использовать экземпляр-исполнитель ...).

+0

Я инициализирую исполнителя внутри метода, но не внутри цикла. Я использовал методы просто для того, чтобы тесты были раздельными. Я знаю, что у исполнителей есть накладные расходы, я был удивлен, что он был таким высоким. К сожалению (или, к счастью), большинство моих вычислений действительно являются тривиальными (простая арифметика), за исключением того, что они выполняются на множестве сообщений. Подумайте о системе обмена сообщениями, которая обрабатывает поток сообщений, но преобразование каждого сообщения не слишком дорого. То, что я получаю от этого, - это то, что мне нужно сделать свою программу параллельной с разной детализацией из того, что я изначально думал. – Shahbaz

0

Исправленная цель ThreadPool - использовать уже созданные потоки. Таким образом, прирост производительности проявляется в отсутствии необходимости воссоздавать новый поток при каждом представлении задачи. Следовательно, время остановки должно быть взято внутри поставленной задачи. Просто в последнем утверждении метода run.

2

Math.random() фактически синхронизируется с одним генератором случайных чисел. Вызов Math.random() приводит к значению утверждение для генератора чисел. На самом деле, чем больше потоков у вас есть, тем медленнее это будет.

Из Math.random() Javadoc:

Этот метод должным образом синхронизированы, чтобы правильно использовать более чем один нити. Однако, если многие потоки должны генерировать псевдослучайные номера с большой скоростью, это может уменьшить количество конфликтов для каждого потока до , имеет свой собственный генератор псевдослучайных чисел.

0

Вам необходимо как-то выполнить группу, чтобы представить большие части вычислений для каждого потока (например, группы сборки на основе символа запаса). Я получил лучшие результаты в подобных сценариях, используя Disruptor. У него очень низкие накладные расходы. Тем не менее его важно для групповых заданий, наивный круговой ронин обычно создает много промахов в кеше.

см http://java-is-the-new-c.blogspot.de/2014/01/comparision-of-different-concurrency.html

2

«над головой» вы говорите не имеет ничего общего с ExecutorService, это вызвано несколькими потоками синхронизации на Math.random, создавая блокировки.

Так что да, вы что-то упускаете (и «правильный» ответ ниже на самом деле не правильный).

Вот некоторые Ja 8 коды для демонстрации 8 потоков работают простую функцию, в которой нет никаких блокировок:

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 
import java.util.function.DoubleFunction; 

import com.google.common.base.Stopwatch; 

public class ExecServicePerformance { 

    private static final int repetitions = 120; 
    private static int totalOperations = 250000; 
    private static final int cpus = 8; 
    private static final List<Batch> batches = batches(cpus); 

    private static DoubleFunction<Double> performanceFunc = (double i) -> {return Math.sin(i * 100000/Math.PI); }; 

    public static void main(String[] args) throws InterruptedException { 

     printExecutionTime("Synchronous", ExecServicePerformance::synchronous); 
     printExecutionTime("Synchronous batches", ExecServicePerformance::synchronousBatches); 
     printExecutionTime("Thread per batch", ExecServicePerformance::asynchronousBatches); 
     printExecutionTime("Executor pool", ExecServicePerformance::executorPool); 

    } 

    private static void printExecutionTime(String msg, Runnable f) throws InterruptedException { 
     long time = 0; 
     for (int i = 0; i < repetitions; i++) { 
      Stopwatch stopwatch = Stopwatch.createStarted(); 
      f.run(); //remember, this is a single-threaded synchronous execution since there is no explicit new thread 
      time += stopwatch.elapsed(TimeUnit.MILLISECONDS); 
     } 
     System.out.println(msg + " exec time: " + time); 
    }  

    private static void synchronous() { 
     for (int i = 0; i < totalOperations; i++) { 
      performanceFunc.apply(i); 
     } 
    } 

    private static void synchronousBatches() {  
     for (Batch batch : batches) { 
      batch.synchronously(); 
     } 
    } 

    private static void asynchronousBatches() { 

     CountDownLatch cb = new CountDownLatch(cpus); 

     for (Batch batch : batches) { 
      Runnable r =() -> { batch.synchronously(); cb.countDown(); }; 
      Thread t = new Thread(r); 
      t.start(); 
     } 

     try { 
      cb.await(); 
     } catch (InterruptedException e) { 
      throw new RuntimeException(e); 
     }   
    } 

    private static void executorPool() { 

     final ExecutorService es = Executors.newFixedThreadPool(cpus); 

     for (Batch batch : batches) { 
      Runnable r =() -> { batch.synchronously(); }; 
      es.submit(r); 
     } 

     es.shutdown(); 

     try { 
      es.awaitTermination(10, TimeUnit.SECONDS); 
     } catch (InterruptedException e) { 
      throw new RuntimeException(e); 
     } 

    } 

    private static List<Batch> batches(final int cpus) { 
     List<Batch> list = new ArrayList<Batch>(); 
     for (int i = 0; i < cpus; i++) { 
      list.add(new Batch(totalOperations/cpus)); 
     } 
     System.out.println("Batches: " + list.size()); 
     return list; 
    } 

    private static class Batch { 

     private final int operationsInBatch; 

     public Batch(final int ops) { 
      this.operationsInBatch = ops; 
     } 

     public void synchronously() { 
      for (int i = 0; i < operationsInBatch; i++) { 
       performanceFunc.apply(i); 
      } 
     } 
    } 


} 

Результата таймингов для 120 испытаний 2оК операций (мс):

  • Синхронный Exec время: 9956
  • Синхронные партии Exec время: 9900
  • тема на партию Exec время: 2176
  • Исполнительный продюсер: 1922

Победитель: Исполнитель.

1

Вот результаты на моей машине (OpenJDK 8 на 64-битной Ubuntu 14,0, Thinkpad W530)

simpleCompuation:6 
computationWithObjCreation:5 
computationWithObjCreationAndExecutors:33 

Там, конечно, накладные расходы. Но помните, каковы эти цифры: миллисекунды для 100 k итераций. В вашем случае накладные расходы составляли около 4 микросекунд на итерацию. Для меня накладные расходы составляли около четверти микросекунды.

Накладные расходы - это синхронизация, внутренние структуры данных и, возможно, отсутствие оптимизации JIT из-за сложных кодовых путей (конечно, более сложных, чем ваш цикл for).

Задачи, которые вы действительно хотели бы распараллелить, были бы полезны, несмотря на четверть микросекунды.


FYI, это было бы очень плохим вычислением для распараллеливания. Я поднял нить до 8 (количество ядер):

simpleCompuation:5 
computationWithObjCreation:6 
computationWithObjCreationAndExecutors:38 

Это не ускорилось. Это связано с тем, что Math.random() синхронизирован.

Смежные вопросы