2016-07-11 2 views
0

Я начал проект с использованием vertX и RxJava, и у меня есть проблема, для которой я не нашел решение.RxJava Наблюдаемый буфер на основе содержимого

У меня есть наблюдаемый, который испускает WebSocketFrame для входящих сообщений, каждый WebSocketFrame состоит из полезной нагрузки (ByteBuffer) и флагов, которая указывает, что это первый кадр сообщения или последний.

Я хочу сделать операцию над этим Наблюдаемым, чтобы преобразовать ее в Наблюдаемое, которое испускает ByteBufferd, которые содержат весь кадр каждого сообщения.

Я пробовал метод buffer, но, похоже, он предназначен для перегруппировки элементов по произвольным критериям (время или другое наблюдаемое).

Другой способ, по-видимому, использовать compose для подписки на наблюдаемый WebSocketFrame, для добавления в буфер на бесконечном кадре и для «подачи» ByteBuffer Observable на завершающий кадр. Но я не знаю, как создавать и кормить буфер вручную.

Итак, если кто-то уже видел эту проблему (что похоже на IMHO), и у вас достаточно знаний о RxJava для предложения реализации, я был бы очень благодарен.

спасибо, что посмотрели.

ответ

1

Я думаю, вы должны использовать оператора buffer для этого (возможно, вы могли бы сделать это с более простым buffer, но я не уверен в этом). См. Также this other question, который охватывает примерно ту же тему и this GitHub page для более подробного обсуждения. Надеюсь, это поможет вам!

+0

Очень хороший ответ, я реализовал большую часть функциональности, но у меня все еще есть проблема. shutoffObservable сначала уведомляется издателем, в то время как мне нужен элемент, который нужно добавить до того, как выйдет буфер (я не могу дождаться следующего кадра, чтобы испустить предыдущее сообщение) –

+0

После долгих попыток я нашел решение. Проблема возникает из буфера, который подписывается на закрытие наблюдаемого, прежде чем подписаться у производителя. Мое решение состоит в том, чтобы связать '.delaySubscription (1, TimeUnit.NANOSECONDS)' с закрывающимся наблюдаемым, чтобы принудительно подписываться после, и, следовательно, получать уведомление после. –

Смежные вопросы