2012-03-07 4 views
2

Я пытаюсь узнать о разнице в производительности между обычной многопотоковой и многопотоковой версией с помощью исполнителя (для поддержания пула потоков).Разница между многопотоком с и без Executor

Ниже приведены примеры кода для обоих.

Без палача кодекса (с многопоточной):

import java.lang.management.ManagementFactory; 
import java.lang.management.MemoryPoolMXBean; 
import java.lang.management.MemoryUsage; 
import java.lang.management.ThreadMXBean; 
import java.util.List; 

public class Demo1 { 
public static void main(String arg[]) { 
    Demo1 demo = new Demo1(); 
    Thread t5 = new Thread(new Runnable() { 
     public void run() { 
       int count=0; 
       // Thread.State; 
       // System.out.println("ClientMsgReceiver started-----"); 
       Demo1.ChildDemo obj = new Demo1.ChildDemo(); 
       while(true) { 

       // System.out.println("Threadcount is"+Thread); 
       // System.out.println("count is"+(count++)); 
       Thread t=new Thread(obj); 
       t.start(); 
       ThreadMXBean tb = ManagementFactory.getThreadMXBean(); 
       List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans(); 
       for (MemoryPoolMXBean pool : pools) { 
        MemoryUsage peak = pool.getPeakUsage(); 
        System.out.format("Peak %s memory used: %,d%n", 
          pool.getName(), peak.getUsed()); 
        System.out.format("Peak %s memory reserved: %,d%n", 
          pool.getName(), peak.getCommitted()); 
       } 

       System.out.println("Current Thread Count"+ tb.getThreadCount()); 
       System.out.println("Peak Thread Count"+ tb.getPeakThreadCount()); 
       System.out.println("Current_Thread_Cpu_Time " 
         + tb.getCurrentThreadCpuTime()); 
       System.out.println("Daemon Thread Count" +tb.getDaemonThreadCount()); 
     } 
     // ChatLogin = new ChatLogin(); 
    } 
    }); 
    t5.start(); 
} 

static class ChildDemo implements Runnable { 
    public void run() { 
     try { 
     // System.out.println("Thread Started with custom Run method"); 
      Thread.sleep(100000); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     finally { 
      System.out.println("A" +Thread.activeCount()); 
     } 
    } 
    } 
} 

с исполнителем (многопоточность):

import java.lang.management.ManagementFactory; 
import java.lang.management.MemoryPoolMXBean; 
import java.lang.management.MemoryUsage; 
import java.lang.management.ThreadMXBean; 
import java.util.List; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

public class Executor_Demo { 
public static void main(String arg[]) { 
    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10); 
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
      10, 100, 10, TimeUnit.MICROSECONDS, queue); 
    Executor_Demo demo = new Executor_Demo(); 

    executor.execute(new Runnable() { 
     public void run() { 
      int count=0; 
      // System.out.println("ClientMsgReceiver started-----"); 
      Executor_Demo.Demo demo2 = new Executor_Demo.Demo(); 
      BlockingQueue<Runnable> queue1 = new ArrayBlockingQueue<Runnable>(1000); 
      ThreadPoolExecutor executor1 = new ThreadPoolExecutor(
        1000, 10000, 10, TimeUnit.MICROSECONDS, queue1); 

      while(true) { 
      // System.out.println("Threadcount is"+Thread); 
      // System.out.println("count is"+(count++)); 
      Runnable command= new Demo(); 
      // executor1.execute(command); 
      executor1.submit(command);   
      // Thread t=new Thread(demo2); 
      // t.start(); 
      ThreadMXBean tb = ManagementFactory.getThreadMXBean(); 
      /* try { 
        executor1.awaitTermination(100, TimeUnit.MICROSECONDS); 
       } catch (InterruptedException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } */ 
       List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans(); 
       for (MemoryPoolMXBean pool : pools) { 
       MemoryUsage peak = pool.getPeakUsage(); 
       System.out.format("Peak %s memory used: %,d%n", 
          pool.getName(), peak.getUsed()); 
       System.out.format("Peak %s memory reserved: %,d%n", 
          pool.getName(), peak.getCommitted()); 
      } 
       System.out.println("daemon threads"+tb.getDaemonThreadCount()); 
       System.out.println("All threads"+tb.getAllThreadIds()); 
       System.out.println("current thread CPU time " 
         + tb.getCurrentThreadCpuTime()); 
       System.out.println("current thread user time " 
         + tb.getCurrentThreadUserTime()); 
       System.out.println("Total started thread count " 
         + tb.getTotalStartedThreadCount()); 
       System.out.println("Current Thread Count"+ tb.getThreadCount()); 
       System.out.println("Peak Thread Count"+ tb.getPeakThreadCount()); 
       System.out.println("Current_Thread_Cpu_Time " 
         + tb.getCurrentThreadCpuTime()); 
       System.out.println("Daemon Thread Count" 
         + tb.getDaemonThreadCount()); 
       // executor1.shutdown(); 
      } 
      //ChatLogin = new ChatLogin(); 
      } 
    }); 
    executor.shutdown(); 
} 

static class Demo implements Runnable { 
    public void run() { 
     try { 
     // System.out.println("Thread Started with custom Run method"); 
     Thread.sleep(100000); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     finally { 
     System.out.println("A" +Thread.activeCount()); 
     } 
    } 
    } 
} 

Пример вывода Output

Когда я запускаю обе программы, то получается, что исполнитель дороже обычного мультиплексора резьб. почему это так?

И учитывая это, что именно используется исполнителем? Мы используем исполнителя для управления пулами потоков.

Я бы ожидал, что исполнитель даст лучшие результаты, чем обычный многопоточность.

В основном я делаю это, так как мне нужно обрабатывать миллионы клиентов, используя программирование сокетов с многопотоковой обработкой.

Любые предложения будут полезны.

+0

«Миллионы клиентов» все сразу? Сколько * параллельных * соединений необходимо поддерживать? –

+0

миллионов одновременных подключений. Мне нужно поддерживать. – Java

+1

Для начала я бы не стал делать это на одной машине, и я бы даже не подумал об этом с потоком на одного клиента с исполнителем или без него. Первое, на что вам нужно обратить внимание - это асинхронный IO ... в этот момент вы можете работать с очень небольшим количеством потоков. Кроме того, при выполнении бенчмаркинга я бы выбрал более реалистичный тест - в вашем реальном коде вы бы не просто непрерывно * непрерывно добавляли больше потоков, которые ничего не делали, кроме сна, не так ли? –

ответ

2

Чтобы увидеть, как что-то масштабируется, я попытался бы свести стоимость мониторинга до минимума, и я бы сравнил небольшое число с большим числом.

public class Executor_Demo { 
    public static void main(String... arg) throws ExecutionException, InterruptedException { 
     int nThreads = 5100; 
     ExecutorService executor = Executors.newFixedThreadPool(nThreads, new DaemonThreadFactory()); 

     List<Future<Results>> futures = new ArrayList<Future<Results>>(); 
     for (int i = 0; i < nThreads; i++) { 
      futures.add(executor.submit(new BackgroundCallable())); 
     } 
     Results result = new Results(); 
     for (Future<Results> future : futures) { 
      result.merge(future.get()); 
     } 
     executor.shutdown(); 

     result.print(System.out); 

    } 

    static class Results { 
     private long cpuTime; 
     private long userTime; 

     Results() { 
      final ThreadMXBean tb = ManagementFactory.getThreadMXBean(); 
      cpuTime = tb.getCurrentThreadCpuTime(); 
      userTime = tb.getCurrentThreadUserTime(); 
     } 


     public void merge(Results results) { 
      cpuTime += results.cpuTime; 
      userTime += results.userTime; 
     } 

     public void print(PrintStream out) { 
      ThreadMXBean tb = ManagementFactory.getThreadMXBean(); 

      List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans(); 
      for (int i = 0, poolsSize = pools.size(); i < poolsSize; i++) { 
       MemoryPoolMXBean pool = pools.get(i); 
       MemoryUsage peak = pool.getPeakUsage(); 
       out.format("Peak %s memory used:\t%,d%n", pool.getName(), peak.getUsed()); 
       out.format("Peak %s memory reserved:\t%,d%n", pool.getName(), peak.getCommitted()); 
      } 

      out.println("Total thread CPU time\t" + cpuTime); 
      out.println("Total thread user time\t" + userTime); 
      out.println("Total started thread count\t" + tb.getTotalStartedThreadCount()); 
      out.println("Current Thread Count\t" + tb.getThreadCount()); 
      out.println("Peak Thread Count\t" + tb.getPeakThreadCount()); 
      out.println("Daemon Thread Count\t" + tb.getDaemonThreadCount()); 
     } 
    } 

    static class DaemonThreadFactory implements ThreadFactory { 
     @Override 
     public Thread newThread(Runnable r) { 
      Thread t = new Thread(r); 
      t.setDaemon(true); 
      return t; 
     } 
    } 

    static class BackgroundCallable implements Callable<Results> { 
     @Override 
     public Results call() throws Exception { 
      Thread.sleep(100); 
      return new Results(); 
     } 
    } 
} 

при испытании с -XX:MaxNewSize=64m (это ограничивает размер временных ячеек памяти увеличится)

100 threads 
Peak Code Cache memory used: 386,880 
Peak Code Cache memory reserved: 2,555,904 
Peak PS Eden Space memory used: 41,280,984 
Peak PS Eden Space memory reserved: 50,331,648 
Peak PS Survivor Space memory used: 0 
Peak PS Survivor Space memory reserved: 8,388,608 
Peak PS Old Gen memory used: 0 
Peak PS Old Gen memory reserved: 192,675,840 
Peak PS Perm Gen memory used: 3,719,616 
Peak PS Perm Gen memory reserved: 21,757,952 
Total thread CPU time 20000000 
Total thread user time 20000000 
Total started thread count 105 
Current Thread Count 93 
Peak Thread Count 105 
Daemon Thread Count 92 

5100 threads 
Peak Code Cache memory used: 425,728 
Peak Code Cache memory reserved: 2,555,904 
Peak PS Eden Space memory used: 59,244,544 
Peak PS Eden Space memory reserved: 59,244,544 
Peak PS Survivor Space memory used: 2,949,152 
Peak PS Survivor Space memory reserved: 8,388,608 
Peak PS Old Gen memory used: 3,076,400 
Peak PS Old Gen memory reserved: 192,675,840 
Peak PS Perm Gen memory used: 3,787,096 
Peak PS Perm Gen memory reserved: 21,757,952 
Total thread CPU time 810000000 
Total thread user time 150000000 
Total started thread count 5105 
Current Thread Count 5105 
Peak Thread Count 5105 
Daemon Thread Count 5104 

Основной прирост является увеличение старого ген используется ~ 3 МБ или около 6 КБ для каждого потока. и процессор, используемый в 956 мс или около 0,2 мс на поток.


В первом примере, вы создаете одну нить, во втором вы создаете 1000.

Выход вы выполняете, кажется, большую часть работы, и у вас есть гораздо больше продукции в второй случай, чем первый.

Вы должны быть уверены, что ваши испытания и мониторинг гораздо более легкие, чем вы хотите контролировать/измерять.

+0

+1, но для уточнения: контрольный показатель недействителен. –

+0

@ JimGarrison. Тест показывает, что второй подход мониторинга медленнее первого. Вы должны быть осторожны относительно того, что вы измеряете. ;) –

+0

Во втором случае это не создает 1000 потоков. Он уже создал пул из 1000 'потоков. exector.submit() 'отправит задачу в один из потоков из пула потоков исполнителей. '@ PeterLawrey' исправьте меня, если я ошибаюсь. – Java

2

Каждая нить потребляет память для стека, что-то от 256 К до 1 М. Вы можете установить размер стека вручную, но установить его ниже 128K опасно. Итак, если у вас есть память 2G и вы можете потратить 1/2 на потоки, у вас будет не более 8K потоков. Если это нормально для вас, используйте обычную многопоточность (каждый Runnable имеет свой собственный стек). Если вы не хотите или не можете тратить столько памяти на каждый Runnable, используйте Executor. Установите размер пула потоков на количество процессоров (Runtime.availableProcessors()) или в несколько раз больше. Основная проблема заключается в том, что вы не можете создавать Thread.sleep() или иным образом блокировать поток в вашей runnable (например, ждать ответа пользователя), потому что такая блокировка эффективно исключает поток из обслуживания.В результате, если вы используете пул потоков ограниченного размера, происходит так называемое «потоковое голодание», которое фактически является тупиком. Если ваш пул потоков имеет неограниченный размер, вы возвращаетесь к обычной многопоточности и скоро исчерпаете память.

Исключением является использование асинхронных операций, то есть настройка запроса с обратным вызовом и выход из метода run(). Затем обратный вызов должен запустить выполнение некоторого объекта Runnable (возможно, того же самого) с Executor.execute (Runnable) или выполнить сам метод runnable.run().

Асинхронные операции ввода-вывода теперь присутствуют в Java 7 (nio2), но я не смог заставить его обслуживать более нескольких сотен сетевых подключений. Для обслуживания сетевых подключений могут использоваться асинхронные сетевые библиотеки (например, Apache Netty).

Организация обратных вызовов и выполнение runnables может потребовать сложной синхронизации. Чтобы облегчить жизнь, подумайте о том, чтобы использовать модель Actor (http://en.wikipedia.org/wiki/Actor_model), где Actor выполняет Runnable каждый раз при поступлении входного сообщения. Существуют многочисленные библиотеки Java Actor (например, https://github.com/rfqu/df4j).

+0

В соответствии с вашим ответом, если исполнитель должен использовать меньшее время и использование памяти, то почему исполнитель использует больше времени и памяти в примере исполнителя, чем обычный пример потоковой передачи. Посмотрите на мое образцовое выходное изображение. – Java

+0

Я никогда не говорил, что исполнитель должен использовать меньше времени. Я сказал, что исполнитель может использовать меньше потоков (и, следовательно, меньше памяти), но ваши образцы показывают, что вы используете большое количество потоков в примере исполнителей, поэтому ваш пример плохо разработан. –

Смежные вопросы