Это вопрос новичка. Я передаю результаты нескольких процессов обратно клиенту, и я хочу сохранить каждый поток в отдельный файл.Реконструкция вложенных наблюдений
Так что я хочу (большая буква указывает на конец вложенного потока)
S: -a-a-a-b-b-a-A-c-b-B-c-c-...-d--C...D-e--...-z-E-z--z--...
R: a-a-a-----a-A (complete)
b-b-------b-B (complete)
c-----c-c--------C (complete)
d------D (complete)
e--------E (complete)
.
.
.
(end many more nested streams coming)
.
Так что я хочу что-то вроде динамического завода наблюдаемыми. Подобно using()
, но, как я понимаю, using()
создает Observables, которые существуют до тех пор, пока оригинал Observable, тогда как я хочу завершить и закрыть файл каждый раз, когда вложенный поток завершается.
ВАЖНО - я не хочу буферизировать в памяти, так как это очень длинные потоки (вывод длинных процессов). Поэтому я хотел бы избежать buffer()
, groupBy()
.
Спасибо за быстрый ответ. Я смотрю на вторую часть (потому что я действительно не хочу использовать 'groupBy()'). Это похоже на работу, но это, похоже, не является реактивным решением. Это всего лишь слушатель, который отправляет данные на основе своего частного хэшмапа. Так что это стандартный императивный способ. Есть ли способ решить проблему чистым реактивным способом? –
Вариант № 1 более реактивный, и если вы можете получить доступ к реактивной библиотеке файлов, вы также можете выполнить полную асинхронную операцию сохранения. В противном случае, не бойтесь смешивать две концепции, если это более эффективно решает вашу проблему. – akarnokd
Это не страх. Просто я хотел бы узнать, как написать правильный реактивный код. И так же, как я не пытаюсь встроить ассемблер при написании Java-кода (не уверен, что это возможно), я не хочу вводить нереактивные методы в реактивный код. Вариант № 1 является реактивным без каких-либо сомнений, но он накапливает данные в памяти. Это то, что я не могу сделать, как уже объяснялось в первоначальном вопросе. –