2013-12-01 4 views
0

С буфером кольцевого прерывания я могу получить только 6 миллионов операций в секунду. Мне интересно, где я ошибаюсь. Мой обработчик событий просто накладывает счетчик. Это с Single Producer и Single Consumer. Может кто-нибудь сказать мне, если я ошибаюсь в самой семантике. Программа создает поток производителей, который добавляется в буфер. И создает обработчик события для обработки события публикации. Каждый раз, когда событие публикуется, обработчик событий увеличивает счетчик волатильности.LMAX Disruptor SPSC - 6 миллионов операционных систем в секунду

public class MainClass{ 

    public static class globalVariables 
    { 
     static int NUMBER_OF_ITERATIONS = 33554432; // 2 power 25  
     static int NUMBER_OF_THREADS; 
     static int RING_SIZE = NUMBER_OF_ITERATIONS; 
     static int WRITE_CODE = 1; 

     static volatile int keep_going = 1; 
    }; 

    public void start_execution() 
    {    
     int remainder = globalVariables.NUMBER_OF_ITERATIONS % globalVariables.NUMBER_OF_THREADS;    
     int iterations_per_thread = (globalVariables.NUMBER_OF_ITERATIONS - remainder)/globalVariables.NUMBER_OF_THREADS ; 


     /* New Shared Object */ 
     final sharedObject newSharedObject = new sharedObject(); 


    ExecutorService exec = Executors.newFixedThreadPool(1); 
     Disruptor<valueEvent> disruptor = new Disruptor<valueEvent>(valueEvent.EVENT_FACTORY, globalVariables.RING_SIZE, exec); 


     /* Creating event handler whenever an item is published in the queue */ 

      final EventHandler<valueEvent> handler = new EventHandler<valueEvent>() 
      { 
       public void onEvent(final valueEvent event, final long sequence, 
          final boolean endOfBatch) throws Exception 
       {   
       newSharedObject.shared_variable++; // increment the shared variable 
       } 
      }; 


      /* Use the above handler to handler events */ 
      disruptor.handleEventsWith(handler); 


    /* start Disruptor */ 
    final RingBuffer<valueEvent> ringBuffer = disruptor.start(); 


    final long[] runtime = new long [globalVariables.NUMBER_OF_THREADS]; 
    /* Code the producer thread */ 
    final class ProducerThread extends Thread { 
     int i; 

     public ProducerThread(int i) 
     { 
      this.i = i; 
     } 

     public void run() 
     { 
      long idle_counter = 0; 
      long count; 

      System.out.println("In thread "+i); 

      long startTime = System.nanoTime(); 

      //while(globalVariables.keep_going == 1) 
      for(int counter=0; counter<globalVariables.NUMBER_OF_ITERATIONS; counter++) 
      { 
       // Publishers claim events in sequence 
       long sequence = ringBuffer.next(); 
       valueEvent event = ringBuffer.get(sequence); 

       event.setValue(globalVariables.WRITE_CODE); 

       // make the event available to EventProcessors 
       ringBuffer.publish(sequence); 
      } 

      long stopTime = System.nanoTime(); 
      runtime[i] = (stopTime - startTime)/1000; 
     } 
    }; 

    /* ------------------------------------------------------------------------------- */  
    //final class AlarmHandler extends TimerTask {  
      /*** Implements TimerTask's abstract run method. */ 
    // @Override public void run(){ 
    //  globalVariables.keep_going = 0; 
    // } 
    // }; 

    /* ------------------------------------------------------------------------------- */ 
    /* Creating Producer threads */ 
    ProducerThread[] threads = new ProducerThread[globalVariables.NUMBER_OF_THREADS]; 
    for (int i = 0; i < globalVariables.NUMBER_OF_THREADS; i++) { 
     threads[i] = new ProducerThread(i); 
     threads[i].start(); 
    } 

    // Waiting for the threads to finish 
    for (int i = 0; i < globalVariables.NUMBER_OF_THREADS; i++) { 
     try 
     { 
     threads[i].join(); 
     } catch (InterruptedException e) { System.out.println("hi exception :)"); } 
    } 

    /* shutdown */  
    disruptor.shutdown(); 
    exec.shutdown(); 

    /* Printing Statistics */ 
    System.out.println("Shared Variable: " + newSharedObject.shared_variable); 
    for (int i=0; i<globalVariables.NUMBER_OF_THREADS; i++) 
    { 
     System.out.println("Runtime="+ runtime[i] + "; Operations per second = " + (globalVariables.NUMBER_OF_ITERATIONS/runtime[i])*1000000 +"ops/sec");   
    }  

    } 

    public static void main(String args[]) 
    { 
     globalVariables.NUMBER_OF_THREADS = Integer.parseInt(args[0]);  

     System.out.println("Number of Threads = "+ globalVariables.NUMBER_OF_THREADS); 

     MainClass mainObj = new MainClass(); 
     mainObj.start_execution(); 
     System.exit(0); 
    } 
}; 

Вот выход из программы

Общая переменная: 33554432; Runtime = 5094139 микросекунд; Операции в секунду = 6000000

Любая помощь будет высоко оценена.

ответ

0

Поскольку вы запускаете обработчик событий в одном потоке и не используете это состояние, вы должны получить значительно лучшую производительность (но все же правильную функцию), используя обработчик событий в энергонезависимом поле. Разрушитель гарантирует, что обработчик обрабатывает только одно событие за раз, поэтому вам не нужно беспокоиться об убытках.

Если другой компонент системы зависит от этого значения, появляющегося в определенном порядке (например: это контрольное значение), вы должны подумать о том, чтобы использовать что-то вроде AtomicInteger с lazySet [1].

[1] http://psy-lob-saw.blogspot.com.au/2012/12/atomiclazyset-is-performance-win-for.html

+0

привет спасибо. Итак, вы думаете, что можно ожидать 6 миллионов операций? В базовом исследовании приведена эффективность очереди в 220 млн. Оп/сек. На что относятся эти цифры? – user39617

+0

Я думаю, что более 6 миллионов, возможно, могут быть достигнуты, попробуйте, не используя поле volatile – jasonk

+0

Спасибо, что получил тот же результат. Больше примеров использования для разрушителей определенно будет полезно – user39617

Смежные вопросы