У вас здесь просто состояние гонки.
Если мы отрываться назад весь код просто
var emitter = new Subject<string>();
emitter.OnNext("one");
emitter.OnCompleted();
var subscription = emitter
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
Console.WriteLine("DONE.");
Console.ReadLine();
Мы получаем тот же результат. С помощью Subject<T>
вы не получите никакого поведения кэширования, за исключением уведомления OnCompleted
.
Оператор SubscribeOn
планирует запланировать любую работу по подписке на предоставленный экземпляр IScheduler
. В случае подписки на Subject<T>
почти не нужно выполнять работу. Это почти так же просто, как регистрация обратного вызова в список обратных вызовов.
Планирование работы с NewThreadScheduler
создаст новый поток, а затем создаст внутренний цикл событий для обработки запланированной работы. Это довольно быстро, но требует создания нового потока, EventloopScheduler и выполнения контекстного перехода к новому потоку.
В вашем примере вы планируете уведомления OnNext
и OnCompleted
на TestScheduler
. Вы тогда SubscribeOn
с NewThreadScheduler
. После этого вы начинаете обрабатывать все запланированные работы для экземпляра TestScheduler
. Обработка этих виртуальных запланированных элементов, это просто повторение запланированных элементов, удаление делегата и продвижение виртуальных часов. Это невероятно быстро.
Чтобы быть более конкретным, приведенный ниже код аналогичен тому, что вы написали
var newThreadScheduler = new NewThreadScheduler();
var callbacks = new List<Action<string>>();
newThreadScheduler.Schedule(()=>callbacks.Add(str=>Console.WriteLine(str)));
foreach (var callback in callbacks)
{
callback("one");
}
Console.WriteLine("Done");
Здесь мы просто список обратного вызова действий (назовем их абонентов или наблюдателей). Затем мы асинхронно планируем в новом потоке добавление одного из этих обратных вызовов. И тут же повторили обратные вызовы и отправили строку «один» каждому из них. В результате
Done
NewThreadScheduler
просто не дано достаточно времени, чтобы начать новую нить, планировать действия, а затем выполнить это действие, до того, как основной поток может итерацию по коллекции.
Таким образом, существует несколько рекомендаций, которые, как я думаю, вы не соблюдаете: 1) Избегайте предметов ;-) 2) Не смешивайте пробивки и модульные испытания. Я предполагаю, что наличие TestScheduler
связано с тем, что вы проверяете это. Однако вы можете использовать два экземпляра TestScheduler
, например. фона и переднего плана.
Чтобы быть более полезным, я бы предложил положительное руководство по предложению о том, что вы просто удалите второй планировщик из своего теста. Используйте экземпляр TestScheduler
в вашем SubscribeOn
оператора.
Далее я предлагаю заменить использование объектов + планирование с использованием методов фабричной последовательности наблюдений TestScheduler
, т. Е. CreateColdObservable
. Наконец-то я не знаю, может ли продвигаться к периоду 1-го года, что-то, только используя метод Start
. Я думаю, что это уменьшит шум и использование магического значения 1s.
var testScheduler = new TestScheduler();
var source = testScheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(TimeSpan.FromSeconds(0.1).Ticks, "one"),
ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(0.2).Ticks));
var subscription = source.SubscribeOn(testScheduler)
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
testScheduler.Start();
Console.WriteLine("DONE.");
Console.ReadLine();
Единственная проблема в настоящее время является то, что SubscribeOn
вызов является довольно излишним.
FYI: Код для NewThreadScheduler
- https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs
Я ничего не знаю о реактивная, но глядя на ваш код, вы подписались на 'newThreadScheduler', но все ваши работы и графики срабатывают против' testScheduler'. – TyCobb