2016-12-16 3 views
3

Я преподаю себе реактивное программирование, пробираясь сквозь случайные проблемы и задавая глупые вопросы новичков без стыда. Пока выясняя, как работает планирование потоков, мне удалось пнуть себя. Хотя я уверен, что этот код не имеет логического смысла, я также не могу понять, что происходит. Полагая, что это, вероятно, поможет мне. Вот код:Что такое SubscribeOn() здесь?

var testScheduler = new TestScheduler(); 
var newThreadScheduler = new NewThreadScheduler(); 

var emitter = new Subject<string>(); 
testScheduler.Schedule(TimeSpan.FromSeconds(0.1),() => emitter.OnNext("one")); 
testScheduler.Schedule(TimeSpan.FromSeconds(0.2),() => emitter.OnCompleted()); 

var subscription = emitter.SubscribeOn(newThreadScheduler) 
          .Subscribe(
           item => Console.WriteLine(item), 
           error => Console.WriteLine(error), 
           () => Console.WriteLine("Complete!") 
          ); 

testScheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks); 

Console.WriteLine("DONE."); 
Console.ReadLine(); 

Что я ожидал было возможно:

one  
DONE. 
Complete! 

с возможным чередованием, как я был не совсем уверен, что SubscribeOn() будет делать. Вместо этого я получил:

DONE. 
Complete! 

Что именно происходит здесь? Почему товар не появился до завершения? ObserveOn() работает так, как я ожидал в этом случае, и я понимаю, почему: он запускает делегаты на каком-то другом потоке, и они могут чередоваться с «DONE». Итак, что именно делает программа SubscribeOn()?

+0

Я ничего не знаю о реактивная, но глядя на ваш код, вы подписались на 'newThreadScheduler', но все ваши работы и графики срабатывают против' testScheduler'. – TyCobb

ответ

2

У вас здесь просто состояние гонки.

Если мы отрываться назад весь код просто

var emitter = new Subject<string>(); 
emitter.OnNext("one"); 
emitter.OnCompleted(); 

var subscription = emitter 
          .Subscribe(
           item => Console.WriteLine(item), 
           error => Console.WriteLine(error), 
           () => Console.WriteLine("Complete!") 
          ); 



Console.WriteLine("DONE."); 
Console.ReadLine(); 

Мы получаем тот же результат. С помощью Subject<T> вы не получите никакого поведения кэширования, за исключением уведомления OnCompleted.

Оператор SubscribeOn планирует запланировать любую работу по подписке на предоставленный экземпляр IScheduler. В случае подписки на Subject<T> почти не нужно выполнять работу. Это почти так же просто, как регистрация обратного вызова в список обратных вызовов.

Планирование работы с NewThreadScheduler создаст новый поток, а затем создаст внутренний цикл событий для обработки запланированной работы. Это довольно быстро, но требует создания нового потока, EventloopScheduler и выполнения контекстного перехода к новому потоку.

В вашем примере вы планируете уведомления OnNext и OnCompleted на TestScheduler. Вы тогда SubscribeOn с NewThreadScheduler. После этого вы начинаете обрабатывать все запланированные работы для экземпляра TestScheduler. Обработка этих виртуальных запланированных элементов, это просто повторение запланированных элементов, удаление делегата и продвижение виртуальных часов. Это невероятно быстро.

Чтобы быть более конкретным, приведенный ниже код аналогичен тому, что вы написали

var newThreadScheduler = new NewThreadScheduler(); 

var callbacks = new List<Action<string>>(); 
newThreadScheduler.Schedule(()=>callbacks.Add(str=>Console.WriteLine(str))); 

foreach (var callback in callbacks) 
{ 
    callback("one"); 
} 

Console.WriteLine("Done"); 

Здесь мы просто список обратного вызова действий (назовем их абонентов или наблюдателей). Затем мы асинхронно планируем в новом потоке добавление одного из этих обратных вызовов. И тут же повторили обратные вызовы и отправили строку «один» каждому из них. В результате

Done 

NewThreadScheduler просто не дано достаточно времени, чтобы начать новую нить, планировать действия, а затем выполнить это действие, до того, как основной поток может итерацию по коллекции.

Таким образом, существует несколько рекомендаций, которые, как я думаю, вы не соблюдаете: 1) Избегайте предметов ;-) 2) Не смешивайте пробивки и модульные испытания. Я предполагаю, что наличие TestScheduler связано с тем, что вы проверяете это. Однако вы можете использовать два экземпляра TestScheduler, например. фона и переднего плана.

Чтобы быть более полезным, я бы предложил положительное руководство по предложению о том, что вы просто удалите второй планировщик из своего теста. Используйте экземпляр TestScheduler в вашем SubscribeOn оператора.

Далее я предлагаю заменить использование объектов + планирование с использованием методов фабричной последовательности наблюдений TestScheduler, т. Е. CreateColdObservable. Наконец-то я не знаю, может ли продвигаться к периоду 1-го года, что-то, только используя метод Start. Я думаю, что это уменьшит шум и использование магического значения 1s.

var testScheduler = new TestScheduler(); 

var source = testScheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(TimeSpan.FromSeconds(0.1).Ticks, "one"), 
    ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(0.2).Ticks)); 

var subscription = source.SubscribeOn(testScheduler) 
          .Subscribe(
           item => Console.WriteLine(item), 
           error => Console.WriteLine(error), 
           () => Console.WriteLine("Complete!") 
          ); 

testScheduler.Start(); 

Console.WriteLine("DONE."); 
Console.ReadLine(); 

Единственная проблема в настоящее время является то, что SubscribeOn вызов является довольно излишним.

FYI: Код для NewThreadScheduler - https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs

+0

Спасибо за этот очень подробный ответ, это был не просто производственный код, а просто попытка выяснить, как все работает, и это сделало его немного странным! Похоже, что смешивание TestScheduler с «реальными» планировщиками вызывает больше проблем, поэтому мне нужно настроить, как я экспериментирую. – OwenP

+0

Да, согласен. Либо «виртуализировать» время, используя testcheduler/historyscheduler, либо использовать реальные –

Смежные вопросы