Я следую ответу на этот вопрос: Rx extensions: Where is Parallel.ForEach?, чтобы параллельно выполнять несколько операций с использованием Rx.Rx расширения Parallel.ForEach дросселирование
Проблема, с которой я сталкиваюсь, заключается в том, что она, как представляется, выделяет новый поток для каждый запрос, тогда как с использованием Parallel.ForEach
сделано значительно меньше.
Процессы, которые я запускаю параллельно, довольно интенсивно хранятся в памяти, поэтому, если я пытаюсь обрабатывать сотни элементов сразу, то ответ, предоставленный связанному вопросу, быстро обнаруживает, что у меня заканчивается память.
Есть ли способ изменить этот ответ, чтобы уменьшить количество элементов, выполняемых в любой момент времени?
Я принял взглянуть на Window
и Buffer
операций, мой код выглядит следующим образом:
return inputs.Select(i => new AccountViewModel(i))
.ToObservable()
.ObserveOn(RxApp.MainThreadScheduler)
.ToList()
.Do(l =>
{
using (Accounts.SuppressChangeNotifications())
{
Accounts.AddRange(l);
}
})
.SelectMany(x => x)
.SelectMany(acc => Observable.StartAsync(async() =>
{
var res = await acc.ProcessAsync(config, m, outputPath);
processed++;
var prog = ((double) processed/inputs.Count())*100.0;
OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog));
OverallProgress.Progress.OnNext(prog);
return res;
}))
.All(x => x);
В идеале я хочу, чтобы иметь возможность партии его на куски просмотреть счет моделей, что я тогда вызовите метод ProcessAsync
, и только один раз все эти партии будут продолжены.
В идеале мне бы хотелось, чтобы, если бы даже одна из партий закончилась, она двигалась дальше, но только когда-либо сохраняла тот же размер партии.
Так что, если у меня есть партия из 5 и 1 штук, я бы хотел, чтобы другой начал, но только один, пока не будет доступно больше места.
Fyi вы можете использовать 'FromAsync' вместо' DeferAsync', который позволяет вашей асинхронной лямбда более естественным образом возвращать свой результат без необходимости обертывать его 'Return'. – Brandon
Также обратите внимание, что ваш код для отслеживания и отслеживания прогресса имеет некоторые многопоточные условия гонки и потенциально нарушает некоторые правила параллельного управления Rx. Я предлагаю переместить весь код отслеживания прогресса в предложение 'Do' сразу после предложения' Merge'. Это устранит гонки. – Brandon
@Brandon не будет перемещать это в Do после слияния, просто означает, что отчет о прогрессе происходит только после каждой партии? Я хочу сообщать о прогрессе после каждого элемента. – Clint