2013-05-08 2 views
2

Мне нужно измерить ту скорость, с которой система программного обеспечения потребляет сообщения из очереди сообщений, и периодически сообщать об этом.Как измерить скорость событий через систему

В частности, сообщения поступают из системы очередности сообщений, и мне нужно сообщать (каждую секунду) о количестве сообщений, полученных в течение нескольких переходящих окон - например, последние секунды, последние 5 секунд, последние 30 секунд и т. д.

Несмотря на то, что я уверен, что смогу это сделать, я не уверен, что я сделаю это самым эффективным способом! Я также уверен, что есть библиотеки для этого (я использую JVM, поэтому Apache Commons Math вспоминает), но я даже не знаю правильных слов для Google! :-)

+0

Прогресс в этом вопросе? У меня есть по существу одно и то же требование. – sceaj

+0

В итоге я написал свой собственный. Я отправлю его в ответ. – dty

ответ

1

Вот что я в итоге написал.

package com.example; 

import java.util.Arrays; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.concurrent.locks.Lock; 
import java.util.concurrent.locks.ReentrantLock; 

public class BucketCounter { 
    private final Lock rollLock = new ReentrantLock(); 
    private final int[] bucketSizes; 
    private final int[] buckets; 
    private final int[] intervals; 
    private final AtomicInteger incoming = new AtomicInteger(0); 

    public BucketCounter(int... bucketSizes) { 
     if (bucketSizes.length < 1) { 
      throw new IllegalArgumentException("Must specify at least one bucket size"); 
     } 

     this.bucketSizes = bucketSizes; 
     this.buckets = new int[bucketSizes.length]; 

     Arrays.sort(bucketSizes); 

     if (bucketSizes[0] < 1) { 
      throw new IllegalArgumentException("Cannot have a bucket of size < 1"); 
     } 

     intervals = new int[bucketSizes[bucketSizes.length - 1]]; 
    } 

    public int count(int n) { 
     return incoming.addAndGet(n); 
    } 

    public int[] roll() { 
     final int toAdd = incoming.getAndSet(0); 

     rollLock.lock(); 
     try { 
      final int[] results = new int[buckets.length]; 

      for (int i = 0, n = buckets.length; i < n; i++) { 
       results[i] = buckets[i] = buckets[i] - intervals[bucketSizes[i] - 1] + toAdd; 
      } 

      System.arraycopy(intervals, 0, intervals, 1, intervals.length - 1); 
      intervals[0] = toAdd; 

      return results; 
     } finally { 
      rollLock.unlock(); 
     } 
    } 
} 

Инициализировать его, передавая различные приращения времени (например, 1, 5, 30). Затем организуйте фоновый поток, чтобы вызвать roll() каждый «период времени». Если вы вызываете его каждую секунду, ваши ведра составляют 1, 5 и 30 секунд. Если вы вызываете его каждые 5 секунд, то ваши ведра составляют 5, 25 и 150 секунд и т. Д. В принципе, ведра выражаются в «количестве раз roll() называется»).

roll() также возвращает вам массив текущих подсчетов для каждого ведра. Обратите внимание, что эти числа являются сырыми подсчетами и не усредняются за промежуток времени. Вам нужно будет сделать это подразделение самостоятельно, если вы хотите измерить «ставки», а не «считать».

И, наконец, каждый раз, когда происходит событие, звоните count(). Я создал систему с несколькими из них, и я вызываю count(1) на каждое сообщение, чтобы подсчитывать входящие сообщения, count(message.size()) на каждое сообщение, чтобы подсчитывать входящие байтовые ставки и т. Д.

Надеюсь, что это поможет.

+0

Спасибо за сообщение. Я хотел избежать сложности фоновых потоков. Я работаю над решением, основанным на экспоненциальном сглаживании. Я отправлю его, когда закончим. – sceaj

-1

Возможно, вы можете реализовать его как перехватчик, поэтому выполните поиск перехватчика в сочетании с именем продукта очереди сообщений и именем языка.

+0

Спасибо за ваш ответ, но я не уверен, что это значит! Я понятия не имею, что такое «перехватчик», если вы не говорите о чем-то вроде АОП. И конкретный продукт и язык массового обслуживания сообщений в основном неактуальны. – dty

+0

@dty Вы также можете попробовать «обратный вызов». –

5

Вот мое решение, основанное на экспоненциальном сглаживании. Он не требует каких-либо фоновых потоков. Вы должны создать 1 экземпляр для каждого перетаскиваемого окна, которое вы хотите отслеживать. Для каждого соответствующего события вы вызываете newEvent для каждого экземпляра.

public class WindowedEventRate { 

    private double normalizedRate; // event rate/window 
    private long windowSizeTicks; 
    private long lastEventTicks; 


    public WindowedEventRate(int aWindowSizeSeconds) { 
    windowSizeTicks = aWindowSizeSeconds * 1000L; 
    lastEventTicks = System.currentTimeMillis(); 
    } 

    public double newEvent() { 

    long currentTicks = System.currentTimeMillis(); 
    long period = currentTicks - lastEventTicks; 
    lastEventTicks = currentTicks; 
    double normalizedFrequency = (double) windowSizeTicks/(double) period; 

    double alpha = Math.min(1.0/normalizedFrequency, 1.0); 
    normalizedRate = (alpha * normalizedFrequency) + ((1.0 - alpha) * normalizedRate); 
    return getRate(); 
    } 

    public double getRate() { 
    return normalizedRate * 1000L/windowSizeTicks; 
    } 
} 
+0

Увлекательный. Можете ли вы объяснить, как это работает? В частности, можете ли вы объяснить, как исторические события удаляются из результата? – dty

+0

Я могу попробовать. Мы хотим выполнить экспоненциальное сглаживание по переменной-члену normalizedRate. Каждый раз, когда мы получаем новое событие, мы обновляем как: normalizedRate (t) = (normalizedFrequency * alpha) + (normalizedRate * (1 - alpha)). Обратите внимание, что моя реализация не справляется с длительным отсутствием событий, то есть она обновляет normalizedRate только при обнаружении нового события, поэтому она будет работать лучше всего, если ожидаемая скорость события не менее 1 за период окна. Я думаю о том, как это можно было бы обработать. Сегодня я попытаюсь лучше документировать вещи и обновить свой ответ. – sceaj

+0

Более непосредственно на ваш вопрос. Поэтому при каждом обновлении мы вычисляем новое значение, которое является частью нового «измерения» и части старого значения. Поиск «Экспоненциальное сглаживание» или «Фильтр Калмана». У меня нет сильной математической основы для этого, но в моей реализации я назначаю вес нового измерения пропорции отображаемого окна. – sceaj

Смежные вопросы