3

Я недавно узнал о LMAX Disruptor и делал некоторые эксперименты. Одна вещь, которая меня озадачивает, - это параметр endOfBatch метода обработчика onEventEventHandler. Рассмотрим мой следующий код. Во-первых, фиктивные сообщения и потребительские классы, которые я называю Test1 и Test1Worker:LMAX Disruptor - что определяет размер партии?

public class Test1 { 

} 

public class Test1Worker implements EventHandler<Test1>{ 
    public void onEvent(Test1 event, long sequence, boolean endOfBatch) { 
     try{ 
      Thread.sleep(500); 
     } 
     catch(Exception e){ 
      e.printStackTrace(); 
     } 
     System.out.println("Received message with sequence " + sequence + ". " 
       + "EndOfBatch = " + endOfBatch); 
    } 
} 

Обратите внимание, что я поставил задержку 500 миллисекунд только в качестве замены некоторого реального мира работы. Я также печать в консоли порядковый номер

И тогда мой класс водителя (который выступает в качестве продюсера) называется DisruptorTest:

public class DisruptorTest { 

    private static Disruptor<Test1> bus1; 

    private static ExecutorService test1Workers; 

    public static void main(String[] args){    
     test1Workers = Executors.newFixedThreadPool(1); 

     bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);   
     bus1.handleEventsWith(new Test1Worker()); 
     RingBuffer<Test1> buf1 = bus1.start(); 

     for (int i = 0; i < 10; i++){ 
      long a = System.currentTimeMillis(); 
      long next = buf1.next(); 
      long b = System.currentTimeMillis(); 
      System.out.println("Delay for claiming slot " + i + " is "+ (b - a)); 
      try { 
       Test1 message = buf1.get(next); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } finally { 
       buf1.publish(next); 
      } 
     } 
    } 

    public static class Test1Factory implements EventFactory<Test1> { 
     public Test1 newInstance() { 
      return new Test1(); 
     } 

    } 
} 

Здесь, после инициализации необходимых веществ, я кормления 10 сообщений к RingBuffer (размер буфера 8) и попытка контролировать несколько вещей - задержку для продюсера для запроса следующего слота в RingBuffer и сообщения с их порядковыми номерами на стороне потребителя, а также определенную последовательность считается окончанием партии.

Теперь, интересно с в 500 мс задержка участвует в обработке каждого сообщения, это то, что я получаю в качестве вывода:

Delay for claiming slot 0 is 0 
Delay for claiming slot 1 is 0 
Delay for claiming slot 2 is 0 
Delay for claiming slot 3 is 0 
Delay for claiming slot 4 is 0 
Delay for claiming slot 5 is 0 
Delay for claiming slot 6 is 0 
Delay for claiming slot 7 is 0 
Received message with sequence 0. EndOfBatch = true 
Delay for claiming slot 8 is 505 
Received message with sequence 1. EndOfBatch = false 
Received message with sequence 2. EndOfBatch = false 
Received message with sequence 3. EndOfBatch = false 
Received message with sequence 4. EndOfBatch = false 
Received message with sequence 5. EndOfBatch = false 
Received message with sequence 6. EndOfBatch = false 
Received message with sequence 7. EndOfBatch = true 
Delay for claiming slot 9 is 3519 
Received message with sequence 8. EndOfBatch = true 
Received message with sequence 9. EndOfBatch = true 

Однако, если я извлекаю 500 мс время ожидания, это то, что я получаю:

Delay for claiming slot 0 is 0 
Delay for claiming slot 1 is 0 
Received message with sequence 0. EndOfBatch = true 
Received message with sequence 1. EndOfBatch = true 
Delay for claiming slot 2 is 0 
Received message with sequence 2. EndOfBatch = true 
Delay for claiming slot 3 is 0 
Received message with sequence 3. EndOfBatch = true 
Delay for claiming slot 4 is 0 
Received message with sequence 4. EndOfBatch = true 
Delay for claiming slot 5 is 0 
Received message with sequence 5. EndOfBatch = true 
Delay for claiming slot 6 is 0 
Received message with sequence 6. EndOfBatch = true 
Delay for claiming slot 7 is 0 
Received message with sequence 7. EndOfBatch = true 
Delay for claiming slot 8 is 1 
Delay for claiming slot 9 is 0 
Received message with sequence 8. EndOfBatch = false 
Received message with sequence 9. EndOfBatch = true 

Так выглядит считается ли определенное сообщение, чтобы быть в конце партии (то есть, размер партии) оказывает влияние задержки обработки сообщений потребителя. Может быть, я здесь глуп, но так ли это должно быть? Каковы причины этого? Что вообще определяет размер партии? Заранее спасибо. Дайте мне знать, если что-либо в моем вопросе неясно.

+0

Вы должны, вероятно, посмотреть некоторые из видео Алексея Шипилева на бенчмаркинг на JVM, если вы хотите углубиться в подробности - http://shipilev.net/ – jasonk

ответ

7

Размер партии определяется исключительно количеством доступных элементов. Поэтому, если в данный момент доступно больше элементов, оно будет включено в пакет. Например, если Disruptor вызывает ваш код, и в очереди есть только один элемент, вы получите один вызов с endOfBatch = true. Если в очереди есть 8 элементов, то они собирают все 8 и отправляют их в одну партию.

В приведенном ниже коде вы можете увидеть, что количество записей «доступно» в очереди, и может быть намного больше, чем «следующий» элемент. Так, например, вы в настоящее время 5, ожидая слота 6, а затем приходят 3 события, доступно будет 8, и вы получите несколько вызовов (6,7,8) в партии.

https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/BatchEventProcessor.java#L124

final long availableSequence = sequenceBarrier.waitFor(nextSequence); 
while (nextSequence <= availableSequence) 
{ 
    event = dataProvider.get(nextSequence); 
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); 
    nextSequence++; 
} 

касаемо 500ms паузы в элементе 9, обратите внимание, что Disruptor построен с кольцевым буфером, и вы указали количество слотов в буфере, 8 (см второго параметр здесь):

bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers); 

Если не все потребители потребляли элемент, а ringbuffer на полную мощность (все 8 элементов полные), производитель будет заблокирован от размещения новых событий в буфер. Вы можете попытаться увеличить размер буфера, скажем, 2 миллиона объектов или убедиться, что ваш потребитель быстрее, чем производитель, поэтому очередь не заполняется (удалить сон, который вы уже продемонстрировали).

+0

Спасибо за ваш ответ. Теперь у меня есть четкое представление о размере партии. Однако сейчас я озадачен временем ожидания продюсера. Обратите внимание, что в моем примере (с задержкой 500 мс) продюсер ждал примерно 3500 мс (500 * 7), прежде чем требовать слот 9 (который является слотом 2 после модуляции). Мне интересно, почему продюсер просто не потребовал слот 9 сразу после того, как потребитель получил последовательность 2? Почему он подождал, пока не будет потреблена последовательность 7? Я знаю, что этот вопрос немного не соответствует теме первоначального вопроса, но мне все еще интересно. –

Смежные вопросы