2015-06-01 2 views
1

Я следую ответу на этот вопрос: Rx extensions: Where is Parallel.ForEach?, чтобы параллельно выполнять несколько операций с использованием Rx.Rx расширения Parallel.ForEach дросселирование

Проблема, с которой я сталкиваюсь, заключается в том, что она, как представляется, выделяет новый поток для каждый запрос, тогда как с использованием Parallel.ForEach сделано значительно меньше.

Процессы, которые я запускаю параллельно, довольно интенсивно хранятся в памяти, поэтому, если я пытаюсь обрабатывать сотни элементов сразу, то ответ, предоставленный связанному вопросу, быстро обнаруживает, что у меня заканчивается память.

Есть ли способ изменить этот ответ, чтобы уменьшить количество элементов, выполняемых в любой момент времени?

Я принял взглянуть на Window и Buffer операций, мой код выглядит следующим образом:

return inputs.Select(i => new AccountViewModel(i)) 
    .ToObservable() 
    .ObserveOn(RxApp.MainThreadScheduler) 
    .ToList() 
    .Do(l => 
    { 
     using (Accounts.SuppressChangeNotifications()) 
     { 
      Accounts.AddRange(l); 
     } 
    }) 
    .SelectMany(x => x) 
    .SelectMany(acc => Observable.StartAsync(async() => 
    { 
     var res = await acc.ProcessAsync(config, m, outputPath); 
     processed++; 
     var prog = ((double) processed/inputs.Count())*100.0; 
     OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog)); 
     OverallProgress.Progress.OnNext(prog); 
     return res; 
    })) 
    .All(x => x); 

В идеале я хочу, чтобы иметь возможность партии его на куски просмотреть счет моделей, что я тогда вызовите метод ProcessAsync, и только один раз все эти партии будут продолжены.

В идеале мне бы хотелось, чтобы, если бы даже одна из партий закончилась, она двигалась дальше, но только когда-либо сохраняла тот же размер партии.

Так что, если у меня есть партия из 5 и 1 штук, я бы хотел, чтобы другой начал, но только один, пока не будет доступно больше места.

ответ

0

Как обычно, Пол Беттс ответил на подобный вопрос, который решает мою проблему:

Вопрос: Reactive Extensions Parallel processing based on specific number

имеет некоторую информацию об использовании Observable.Defer, а затем объединять в пакеты, используя, что я изменил мой предыдущий код следующим образом:

return inputs.Select(i => new AccountViewModel(i)) 
    .ToObservable() 
    .ObserveOn(RxApp.MainThreadScheduler) 
    .ToList() 
    .Do(l => 
    { 
     using (Accounts.SuppressChangeNotifications()) 
     { 
      Accounts.AddRange(l); 
     } 
    }) 
    .SelectMany(x => x) 
    .Select(x => Observable.DeferAsync(async _ => 
    { 
     var res = await x.ProcessAsync(config, m, outputPath); 
     processed++; 
     var prog = ((double) processed/inputs.Count())*100.0; 
     OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog)); 
     OverallProgress.Progress.OnNext(prog); 
     return Observable.Return(res); 
    })) 
    .Merge(5) 
    .All(x => x); 

И, конечно, я получаю качение поведения завершения (например, если 1/5 закончить тогда только один начинается).

Ясно, что у меня есть еще несколько основополагающих принципов, но это замечательно!

+0

Fyi вы можете использовать 'FromAsync' вместо' DeferAsync', который позволяет вашей асинхронной лямбда более естественным образом возвращать свой результат без необходимости обертывать его 'Return'. – Brandon

+0

Также обратите внимание, что ваш код для отслеживания и отслеживания прогресса имеет некоторые многопоточные условия гонки и потенциально нарушает некоторые правила параллельного управления Rx. Я предлагаю переместить весь код отслеживания прогресса в предложение 'Do' сразу после предложения' Merge'. Это устранит гонки. – Brandon

+0

@Brandon не будет перемещать это в Do после слияния, просто означает, что отчет о прогрессе происходит только после каждой партии? Я хочу сообщать о прогрессе после каждого элемента. – Clint

Смежные вопросы