2017-02-08 8 views
1

Я пытаюсь использовать Observable.Interval в сочетании с выбором для опроса службы, это похоже на синтаксически хороший способ сделать это. Однако всякий раз, когда я пытаюсь реализовать способ дождаться завершения наблюдения, я получаю странное поведение, когда вызов, вызываемый внутри select, вызывается несколько раз.Observable.Interval с выбором для состояния службы вызывает странное поведение

без ожидания я получаю правильное поведение Я ищу ...

кодекса

private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" }); 

static void Main(string[] args) 
{ 
    var observable = Observable 
     .Interval(TimeSpan.FromSeconds(2)) 
     .Select(Transform) 
     .TakeWhile(x => x != null); 

    Console.WriteLine("Starting subcription"); 
    var disposable = observable.Subscribe(x => Console.WriteLine("Event raised for {0}", x)); 

    Console.WriteLine("Waiting for subcription to complete"); 
    // need to wait here 

    Console.WriteLine("Press any key to exit. . ."); 
    Console.ReadKey(); 
} 

private static string Transform(long x) 
{ 
    string result; 
    _data.TryDequeue(out result); 

    Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL"); 

    return result; 
} 

Выходной

Starting subcription 
Waiting for subcription to complete 
Press any key to exit. . . 
Transform invoked [x: 0, Result: a] 
Event raised for a 
Transform invoked [x: 1, Result: b] 
Event raised for b 
Transform invoked [x: 2, Result: c] 
Event raised for c 
Transform invoked [x: 3, Result: d] 
Event raised for d 
Transform invoked [x: 4, Result: NULL] 

Если я вызываю метод расширения ожидания по наблюдаемому, похоже, вызывает преобразование Transform дважды за интервал и только один o е значения возвращается к событию ...

Код

private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" }); 

static void Main(string[] args) 
{ 
    var observable = Observable 
     .Interval(TimeSpan.FromSeconds(2)) 
     .Select(Transform) 
     .TakeWhile(x => x != null); 

    Console.WriteLine("Starting subcription"); 
    var disposable = observable.Subscribe(x => Console.WriteLine("Event raised for {0}", x)); 

    Console.WriteLine("Waiting for subcription to complete"); 
    observable.Wait(); 

    Console.WriteLine("Press any key to exit. . ."); 
    Console.ReadKey(); 
} 

private static string Transform(long x) 
{ 
    string result; 
    _data.TryDequeue(out result); 

    Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL"); 

    return result; 
} 

Выход

Starting subcription 
Waiting for subcription to complete 
Transform invoked [x: 0, Result: a] 
Event raised for a 
Transform invoked [x: 0, Result: b] 
Transform invoked [x: 1, Result: c] 
Event raised for c 
Transform invoked [x: 1, Result: d] 
Transform invoked [x: 2, Result: NULL] 
Transform invoked [x: 2, Result: NULL] 
Press any key to exit. . . 

Я подозреваю, что это происходит потому, что Wait создает вторую подписку за кулисами и служба, стоящая за моим наблюдаемым, является состоятельной.

Я видел, как люди рекомендуют использовать ToTask, чтобы дождаться завершения наблюдаемого, это имеет такое же странное поведение.

Итак, каков правильный способ опроса, поддерживающего состояние, за наблюдаемым, в то время как все подписчики получают один и тот же набор данных?

ответ

2

Пара вещей:

  1. Рекомендуется, чтобы на изменения в состоянии произойти только во время Subscribe, а не другие операторы, как Select. Хотя я не уверен, что это практично для вашего примера.
  2. Ваш наблюдаемый номер cold observable, что означает, что каждая подписка воссоздает все: новый таймер в фоновом режиме, отмечающий каждую секунду, новый оператор Select и т. Д.У вас есть две подписки: одна от Subscribe другая от Wait, поэтому у вас будет два таймера с двумя операторами Select, приложенными к телефону Transform.

Вы можете исправить это одним из двух способов:

  1. Превратите ваши наблюдаемый в горячие наблюдаемый
  2. Устранить подписку (решение намеченной @Daniel Вебер)

Ваших наблюдаемый в качестве наблюдаемого в горячем состоянии, будет выглядеть так:

var observable = Observable 
    .Interval(TimeSpan.FromSeconds(2)) 
    .Select(Transform) 
    .TakeWhile(x => x != null) 
    .Publish() 
    .RefCount(); 

Мое предложение состояло в том, что, поскольку вы меняете состояние в своем наблюдаемом, я бы сделал его горячим, чтобы убедиться, что вы не закончили работать, что дважды.

+0

То же самое происходит и с горячими наблюдаемыми; каждая новая подписка создает новый конвейер. Неважно, холодно или нет. – Enigmativity

1

Убедитесь, что вы только подписаны один раз на наблюдаемые. Опустите первый звонок до Subscribe и просто оставьте вызов Wait. Если вы все еще хотите, чтобы испустить некоторые сообщения журнала (как вы сделали в вашей подписке), добавьте Do -шаговые:

private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" }); 

static void Main(string[] args) 
{ 
    var observable = Observable 
     .Interval(TimeSpan.FromSeconds(2)) 
     .Select(Transform) 
     .TakeWhile(x => x != null); 

    Console.WriteLine("Wait for the observable to complete."); 
    observable 
     .Do(x => Console.WriteLine("Event raised for {0}", x)) 
     .Wait(); 

    Console.WriteLine("Press any key to exit. . ."); 
    Console.ReadKey(); 
} 

private static string Transform(long x) 
{ 
    string result; 
    _data.TryDequeue(out result); 

    Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL"); 

    return result; 
} 

Обратите внимание, что Wait будет блокировать (что неизбежно в методе Main). Кроме того, он будет бросать, когда ваш наблюдаемый пуст. Если вас не интересуют какие-либо значения наблюдаемого, добавьте LastOrDefault -ступенчатый.

Ожидание наблюдаемого является неотъемлемой асинхронной операцией, поэтому вы должны проверить, можно ли использовать ToTask вместо Wait и ждать его в асинхронном режиме.

+0

Я заметил, что создание видимого IConnectableObservable с помощью метода расширения Replay() решило проблему. Будет ли повторное воспроизведение более желательным, чтобы вы могли вернуть этот IObservable нескольким потребителям и все-таки позволить им подписаться. Делайте это только как хорошее решение, когда вы делаете что-то самодостаточное, как в этом примере, хотя вряд ли это будет так, как это происходит. – KrisG

+0

Конечно, 'Replay' (вероятно, вместе с' RefCount') делает наблюдаемым «горячим» наблюдаемым и позволяет избежать повторных вычислений. Это полезно, если у вас действительно есть несколько подписчиков. Обратите внимание, однако, что, хотя вы можете наблюдать за завершением в одной подписке, еще одна подписка может по-прежнему работать. Поэтому будьте осторожны, чтобы не разорвать ваше приложение слишком рано. –

Смежные вопросы