Я пытаюсь использовать Observable.Interval в сочетании с выбором для опроса службы, это похоже на синтаксически хороший способ сделать это. Однако всякий раз, когда я пытаюсь реализовать способ дождаться завершения наблюдения, я получаю странное поведение, когда вызов, вызываемый внутри select, вызывается несколько раз.Observable.Interval с выбором для состояния службы вызывает странное поведение
без ожидания я получаю правильное поведение Я ищу ...
кодекса
private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });
static void Main(string[] args)
{
var observable = Observable
.Interval(TimeSpan.FromSeconds(2))
.Select(Transform)
.TakeWhile(x => x != null);
Console.WriteLine("Starting subcription");
var disposable = observable.Subscribe(x => Console.WriteLine("Event raised for {0}", x));
Console.WriteLine("Waiting for subcription to complete");
// need to wait here
Console.WriteLine("Press any key to exit. . .");
Console.ReadKey();
}
private static string Transform(long x)
{
string result;
_data.TryDequeue(out result);
Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");
return result;
}
Выходной
Starting subcription
Waiting for subcription to complete
Press any key to exit. . .
Transform invoked [x: 0, Result: a]
Event raised for a
Transform invoked [x: 1, Result: b]
Event raised for b
Transform invoked [x: 2, Result: c]
Event raised for c
Transform invoked [x: 3, Result: d]
Event raised for d
Transform invoked [x: 4, Result: NULL]
Если я вызываю метод расширения ожидания по наблюдаемому, похоже, вызывает преобразование Transform дважды за интервал и только один o е значения возвращается к событию ...
Код
private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });
static void Main(string[] args)
{
var observable = Observable
.Interval(TimeSpan.FromSeconds(2))
.Select(Transform)
.TakeWhile(x => x != null);
Console.WriteLine("Starting subcription");
var disposable = observable.Subscribe(x => Console.WriteLine("Event raised for {0}", x));
Console.WriteLine("Waiting for subcription to complete");
observable.Wait();
Console.WriteLine("Press any key to exit. . .");
Console.ReadKey();
}
private static string Transform(long x)
{
string result;
_data.TryDequeue(out result);
Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");
return result;
}
Выход
Starting subcription
Waiting for subcription to complete
Transform invoked [x: 0, Result: a]
Event raised for a
Transform invoked [x: 0, Result: b]
Transform invoked [x: 1, Result: c]
Event raised for c
Transform invoked [x: 1, Result: d]
Transform invoked [x: 2, Result: NULL]
Transform invoked [x: 2, Result: NULL]
Press any key to exit. . .
Я подозреваю, что это происходит потому, что Wait создает вторую подписку за кулисами и служба, стоящая за моим наблюдаемым, является состоятельной.
Я видел, как люди рекомендуют использовать ToTask, чтобы дождаться завершения наблюдаемого, это имеет такое же странное поведение.
Итак, каков правильный способ опроса, поддерживающего состояние, за наблюдаемым, в то время как все подписчики получают один и тот же набор данных?
То же самое происходит и с горячими наблюдаемыми; каждая новая подписка создает новый конвейер. Неважно, холодно или нет. – Enigmativity