Редактировать: Пожалуйста, см. Комментарий the_joric, если вы собираетесь использовать это. Существует краевой регистр, который не обрабатывается, я не вижу быстрого способа его исправить, и поэтому у меня нет времени исправить его прямо сейчас.
Вот решение в C#, так как у вас есть тег system.reactive
.
static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b)
{
var source = Observable.Merge(
a.Select(x => Tuple.Create('a', x)),
b.Select(y => Tuple.Create('b', y)));
return source.Publish(o =>
{
var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2);
var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2);
return Observable.Merge(
published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)),
published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x)));
});
}
Идея суммирована следующим образом.
Когда a
излучает значение x
, мы задержать его до тех пор, пока b
излучает значение y
таким образом, что x <= y
.
Когда b
излучает значение y
, мы задержать его до тех пор, пока a
излучает значение x
таким образом, что y <= x
.
Если у вас были только горячие наблюдаемые, вы могли бы сделать следующее. Но следующее не будет работать, если в миксе присутствуют какие-либо холодные наблюдаемые. Я бы посоветовал всегда использовать версию, которая работает как для горячих, так и для холодных наблюдаемых.
static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b)
{
return Observable.Merge(
a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)),
b.Delay(y => a.FirstOrDefaultAsync(x => y <= x)));
}
Очень интересный вопрос, мне нравится. :) –