2015-06-03 3 views
13

Дано:Как объединить две сортированные наблюдаемые в одну сортировку Observable?

Integer[] arr1 = {1, 5, 9, 17}; 
Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15}; 
Observable<Integer> o1 = Observable.from(arr1); 
Observable<Integer> o2 = Observable.from(arr2); 

Как получить Observable, который содержит 1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17?

+1

Очень интересный вопрос, мне нравится. :) –

ответ

4

Редактировать: Пожалуйста, см. Комментарий the_joric, если вы собираетесь использовать это. Существует краевой регистр, который не обрабатывается, я не вижу быстрого способа его исправить, и поэтому у меня нет времени исправить его прямо сейчас.

Вот решение в C#, так как у вас есть тег system.reactive.

static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b) 
{ 
    var source = Observable.Merge(
     a.Select(x => Tuple.Create('a', x)), 
     b.Select(y => Tuple.Create('b', y))); 
    return source.Publish(o => 
    { 
     var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2); 
     var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2); 
     return Observable.Merge(
      published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)), 
      published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x))); 
    }); 
} 

Идея суммирована следующим образом.

  • Когда a излучает значение x, мы задержать его до тех пор, пока b излучает значение y таким образом, что x <= y.

  • Когда b излучает значение y, мы задержать его до тех пор, пока a излучает значение x таким образом, что y <= x.

Если у вас были только горячие наблюдаемые, вы могли бы сделать следующее. Но следующее не будет работать, если в миксе присутствуют какие-либо холодные наблюдаемые. Я бы посоветовал всегда использовать версию, которая работает как для горячих, так и для холодных наблюдаемых.

static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b) 
{ 
    return Observable.Merge(
     a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)), 
     b.Delay(y => a.FirstOrDefaultAsync(x => y <= x))); 
} 
+1

Кажется, что это не работает, когда создается одна из наблюдаемых с помощью 'Observable.Create'. Я привел пример к gist: https://gist.github.com/skalinets/89f21662a619f685bd6a –

+1

@the_joric Да, похоже, что он не работает, когда один наблюдаемый заканчивается, прежде чем другой будет подписан. У меня нет времени исправить это сейчас, поэтому я просто хочу указать на ваш комментарий. –

+0

Не знал об этой перегрузке 'Delay', спасибо! – ionoy

3

Вы можете объединить, сортировать и придавить последовательности, но она будет иметь значительные накладные расходы:

o1.mergeWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...) 

или

o1.concatWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...) 

В противном случае, вам нужно написать достаточно сложный оператор ,

Редактировать 04/06/2015:

Here is оператор, который делает это сортируется-слияние более эффективно.

1

Это было обсуждено некоторое время назад на RxJava mailing list, вы найдете ссылки на возможные решения в этом потоке.

3

Я также искал решение для сортировки слияния, которое поддерживает противодавление и не может найти его. Поэтому я решил реализовать его самостоятельно самостоятельно на основе существующего оператора zip.

Подобно молнии, отсортированный оператор слияния собирает элемент из каждый источник наблюдаемой первой, но затем помещает их в приоритетной очереди, из которой излучает их один за другим в соответствии с их естественным порядком или указанного компаратора.

Вы можете захватить его с GitHub, как готовые к использованию библиотеки или просто скопировать/вставить код:

https://github.com/ybayk/rxjava-recipes

модульных тестов Смотрите использования.

+0

В вашем readme говорится: «Вы можете объединить сортировку очень больших или бесконечных отсортированных последовательностей». Я думаю, что это главное преимущество перед другими решениями. Вы должны, вероятно, подчеркнуть это. – Luciano

0

Как насчет слияния и сортировки?

@Test 
public void testMergeChains() { 
    Observable.merge(Observable.from(Arrays.asList(1, 2, 13, 11, 5)), Observable.from(Arrays.asList(10, 4, 12, 3, 14, 15))) 
       .collect(ArrayList<Integer>::new, ArrayList::add) 
      .doOnNext(Collections::sort) 
      .subscribe(System.out::println); 

} 

Вы можете увидеть больше примеров здесь

https://github.com/politrons/reactive

+0

Это решение работает, но, к сожалению, оно неэффективно в случае огромного количества данных: оно требует, чтобы все потоки потреблялись и сохранялись в памяти из-за sort(). – Christophe