2013-07-21 2 views
7

В настоящее время я пытаюсь обернуть голову вокруг параллелизма с помощью RX .NET и запутать что-то. Я хочу запустить четыре относительно медленных задачи параллельно, поэтому я предположил, что NewThreadScheduler.Default будет способом, так как он «Представляет объект, который планирует каждую единицу работы в отдельном потоке»..NewThreadScheduler.Default планирует все работать в одном потоке

Вот мой код установки:

static void Test() 
    { 
     Console.WriteLine("Starting. Thread {0}", Thread.CurrentThread.ManagedThreadId); 

     var query = Enumerable.Range(1, 4); 
     var obsQuery = query.ToObservable(NewThreadScheduler.Default); 
     obsQuery.Subscribe(DoWork, Done); 

     Console.WriteLine("Last line. Thread {0}", Thread.CurrentThread.ManagedThreadId); 
    } 

    static void DoWork(int i) 
    { 
     Thread.Sleep(500); 
     Console.WriteLine("{0} Thread {1}", i, Thread.CurrentThread.ManagedThreadId); 
    } 

    static void Done() 
    { 
     Console.WriteLine("Done. Thread {0}", Thread.CurrentThread.ManagedThreadId); 
    } 

Я принял «X тему Y» будет выводить другой идентификатор потока каждый раз, однако фактический выход:

Starting. Thread 1 
Last line. Thread 1 
1 Thread 3 
2 Thread 3 
3 Thread 3 
4 Thread 3 
Done. Thread 3 

Все работы будучи одним из тех же новых потоков в последовательном порядке, чего я не ожидал.

Я предполагаю, что у меня что-то не хватает, но я не могу понять, что.

ответ

8

Есть две части для наблюдаемого запроса: Query и Subscription. (Это также разница между ObserveOn и SubscribeOn операторов.)

Ваш Query является

Enumerable 
    .Range(1, 4) 
    .ToObservable(NewThreadScheduler.Default); 

Это создает наблюдаемое, что выдает значения по умолчанию NewThreadScheduler для этой системы.

Ваша подписка

obsQuery.Subscribe(DoWork, Done); 

Это работает DoWork для каждого значения, полученного в Query и Done когда Query завершается с OnComplete вызова. Я не думаю, что есть какие-либо гарантии относительно того, на какой поток будут вызваны функции в методе подписки, на практике, если все значения запроса создаются в том же потоке, который является потоком, на котором будет выполняться подписка. Похоже, что они также делают так, что все вызовы подписки выполняются в том же потоке, который, скорее всего, сделан, чтобы избавиться от множества распространенных ошибок многопоточности.

Так у вас есть два вопроса, один с вашим протоколирования, если вы измените ваш Query на

Enumerable 
    .Range(1, 4) 
    .Do(x => Console.WriteLine("Query Value {0} produced on Thread {1}", x, Thread.CurrentThread.ManagedThreadId); 
    .ToObservable(NewThreadScheduler.Default); 

Вы увидите значение каждого полученного в новом потоке.

Другая проблема - одно из намерений и дизайна Rx. Это предназначено для, что Query является долгосрочным процессом, а Subscription - это короткий метод, который имеет дело с результатами. Если вы хотите запустить долговременную функцию как Rx Observable, ваш лучший вариант - использовать Observable.ToAsync.

+0

Благодарим вас за это. Вы не только разъясняете, что я делаю неправильно, но также думаю, что вы говорите о намерениях Rx, которые имеют большой смысл. –

+0

Вышеприведенный ответ кажется устаревшим, метод Do больше недоступен в Enumerable. – VivekDev

+0

Для того, чтобы метод Do работал, вам необходимо установить пакет System.Interactive nuget – VivekDev

Смежные вопросы