2016-07-14 3 views
4

У меня здесь простая программа, которая отображает количество букв в разных словах. Он работает так, как ожидалось.Путаница над поведением Publish(). Refcount()

static void Main(string[] args) { 
    var word = new Subject<string>(); 
    var wordPub = word.Publish().RefCount(); 
    var length = word.Select(i => i.Length); 
    var report = 
     wordPub 
     .GroupJoin(length, 
      s => wordPub, 
      s => Observable.Empty<int>(), 
      (w, a) => new { Word = w, Lengths = a }) 
     .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j })); 
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}")); 
    word.OnNext("Apple"); 
    word.OnNext("Banana"); 
    word.OnNext("Cat"); 
    word.OnNext("Donkey"); 
    word.OnNext("Elephant"); 
    word.OnNext("Zebra"); 
    Console.ReadLine(); 
} 

И выход:

Apple 5 
Banana 6 
Cat 3 
Donkey 6 
Elephant 8 
Zebra 5 

Я использовал публикации() RefCount(), потому что "wordpub" включен в "докладе" дважды.. Без этого, когда слово испускается, первая часть отчета будет получать уведомление путем обратного вызова, а затем другая часть отчета будет уведомлена, удвоить уведомления. Это то, что происходит; выход заканчивается тем, что имеет 11 элементов, а не 6. По крайней мере, это то, что я думаю. Я думаю об использовании Publish(). RefCount() в этой ситуации одновременно обновляет обе части отчета.

Однако если изменить функцию длины также использовать опубликованный источник, как это:

var length = wordPub.Select(i => i.Length); 

Тогда выход таков:

Apple 5 
Apple 6 
Banana 6 
Cat 3 
Banana 3 
Cat 6 
Donkey 6 
Elephant 8 
Donkey 8 
Elephant 5 
Zebra 5 

Почему не функция длины также использовать такой же опубликованный источник?

ответ

3

Это было большой вызов решить! Так тонкие условия, что это происходит. Извиняйтесь заранее за длинное объяснение, но несите меня!

TL; DR

Подписка на опубликованный источник обрабатывается в порядке, но перед любой другой подпиской непосредственно к неопубликованному источнику. т. е. вы можете перепрыгнуть в очередь! С подпиской на подписку GroupJoin важно определить, когда открываются и закрываются окна.

Моей первой проблемой было бы то, что вы публикуете вопрос о пересчете предмета. Это должно быть no-op. Subject<T> не имеет стоимости подписки.

Итак, когда вы удалите Publish().RefCount():

var word = new Subject<string>(); 
var wordPub = word;//.Publish().RefCount(); 
var length = word.Select(i => i.Length); 

тогда вы получите тот же вопрос.

Итак, я смотрю на GroupJoin (потому что моя интуиция предполагает, что Publish().Refcount() - это красная сельдь). Для меня это было слишком сложно, чтобы рационализировать, поэтому я опираюсь на простую отладку, которую я использовал десятки раз в год - метод расширения Trace или Log.

public interface ILogger 
{ 
    void Log(string input); 
} 
public class DumpLogger : ILogger 
{ 
    public void Log(string input) 
    { 
     //LinqPad `Dump()` extension method. 
     // Could use Console.Write instead. 
     input.Dump(); 
    } 
} 


public static class ObservableLoggingExtensions 
{ 
    private static int _index = 0; 

    public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name) 
    { 
     return Observable.Create<T>(o => 
     { 
      var index = Interlocked.Increment(ref _index); 
      var label = $"{index:0000}{name}"; 
      logger.Log($"{label}.Subscribe()"); 
      var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()")); 
      var subscription = source 
       .Do(
        x => logger.Log($"{label}.OnNext({x.ToString()})"), 
        ex => logger.Log($"{label}.OnError({ex})"), 
        () => logger.Log($"{label}.OnCompleted()") 
       ) 
       .Subscribe(o); 

      return new CompositeDisposable(subscription, disposed); 
     }); 
    } 
} 

Когда я добавить протоколирование в свой соответствующий код выглядит следующим образом:

var logger = new DumpLogger(); 

var word = new Subject<string>(); 
var wordPub = word.Publish().RefCount(); 
var length = word.Select(i => i.Length); 
var report = 
    wordPub.Log(logger, "lhs") 
    .GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"), 
     s => wordPub.Log(logger, "lhsDuration"), 
     s => Observable.Empty<int>().Log(logger, "rhsDuration"), 
     (w, a) => new { Word = w, Lengths = a }) 
    .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j })); 
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext")); 
word.OnNext("Apple"); 
word.OnNext("Banana"); 
word.OnNext("Cat"); 
word.OnNext("Donkey"); 
word.OnNext("Elephant"); 
word.OnNext("Zebra"); 

Это будет выводить в моем журнале что-то вроде следующего

Вход с Публикация(). RefCount() used

0001lhs.Subscribe()    
0002rhs.Subscribe()    
0001lhs.OnNext(Apple) 
0003lhsDuration.Subscribe()  
0002rhs.OnNext(5) 
0004rhsDuration.Subscribe() 
0004rhsDuration.OnCompleted() 
0004rhsDuration.Dispose() 

    OnNext 
    Apple 5 

0001lhs.OnNext(Banana) 
0005lhsDuration.Subscribe()  
0003lhsDuration.OnNext(Banana) 
0003lhsDuration.Dispose()  
0002rhs.OnNext(6) 
0006rhsDuration.Subscribe() 
0006rhsDuration.OnCompleted() 
0006rhsDuration.Dispose() 

    OnNext 
    Banana 6 
... 

Однако, когда Я отменить использование Publish().RefCount() новый выход журнала выглядит следующим образом:

Вход без Только Тема

0001lhs.Subscribe()     
0002rhs.Subscribe()     
0001lhs.OnNext(Apple) 
0003lhsDuration.Subscribe()   
0002rhs.OnNext(5) 
0004rhsDuration.Subscribe() 
0004rhsDuration.OnCompleted() 
0004rhsDuration.Dispose() 

    OnNext 
    Apple 5 

0001lhs.OnNext(Banana) 
0005lhsDuration.Subscribe()   
0002rhs.OnNext(6) 
0006rhsDuration.Subscribe() 
0006rhsDuration.OnCompleted() 
0006rhsDuration.Dispose() 

    OnNext 
    Apple 6 

    OnNext 
    Banana 6 

0003lhsDuration.OnNext(Banana) 
0003lhsDuration.Dispose() 
... 

Это дает нам некоторое представление о том, однако, когда вопрос действительно становится ясно, когда мы начинаем аннотирования наши журналы с логическим списком подписки.

В исходной (рабочем) коде с RefCount наших аннотации могут выглядеть следующим образом

//word.Subsribers.Add(wordPub) 

0001lhs.Subscribe()    //wordPub.Subsribers.Add(0001lhs) 
0002rhs.Subscribe()    //word.Subsribers.Add(0002rhs) 
0001lhs.OnNext(Apple) 
0003lhsDuration.Subscribe()  //wordPub.Subsribers.Add(0003lhsDuration) 
0002rhs.OnNext(5) 
0004rhsDuration.Subscribe() 
0004rhsDuration.OnCompleted() 
0004rhsDuration.Dispose() 

    OnNext 
    Apple 5 

0001lhs.OnNext(Banana) 
0005lhsDuration.Subscribe()  //wordPub.Subsribers.Add(0005lhsDuration) 
0003lhsDuration.OnNext(Banana) 
0003lhsDuration.Dispose()  //wordPub.Subsribers.Remove(0003lhsDuration) 
0002rhs.OnNext(6) 
0006rhsDuration.Subscribe() 
0006rhsDuration.OnCompleted() 
0006rhsDuration.Dispose() 

    OnNext 
    Banana 6 

Таким образом, в этом примере, когда word.OnNext("Banana"); выполняется цепью наблюдателей связан именно в таком порядке

  1. wordPub
  2. 0002rhs

Однако, wordPub имеет дочерние подписки! Таким образом, реальный список подписки выглядит

  1. wordPub
    1. 0001lhs
    2. 0003lhsDuration
    3. 0005lhsDuration
  2. 0002rhs

Если мы аннотировать Subject только журнал мы видим, где тонкость лежит

0001lhs.Subscribe()     //word.Subsribers.Add(0001lhs) 
0002rhs.Subscribe()     //word.Subsribers.Add(0002rhs) 
0001lhs.OnNext(Apple) 
0003lhsDuration.Subscribe()   //word.Subsribers.Add(0003lhsDuration) 
0002rhs.OnNext(5) 
0004rhsDuration.Subscribe() 
0004rhsDuration.OnCompleted() 
0004rhsDuration.Dispose() 

    OnNext 
    Apple 5 

0001lhs.OnNext(Banana) 
0005lhsDuration.Subscribe()   //word.Subsribers.Add(0005lhsDuration) 
0002rhs.OnNext(6) 
0006rhsDuration.Subscribe() 
0006rhsDuration.OnCompleted() 
0006rhsDuration.Dispose() 

    OnNext 
    Apple 6 

    OnNext 
    Banana 6 

0003lhsDuration.OnNext(Banana) 
0003lhsDuration.Dispose() 

Таким образом, в этом примере, когда word.OnNext("Banana"); выполняется цепочка наблюдателей связана именно в таком порядке

1. 0001lhs 
2. 0002rhs 
3. 0003lhsDuration 
4. 0005lhsDuration 

Как 0003lhsDuration подписка активируется после 0002rhs, она не увидит значение «Банана» для завершения окна, пока не будет отправлено значение rhs, что даст его в открывшемся окне.

Уф

Как @ francezu13k50 указывает на очевидное и простое решение вашей проблемы является просто использовать word.Select(x => new { Word = x, Length = x.Length });, но, как я думаю, что вы дали нам упрощенную версию вашей реальной проблемы (оценили) Я понимаю, почему это не подходит. Однако, поскольку я не знаю, каково ваше реальное проблемное пространство, я не уверен, что предложить вам предоставить решение, за исключением того, что у вас есть один с вашим текущим кодом, и теперь вы должны знать, почему он работает так, как он делает.

+0

Отличный анализ! Я добавляю журнал на свой инструментарий. Реальное проблемное пространство использует rx для моделей просмотра. Есть свойства, которые пользователь вводит в формы, например, адрес электронной почты.Есть расчеты по этим свойствам, например, является ли адрес электронной почты действительным. Я пытаюсь скорректировать входы и выходы. Например, я могу присоединиться к электронной почте и isEmailValid. Его похожая корреляция сложна. Возможно, я включу входы с выходами - isEmailValid будет {Email = string, IsValid = bool}, поэтому корреляция не требуется. – JustinM

+0

Причина, по которой я делаю групповое присоединение: в моем реальном пространстве проблемы у меня есть медленная проверка. Чтобы проверить URL-адрес, ответ начинается с нуля (неизвестно), поэтому я могу обновить интерфейс с помощью «вычисления ...», за которым следует true/false, поэтому я могу обновить пользовательский интерфейс с «хорошим URL-адресом!». или «плохой URL!». SelectMany позволяет мне выравнивать сообщения проверки для отображения в пользовательском интерфейсе. Так что это на самом деле довольно простая проблема, и это печально, что нет простого ответа. мой ответ зависит от Publish(). RefCount(), который, как вы говорите, не нужен. – JustinM

+0

Посмотрите мое решение, связанное с Scan. Я думаю, что это делает то, что я хочу, без непредсказуемости/хитрости GroupJoin. Опять же, я пытаюсь сделать что-то действительно простое - сопоставить вход и соответствующий выход. Я мог бы сделать это методом расширения, поэтому я могу легко его повторно использовать. – JustinM

0

RefCount возвращает Наблюдаемый, который остается подключенным к источнику, если есть хотя бы одна подписка на возвращаемый Наблюдаемый. Когда выбрана последняя подписка, RefCount распоряжается своим подключением к источнику и повторно подключается при создании новой подписки. Возможно, в вашем запросе отчета все подписки на «wordPub» будут удалены до выполнения запроса.

Вместо сложного запроса GroupJoin вы могли бы просто сделать:

var report = word.Select(x => new { Word = x, Length = x.Length }); 

Edit: Изменить запрос отчета к этому, если вы хотите использовать GroupJoin оператор:

var report = 
     wordPub 
     .GroupJoin(length, 
      s => wordPub, 
      s => Observable.Empty<int>(), 
      (w, a) => new { Word = w, Lengths = a }) 
     .SelectMany(i => i.Lengths.FirstAsync().Select(j => new { Word = i.Word, Length = j })); 
+0

Я понимаю, что существует гораздо более простой способ написания моего запроса. Я просто пытаюсь узнать, как объединить отдельные потоки. – JustinM

+0

Проблема заключается в том, что вы проецируете каждую группу, выданную группой GroupJoin, в последовательность длин. Вам нужно разбить последовательность только на текущую длину. – francezu13k50

0

Поскольку GroupJoin, похоже, очень сложно работать, вот еще один подход для корреляции входов и выходов функций.

static void Main(string[] args) { 
    var word = new Subject<string>(); 
    var length = new Subject<int>(); 
    var report = 
     word 
     .CombineLatest(length, (w, l) => new { Word = w, Length = l }) 
     .Scan((a, b) => new { Word = b.Word, Length = a.Word == b.Word ? b.Length : -1 }) 
     .Where(i => i.Length != -1); 
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}")); 
    word.OnNext("Apple"); length.OnNext(5); 
    word.OnNext("Banana"); 
    word.OnNext("Cat"); length.OnNext(3); 
    word.OnNext("Donkey"); 
    word.OnNext("Elephant"); length.OnNext(8); 
    word.OnNext("Zebra"); length.OnNext(5); 
    Console.ReadLine(); 
} 

Этот подход работает, если каждый вход имеет 0 или более выходов при ограничениях, что (1) выдает только прибывают в том же порядке, как входы и (2) каждый выход соответствует своей последней вход. Это похоже на LeftJoin - каждый элемент в первом списке (слово) соединяется с элементами в правом списке (длине), которые затем поступают, вплоть до выделения другого элемента в первом списке.

0

Пытается использовать обычный Join вместо GroupJoin. Я думал, что проблема заключалась в том, что при создании нового слова существовало условие гонки внутри Соединиться между созданием нового окна и окончанием текущего. Итак, здесь я попытался объяснить это, соединяя каждое слово с нулевым символом, обозначающим конец окна. Не работает, как и в первой версии. Как возможно, что новое окно создается для каждого слова без предыдущего закрытия первого? Полностью смущен.

static void Main(string[] args) { 
    var lgr = new DelegateLogger(Console.WriteLine); 
    var word = new Subject<string>(); 
    var wordDelimited = 
     word 
     .Select(i => Observable.Return<string>(null).StartWith(i)) 
     .SelectMany(i => i); 
    var wordStart = wordDelimited.Where(i => i != null); 
    var wordEnd = wordDelimited.Where(i => i == null); 
    var report = Observable 
     .Join(
      wordStart.Log(lgr, "word"), // starts window 
      wordStart.Select(i => i.Length), 
      s => wordEnd.Log(lgr, "expireWord"), // ends current window 
      s => Observable.Empty<int>(), 
      (l, r) => new { Word = l, Length = r }); 
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}")); 
    word.OnNext("Apple"); 
    word.OnNext("Banana"); 
    word.OnNext("Cat"); 
    word.OnNext("Zebra"); 
    word.OnNext("Elephant"); 
    word.OnNext("Bear"); 
    Console.ReadLine(); 
} 
Смежные вопросы