2015-12-09 34 views
1

Это вопрос новичка. Я передаю результаты нескольких процессов обратно клиенту, и я хочу сохранить каждый поток в отдельный файл.Реконструкция вложенных наблюдений

Так что я хочу (большая буква указывает на конец вложенного потока)

S: -a-a-a-b-b-a-A-c-b-B-c-c-...-d--C...D-e--...-z-E-z--z--... 

R: a-a-a-----a-A (complete) 
      b-b-------b-B (complete) 
        c-----c-c--------C (complete) 
           d------D (complete) 
             e--------E (complete) 

        . 
        . 
        . 
     (end many more nested streams coming) 
        . 

Так что я хочу что-то вроде динамического завода наблюдаемыми. Подобно using(), но, как я понимаю, using() создает Observables, которые существуют до тех пор, пока оригинал Observable, тогда как я хочу завершить и закрыть файл каждый раз, когда вложенный поток завершается.

ВАЖНО - я не хочу буферизировать в памяти, так как это очень длинные потоки (вывод длинных процессов). Поэтому я хотел бы избежать buffer(), groupBy().

ответ

1

Если вы можете определить, из значения, если оно является окончательным для конкретного письма, вы можете использовать groupBy с takeUntil во внутренних группах (возможно, потребуется несколько примерочных уловов в лямбды):

source 
.groupBy(v -> v.type) 
.flatMap(g -> 
    Observable.using(
     () -> createOutput(g.getKey()), 
     f -> g.takeUntil(v -> v.isEnd).doOnNext(v -> f.write(v))), 
     f -> f.close() 
    ) 
) 
.subscribe(...) 

TakeUntil гарантирует, что groupBy сохраняет только субпотоки, пока мы ожидаем его значений (и если источник заказан правильно, groupBy не будет воссоздавать группу).

Если вы действительно хотите, чтобы избежать groupBy, вы должны вручную отслеживать каждый открытый файл и закрыть их в соответствующие моменты времени:

Observable.using(
    HashMap::new, 
    map -> source 
     .doOnNext(v -> { 
      Output o = map.get(v.type); 
      if (o == null) { 
       o = new Output(v.type); 
       map.put(v.type, o); 
      } 
      o.write(v); 
      if (v.isEnd) { 
       o.close(); 
       map.remove(v.type); 
      } 
     }), 
    map -> map.values().forEach(e -> e.close()) 
) 
.subscribe(...); 
+0

Спасибо за быстрый ответ. Я смотрю на вторую часть (потому что я действительно не хочу использовать 'groupBy()'). Это похоже на работу, но это, похоже, не является реактивным решением. Это всего лишь слушатель, который отправляет данные на основе своего частного хэшмапа. Так что это стандартный императивный способ. Есть ли способ решить проблему чистым реактивным способом? –

+0

Вариант № 1 более реактивный, и если вы можете получить доступ к реактивной библиотеке файлов, вы также можете выполнить полную асинхронную операцию сохранения. В противном случае, не бойтесь смешивать две концепции, если это более эффективно решает вашу проблему. – akarnokd

+0

Это не страх. Просто я хотел бы узнать, как написать правильный реактивный код. И так же, как я не пытаюсь встроить ассемблер при написании Java-кода (не уверен, что это возможно), я не хочу вводить нереактивные методы в реактивный код. Вариант № 1 является реактивным без каких-либо сомнений, но он накапливает данные в памяти. Это то, что я не могу сделать, как уже объяснялось в первоначальном вопросе. –

Смежные вопросы