2011-01-03 5 views
24

Я использую BlockingQueue: s (пытается как ArrayBlockingQueue, так и LinkedBlockingQueue) передавать объекты между различными потоками в приложении, в котором я сейчас работаю. Производительность и латентность относительно важны в этом приложении, поэтому мне было любопытно, сколько времени требуется для передачи объектов между двумя потоками с помощью BlockingQueue. Чтобы измерить это, я написал простую программу с двумя потоками (один потребитель и один производитель), где я разрешаю производителю передавать временную метку (полученную с использованием System.nanoTime()) для потребителя, см. Код ниже.Java BlockingQueue latency high на Linux

Я помню, как читал где-то на каком-то форуме, что потребовалось около 10 микросекунд для кого-то другого, кто это пробовал (не знаю, на какой ОС и аппаратном обеспечении было), поэтому я не был слишком удивлен, когда потребовалось ~ 30 микросекунд для меня на моем окне Windows 7 (процессор Intel E7500 Core 2 Duo, 2,93 ГГц), в то время как работает много других приложений в фоновом режиме. Тем не менее, я был очень удивлен, когда я сделал тот же тест на нашем гораздо более быстром сервере Linux (два четырехъядерных процессора Intel X5677 3.46GHz, работающих под управлением Debian 5 с ядром 2.6.26-2-amd64). Я ожидал, что латентность будет ниже, чем у моего окна, но, наоборот, она была намного выше - ~ 75 - 100 микросекунд! Оба теста были выполнены с помощью Sun's Hotspot JVM версии 1.6.0-23.

Проводили ли какие-либо аналогичные тесты с аналогичными результатами в Linux? Или кто-нибудь знает, почему он намного медленнее в Linux (с лучшим оборудованием), может быть, что переключение потоков просто намного медленнее в Linux по сравнению с Windows? Если это так, кажется, что окна действительно намного лучше подходят для некоторых приложений. Любая помощь, помогающая мне понять относительно высокие показатели, очень ценится.

Edit:
После комментария от DaveC, я также сделал тест, где я ограничило JVM (на машине Linux) на одном ядре (т.е. все темы, работающие на том же ядре). Это резко изменило результаты - латентность снизилась до менее 20 микросекунд, то есть лучше, чем результаты на машине Windows. Я также провел несколько тестов, в которых я ограничил поток производителей одним ядром и потребительским потоком на другой (пытаясь как иметь их в одном и том же сокете и в разных сокетах), но это, похоже, не помогло - латентность все еще была ~ 75 микросекунд. Кстати, это тестовое приложение - это почти все, что я запускаю на машине во время тестирования.

Кто-нибудь знает, имеют ли эти результаты смысл? Должно ли быть действительно намного медленнее, если производитель и потребитель работают на разных ядрах? Любой вход действительно оценен.

Edited снова (6 января):
Я экспериментировал с различными изменениями в коде и работает среды:

  1. Я обновил ядро ​​Linux 2.6.36.2 с (от 2.6.26.2). После обновления ядра измеренное время изменилось на 60 микросекунд с очень небольшими вариациями, начиная с 75-100 до обновления. Настройка близости процессора к потоку производителя и потребителя не имела никакого эффекта, за исключением случаев, когда они ограничивали их одним ядром. При работе на одном и том же ядре измеряемая латентность составляла 13 микросекунд.

  2. В исходном коде у меня был продюсер, который спал в течение 1 секунды между каждой итерацией, чтобы дать потребителю достаточно времени, чтобы вычислить прошедшее время и распечатать его на консоли. Если я удалю вызов Thread.sleep() и вместо этого позволю как барьер производителя, так и потребительский вызов.await() на каждой итерации (потребитель называет его после печати прошедшего времени на консоль), измеренная задержка уменьшается с 60 микросекунд до менее 10 микросекунд. При запуске потоков на одном и том же ядре латентность становится ниже 1 микросекунды. Может ли кто-нибудь объяснить, почему это значительно сократило латентность?Мое первое предположение заключалось в том, что изменение привело к тому, что продюсер назвал queue.put() перед тем, как потребитель назвал queue.take(), поэтому потребителю никогда не приходилось блокировать, но после игры с модифицированной версией ArrayBlockingQueue я обнаружил это предположение было ложным - потребитель действительно блокировал. Если у вас есть другие предположения, пожалуйста, дайте мне знать. (Кстати, если я позволю продюсеру вызвать как Thread.sleep(), так и барьер.await(), латентность остается на 60 микросекунд).

  3. Я также пробовал другой подход - вместо вызова queue.take() я вызывал queue.poll() с тайм-аутом в 100 микронов. Это уменьшило среднюю задержку до менее 10 микросекунд, но, конечно, намного интенсивнее процессора (но, вероятно, менее интенсивный процессор, ожидающий ожидание?).

Отредактировано раз (10 января) - Проблема решена:
ninjalj предположил, что задержка ~ 60 мкс было связано с CPU того, чтобы проснуться от более глубоких состояний сна - и он был совершенно прав! После отключения C-состояний в BIOS латентность была уменьшена до < 10 микросекунд. Это объясняет, почему я получил гораздо лучшую задержку в пункте 2 выше - когда я отправлял объекты чаще, процессор был достаточно занят, чтобы не переходить в более глубокие состояния сна. Большое спасибо всем, кто нашел время, чтобы прочитать мой вопрос и поделился своими мыслями!

...

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.CyclicBarrier; 

public class QueueTest { 

    ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10); 
    Thread consumerThread; 
    CyclicBarrier barrier = new CyclicBarrier(2); 
    static final int RUNS = 500000; 
    volatile int sleep = 1000; 

    public void start() { 
     consumerThread = new Thread(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        barrier.await(); 
        for(int i = 0; i < RUNS; i++) { 
         consume(); 

        } 
       } catch (Exception e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 
     consumerThread.start(); 

     try { 
      barrier.await(); 
     } catch (Exception e) { e.printStackTrace(); } 

     for(int i = 0; i < RUNS; i++) { 
      try { 
       if(sleep > 0) 
        Thread.sleep(sleep); 
       produce(); 

      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    public void produce() { 
     try { 
      queue.put(System.nanoTime()); 
     } catch (InterruptedException e) { 
     } 
    } 

    public void consume() { 
     try { 
      long t = queue.take(); 
      long now = System.nanoTime(); 
      long time = (now - t)/1000; // Divide by 1000 to get result in microseconds 
      if(sleep > 0) { 
       System.out.println("Time: " + time); 
      } 

     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

    } 

    public static void main(String[] args) { 
     QueueTest test = new QueueTest(); 
     System.out.println("Starting..."); 
     // Run first once, ignoring results 
     test.sleep = 0; 
     test.start(); 
     // Run again, printing the results 
     System.out.println("Starting again..."); 
     test.sleep = 1000; 
     test.start(); 
    } 
} 
+1

вы пробовали тест на коробке linux, ограничивая jvm только одним процессором? может помочь определить, где идет время. – DaveC

+0

Интересно. Я попытался ограничить его конкретным процессором, запустив приложение с помощью команды «taskset 0x00000001 java QueueTest», и латентность была уменьшена примерно с 75-100 до ~20 микросекунд! Я не уверен, что понимаю, хотя ... – Johan

+0

@Johan: Эти времена вы сообщаете одинаково во многих итерациях? CyclicBarrier используется для координации потоков, работающих над независимыми задачами. Ваши задачи, хотя и не являются независимыми. У вас есть как производитель, так и потребитель ждет на барьере, а затем (как только оба потока достигнут барьерной точки), они, по сути, начинают синхронизацию в блокирующей очереди. Вы могли видеть перемежения всех видов комбинаций планирования, описывающих различные задержки. – Cratylus

ответ

3

Я хотел бы использовать только в ArrayBlockingQueue, если вы можете. Когда я использовал его, время ожидания составляло от 8 до 18 микросекунд в Linux. Некоторые замечания.

  • Стоимость в основном зависит от времени, необходимого для пробуждения нити. Когда вы пробуждаете поток, его данные/код не будут находиться в кеше, поэтому вы обнаружите, что если вы заметите, что произойдет после того, как поток проснулся, это может занять 2-5 раз больше, чем если бы вы повторяли одно и то же.
  • Некоторые операции используют вызовы ОС (например, блокирующие/циклические барьеры), они часто более дороги в сценарии с низкой задержкой, чем ожидание. Я предлагаю попробовать заняться ждать вашего продюсера, а не использовать CyclicBarrier. Вы могли бы оживить своего потребителя, но это может быть неоправданно дорогостоящим в реальной системе.
+0

Благодарим вас за ответ. Я понимаю, что большую часть времени тратится на пробуждение потребительского потока, но я думал, что контекстный коммутатор в Linux будет намного дешевле, чем цифры, которые я получаю. Я не уверен, полностью ли понимаю ваш второй момент - CyclicBarrier используется только один раз здесь (не обязательно), а не на каждой итерации, когда отправляется новая метка времени. – Johan

1

@Peter Lawrey

Некоторые операции используют вызовы операционной системы (например, блокировки/циклических барьеров)

Это не ОС (ядро) вызывает. Реализовано через простой CAS (который на x86 приходит с защитой свободной памяти)

Еще один: не используйте ArrayBlockingQueue, если вы не знаете, почему (вы его используете).

@OP: Посмотрите на ThreadPoolExecutor, он предлагает отличную структуру производителя/потребителя.

Edit ниже:

уменьшить задержку (обнажая активное ожидание), измените очереди на SynchronousQueue добавить следующее, как перед запуском потребителя

... 
consumerThread.setPriority(Thread.MAX_PRIORITY); 
consumerThread.start(); 

Это лучшее, что вы можете получить.


Edit2: Вот ж/синхронизации. очередь. И не распечатывать результаты.

package t1; 

import java.math.BigDecimal; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.SynchronousQueue; 

public class QueueTest { 

    static final int RUNS = 250000; 

    final SynchronousQueue<Long> queue = new SynchronousQueue<Long>(); 

    int sleep = 1000; 

    long[] results = new long[0]; 
    public void start(final int runs) throws Exception { 
     results = new long[runs]; 
     final CountDownLatch barrier = new CountDownLatch(1); 
     Thread consumerThread = new Thread(new Runnable() { 
      @Override 
      public void run() { 
       barrier.countDown(); 
       try { 

        for(int i = 0; i < runs; i++) {       
         results[i] = consume(); 

        } 
       } catch (Exception e) { 
        return; 
       } 
      } 
     }); 
     consumerThread.setPriority(Thread.MAX_PRIORITY); 
     consumerThread.start(); 


     barrier.await(); 
     final long sleep = this.sleep; 
     for(int i = 0; i < runs; i++) { 
      try {     
       doProduce(sleep); 

      } catch (Exception e) { 
       return; 
      } 
     } 
    } 

    private void doProduce(final long sleep) throws InterruptedException { 
     produce(); 
    } 

    public void produce() throws InterruptedException { 
     queue.put(new Long(System.nanoTime()));//new Long() is faster than value of 
    } 

    public long consume() throws InterruptedException { 
     long t = queue.take(); 
     long now = System.nanoTime(); 
     return now-t; 
    } 

    public static void main(String[] args) throws Throwable {   
     QueueTest test = new QueueTest(); 
     System.out.println("Starting + warming up..."); 
     // Run first once, ignoring results 
     test.sleep = 0; 
     test.start(15000);//10k is the normal warm-up for -server hotspot 
     // Run again, printing the results 
     System.gc(); 
     System.out.println("Starting again..."); 
     test.sleep = 1000;//ignored now 
     Thread.yield(); 
     test.start(RUNS); 
     long sum = 0; 
     for (long elapsed: test.results){ 
      sum+=elapsed; 
     } 
     BigDecimal elapsed = BigDecimal.valueOf(sum, 3).divide(BigDecimal.valueOf(test.results.length), BigDecimal.ROUND_HALF_UP);   
     System.out.printf("Avg: %1.3f micros%n", elapsed); 
    } 
} 
6

Ваш тест не является хорошей мерой очереди передачи обслуживания задержки, потому что у вас есть один поток чтения из очереди, которая записывает синхронно System.out (делает строку и долго конкатенацию в то время как она на него), прежде чем он принимает снова , Чтобы правильно измерить это, вам нужно переместить эту активность из этой нити и сделать как можно меньше работы в принимающей нитке.

Вам бы лучше просто выполнить расчет (тогда-сейчас) в берущем и добавить результат в какую-либо другую коллекцию, которая периодически сливается другим потоком, который выводит результаты. Я имею тенденцию делать это, добавляя к соответствующей структуре, поддерживаемой соответствующим массивом, доступ к которой осуществляется через AtomicReference (поэтому поток отчетности просто должен getAndSet в этой ссылке с другим экземпляром этой структуры хранения, чтобы захватить последнюю партию результатов, например, сделать 2 списки, устанавливают один как активный, каждый поток xsa просыпается и меняет активный и пассивный). Затем вы можете сообщить о некотором распределении, а не о каждом отдельном результате (например, в децильном диапазоне), что означает, что вы не генерируете огромные файлы журналов при каждом запуске и получаете полезную информацию, напечатанную для вас.

FWIW Я согласен со временем Питера Lawrey указанных &, если задержка действительно критическая, то вы должны думать о занятости ждет с соответствующей аффинностью центрального процессора (т.е. выделить ядро ​​в эту тему)

EDIT после 6 января

Если удалить вызов Thread.sleep() и вместо того, чтобы позволить как производитель и потребитель вызова barrier.await() в каждой итерации (потребитель называет это после того, как напечатали истекшее время на консоль), измеренное значение l уменьшается с 60 микросекунд до менее 10 микросекунд. При запуске потоков на одном и том же ядре латентность становится ниже 1 микросекунды. Может ли кто-нибудь объяснить, почему это значительно сократило латентность?

Вы смотрите на разницу между java.util.concurrent.locks.LockSupport#park (и соответствующей unpark) и Thread#sleep. Большинство j.u.c. материал построен на LockSupport (часто с помощью AbstractQueuedSynchronizer, который ReentrantLock предоставляет или напрямую), и этот (в Точке) разрешает до sun.misc.Unsafe#parkunpark), и это, как правило, оказывается в руках pthread (posix threads) lib. Обычно pthread_cond_broadcast для пробуждения и pthread_cond_wait или pthread_cond_timedwait для таких вещей, как BlockingQueue#take.

Я не могу сказать, что я когда-либо смотрел, как фактически реализуется Thread#sleep (потому что я никогда не сталкивался с какой-то низкой задержкой, которая не является условием ожидания), но я бы предположил, что это заставляет пониженный в должности по графику более агрессивным способом, чем механизм сигнализации pthread, и именно это объясняет разницу в задержках.

+0

Благодарим вас за ввод. В этом конкретном тесте я не думаю, что синхронная запись в System.out должна быть проблемой, хотя, поскольку у меня есть продюсерский поток, ожидающий в течение 2 секунд, прежде чем он помещает новую временную метку в очередь .... если я не пропущу что-то здесь? Ваше решение для протоколирования временных меток с двумя списками и AtomicReferences звучит как отличный способ регистрировать задержки в моем «реальном» приложении. – Johan

+0

Да, честная точка, я пропустил сон перед продуктом. Время тогда преобладает время пробуждения. Вы должны уметь это видеть, добавляя возможность варьировать скорость, с которой вы предлагаете временные метки в очереди, латентность будет уменьшаться по мере увеличения ставки предложения, если на поле не появится что-то странное. – Matt

+0

@Matt: Ваши точки очень хорошие. Но тем не менее, почему один и тот же код сильно варьируется между Windows и Linux, все еще необъяснен. – Cratylus

0

Если латентность имеет решающее значение и вам не нужна строгая семантика FIFO, тогда вы можете рассмотреть LinkedTransferQueue JSR-166. Он позволяет исключить, чтобы противоположные операции могли обменивать значения вместо синхронизации в структуре данных очереди. Такой подход помогает сократить конкуренцию, обеспечивает параллельный обмен и позволяет избежать штрафов за сон/пробуждение нити.

+0

Спасибо, я посмотрю LinkedTransferQueue и посмотрю, подходит ли он для моего приложения. – Johan