2

Те, кто знаком с lmax ring buffer (disruptor), знают, что одним из самых больших преимуществ этой структуры данных является то, что он включает в себя различные события и когда у нас есть потребитель, который может воспользоваться пакетом, что делает систему автоматически регулируемой к нагрузке, чем больше событий вы бросаете на нее, тем лучше.Наблюдаемый пакет, такой как Lmax Disruptor

Интересно, не удалось ли достичь такого же эффекта с наблюдаемым (нацеленным на функцию дозирования). Я опробовал Observable.buffer, но это совсем другое, буфер будет ждать и не будет выпускать пакет, пока ожидаемое количество событий не поступит. то, что мы хотим, совсем другое.

, указанный подписчиком ожидает пакет от Observable<Collection<Event>>, как только абонент заканчивает выполнение, он получает следующую партию с таким количеством событий, которое прибыло с момента начала последней обработки ...

Таким образом, если наш абонент достаточно быстр, чтобы обрабатывать одно событие за раз, он будет сделайте это, если загрузка будет выше, она будет по-прежнему иметь одинаковую частоту обработки, но больше событий каждый раз (таким образом, решение проблемы противодавления) ... в отличие от буфера, который будет придерживаться и ждать, пока пакет не заполнится.

Любые предложения? или я иду с кольцевым буфером?

+1

Я считаю, что этот оператор называется 'bufferIntrospective', см. Http://stackoverflow.com/questions/28880247/buffer-while-processing-items –

ответ

5

RxJava and Disruptor представляют собой два разных подхода к программированию.

У меня нет опыта с Disruptor, но на основе видеообговоров это в основном большой буфер, в котором производитель выделяет данные, такие как firehose и consumer spin/yield/block, пока не будут доступны данные.

RxJava, с другой стороны, имеет целью неблокирующую доставку событий. У нас тоже есть ringbuffers, особенно в watchOn, который действует как асинхронная граница между производителями и потребителями, но они намного меньше, и мы избегаем переполнения буфера и разбухания буфера, применяя подход со-подпрограмм. Совместные подпрограммы сводятся к обратным вызовам, отправленным на ваши обратные вызовы, поэтому вы можете переадресовать наши обратные вызовы, чтобы отправить вам некоторые данные в своем темпе. Частота таких запросов определяет скорость.

Есть источники данных, которые не поддерживают такую ​​совместную поточную передачу, и требуют одного из операторов onBackpressureXXX, которые будут иметь значения буфера/падения, если нисходящий поток не запрашивает достаточно быстро.

Если вы считаете, что вы можете обрабатывать данные партиями более эффективно, чем один за другим, вы можете использовать оператор buffer, который имеет перегрузки, чтобы указать продолжительность времени для буферов: вы можете иметь, например, 10 мс данных, независимо от того, сколько значений поступит в эту продолжительность.

Управление размером партии по частоте запроса является сложным и может иметь непредвиденные последствия. Проблема, как правило, в том, что если вы используете request(n) из исходного пакета, вы указываете, что вы можете обрабатывать n элементов, но теперь источник должен создавать n буферов размера 1 (поскольку тип Observable<List<T>>). Напротив, если запрос не вызывается, оператор буферизует данные, приводящие к увеличению количества буферов. Эти поведения приводят к дополнительным накладным расходам при обработке, если вы действительно можете идти в ногу с собой и также должны превращать холодный источник в огнетушитель (потому что в противном случае у вас есть по существу buffer(1)), который сам по себе теперь может привести к раздуванию буфера.

Смежные вопросы