Это было большой вызов решить! Так тонкие условия, что это происходит. Извиняйтесь заранее за длинное объяснение, но несите меня!
TL; DR
Подписка на опубликованный источник обрабатывается в порядке, но перед любой другой подпиской непосредственно к неопубликованному источнику. т. е. вы можете перепрыгнуть в очередь! С подпиской на подписку GroupJoin
важно определить, когда открываются и закрываются окна.
Моей первой проблемой было бы то, что вы публикуете вопрос о пересчете предмета. Это должно быть no-op. Subject<T>
не имеет стоимости подписки.
Итак, когда вы удалите Publish().RefCount()
:
var word = new Subject<string>();
var wordPub = word;//.Publish().RefCount();
var length = word.Select(i => i.Length);
тогда вы получите тот же вопрос.
Итак, я смотрю на GroupJoin
(потому что моя интуиция предполагает, что Publish().Refcount()
- это красная сельдь). Для меня это было слишком сложно, чтобы рационализировать, поэтому я опираюсь на простую отладку, которую я использовал десятки раз в год - метод расширения Trace
или Log
.
public interface ILogger
{
void Log(string input);
}
public class DumpLogger : ILogger
{
public void Log(string input)
{
//LinqPad `Dump()` extension method.
// Could use Console.Write instead.
input.Dump();
}
}
public static class ObservableLoggingExtensions
{
private static int _index = 0;
public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name)
{
return Observable.Create<T>(o =>
{
var index = Interlocked.Increment(ref _index);
var label = $"{index:0000}{name}";
logger.Log($"{label}.Subscribe()");
var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()"));
var subscription = source
.Do(
x => logger.Log($"{label}.OnNext({x.ToString()})"),
ex => logger.Log($"{label}.OnError({ex})"),
() => logger.Log($"{label}.OnCompleted()")
)
.Subscribe(o);
return new CompositeDisposable(subscription, disposed);
});
}
}
Когда я добавить протоколирование в свой соответствующий код выглядит следующим образом:
var logger = new DumpLogger();
var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
wordPub.Log(logger, "lhs")
.GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"),
s => wordPub.Log(logger, "lhsDuration"),
s => Observable.Empty<int>().Log(logger, "rhsDuration"),
(w, a) => new { Word = w, Lengths = a })
.SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");
Это будет выводить в моем журнале что-то вроде следующего
Вход с Публикация(). RefCount() used
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
...
Однако, когда Я отменить использование Publish().RefCount()
новый выход журнала выглядит следующим образом:
Вход без Только Тема
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
...
Это дает нам некоторое представление о том, однако, когда вопрос действительно становится ясно, когда мы начинаем аннотирования наши журналы с логическим списком подписки.
В исходной (рабочем) коде с RefCount наших аннотации могут выглядеть следующим образом
//word.Subsribers.Add(wordPub)
0001lhs.Subscribe() //wordPub.Subsribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //wordPub.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //wordPub.Subsribers.Add(0005lhsDuration)
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose() //wordPub.Subsribers.Remove(0003lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
Таким образом, в этом примере, когда word.OnNext("Banana");
выполняется цепью наблюдателей связан именно в таком порядке
- wordPub
- 0002rhs
Однако, wordPub имеет дочерние подписки! Таким образом, реальный список подписки выглядит
- wordPub
- 0001lhs
-
0003lhsDuration
- 0005lhsDuration
- 0002rhs
Если мы аннотировать Subject только журнал мы видим, где тонкость лежит
0001lhs.Subscribe() //word.Subsribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //word.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //word.Subsribers.Add(0005lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
Таким образом, в этом примере, когда word.OnNext("Banana");
выполняется цепочка наблюдателей связана именно в таком порядке
1. 0001lhs
2. 0002rhs
3. 0003lhsDuration
4. 0005lhsDuration
Как 0003lhsDuration
подписка активируется после 0002rhs
, она не увидит значение «Банана» для завершения окна, пока не будет отправлено значение rhs, что даст его в открывшемся окне.
Уф
Как @ francezu13k50 указывает на очевидное и простое решение вашей проблемы является просто использовать word.Select(x => new { Word = x, Length = x.Length });
, но, как я думаю, что вы дали нам упрощенную версию вашей реальной проблемы (оценили) Я понимаю, почему это не подходит. Однако, поскольку я не знаю, каково ваше реальное проблемное пространство, я не уверен, что предложить вам предоставить решение, за исключением того, что у вас есть один с вашим текущим кодом, и теперь вы должны знать, почему он работает так, как он делает.
Отличный анализ! Я добавляю журнал на свой инструментарий. Реальное проблемное пространство использует rx для моделей просмотра. Есть свойства, которые пользователь вводит в формы, например, адрес электронной почты.Есть расчеты по этим свойствам, например, является ли адрес электронной почты действительным. Я пытаюсь скорректировать входы и выходы. Например, я могу присоединиться к электронной почте и isEmailValid. Его похожая корреляция сложна. Возможно, я включу входы с выходами - isEmailValid будет {Email = string, IsValid = bool}, поэтому корреляция не требуется. – JustinM
Причина, по которой я делаю групповое присоединение: в моем реальном пространстве проблемы у меня есть медленная проверка. Чтобы проверить URL-адрес, ответ начинается с нуля (неизвестно), поэтому я могу обновить интерфейс с помощью «вычисления ...», за которым следует true/false, поэтому я могу обновить пользовательский интерфейс с «хорошим URL-адресом!». или «плохой URL!». SelectMany позволяет мне выравнивать сообщения проверки для отображения в пользовательском интерфейсе. Так что это на самом деле довольно простая проблема, и это печально, что нет простого ответа. мой ответ зависит от Publish(). RefCount(), который, как вы говорите, не нужен. – JustinM
Посмотрите мое решение, связанное с Scan. Я думаю, что это делает то, что я хочу, без непредсказуемости/хитрости GroupJoin. Опять же, я пытаюсь сделать что-то действительно простое - сопоставить вход и соответствующий выход. Я мог бы сделать это методом расширения, поэтому я могу легко его повторно использовать. – JustinM