Я начал проект с использованием vertX и RxJava, и у меня есть проблема, для которой я не нашел решение.RxJava Наблюдаемый буфер на основе содержимого
У меня есть наблюдаемый, который испускает WebSocketFrame для входящих сообщений, каждый WebSocketFrame состоит из полезной нагрузки (ByteBuffer) и флагов, которая указывает, что это первый кадр сообщения или последний.
Я хочу сделать операцию над этим Наблюдаемым, чтобы преобразовать ее в Наблюдаемое, которое испускает ByteBufferd, которые содержат весь кадр каждого сообщения.
Я пробовал метод buffer
, но, похоже, он предназначен для перегруппировки элементов по произвольным критериям (время или другое наблюдаемое).
Другой способ, по-видимому, использовать compose
для подписки на наблюдаемый WebSocketFrame, для добавления в буфер на бесконечном кадре и для «подачи» ByteBuffer Observable на завершающий кадр. Но я не знаю, как создавать и кормить буфер вручную.
Итак, если кто-то уже видел эту проблему (что похоже на IMHO), и у вас достаточно знаний о RxJava для предложения реализации, я был бы очень благодарен.
спасибо, что посмотрели.
Очень хороший ответ, я реализовал большую часть функциональности, но у меня все еще есть проблема. shutoffObservable сначала уведомляется издателем, в то время как мне нужен элемент, который нужно добавить до того, как выйдет буфер (я не могу дождаться следующего кадра, чтобы испустить предыдущее сообщение) –
После долгих попыток я нашел решение. Проблема возникает из буфера, который подписывается на закрытие наблюдаемого, прежде чем подписаться у производителя. Мое решение состоит в том, чтобы связать '.delaySubscription (1, TimeUnit.NANOSECONDS)' с закрывающимся наблюдаемым, чтобы принудительно подписываться после, и, следовательно, получать уведомление после. –