2015-05-14 4 views
4

Я познакомился с LMAX и этой замечательной концепцией RingBuffer. Итак, ребята говорят, что при написании на ringbuffer только с одной версией потока лучше, чем с несколькими производителями ...Запись одним потоком LMAX

Однако я не нахожу возможным, чтобы типичное приложение использовало только один поток для записи на ringbuffer .. Я действительно не понимаю, как это делает lmax (если они это делают). Например, N число разных трейдеров ставит заказы на обмен, это все асинхронные запросы, которые трансформируются в заказы и помещаются в ringbuffer, как они могут писать те, которые используют один поток?

Вопрос 1. Возможно, я пропустил что-то или не понял какой-то аспект, но если у вас есть N одновременных производителей, как можно объединить их в 1, а не блокировать друг друга?

Вопрос 2. Я помню rxJava наблюдаемые, где вы могли бы взять N наблюдаемых и объединить их в 1, используя Observable.merge. Интересно, блокирует ли он какой-либо замок или поддерживает какой-либо замок?

ответ

2

Влияние на RingBuffer многоступенчатой ​​записи незначительно, но при очень тяжелых нагрузках может быть значительным.

Реализация RingBuffer содержит узел next, где будет добавлено следующее дополнение. Если на кольце записывается только один поток, процесс всегда будет завершен за минимальное время, то есть buffer[head++] = newData.

Чтобы обрабатывать многопоточность, избегая блокировок, вы обычно делаете что-то вроде while (!buffer[head++].compareAndSet(null,newValue)){}. Этот замкнутый цикл продолжит выполнение, в то время как другие потоки будут мешать хранению данных, что замедлит пропускную способность города.

Обратите внимание, что я использовал псевдо-код выше, посмотрите на getFree в моей реализации here для реального примера.

// Find the next free element and mark it not free. 
    private Node<T> getFree() { 
    Node<T> freeNode = head.get(); 
    int skipped = 0; 
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free. 
    // This is the loop that could cause delays under hight thread activity. 
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) { 
     skipped += 1; 
     freeNode = freeNode.next; 
    } 
    // ... 
    } 
+0

Так что, используя это для N производителей (где n не так много), будет работать быстрее, чем блокирование доступа на запись? – vach

+0

@vach - Да. Любой неблокирующий алгоритм предпочтительнее одного с использованием блокировок. – OldCurmudgeon

+0

Что я не могу понять, как LMAX удается сделать это в одном потоке? они получают много заказов одновременно, как они получают их в ringbuffer в одном потоке? Любые мысли об этом :)? – vach

2

Внутри объединение RxJava использует сериализацию конструкцию я называю emitter-loop, который использует synchronized и блокирует.

Использование наших клиентов в основном в случаях с пропускной и латентной чувствительностью или полностью однопоточное и блокирующее на самом деле не является проблемой.

Можно написать неблокирующий сериализатор, который я вызываю queue-drain, но слияние не может быть настроено на использование этого.

Вы также можете взглянуть на JCTools 'MpscArrayQueue непосредственно, если вы готовы обрабатывать потоки производителя и потребителя вручную.

Смежные вопросы