Это является хорошим кандидатом для PLINQ (или Rx - я сосредоточусь на PLINQ, так как это часть библиотеки базового класса).
IEnumerable<FinalObject> bag = allData
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.Select(dataObj =>
{
FinalObject theData = Process(dataObj);
Thread.Sleep(100);
return theData;
});
DataTable table = createTable();
foreach (FinalObject moveObj in bag)
{
table.Rows.Add(moveObj.x);
}
Реалистично, вместо того, чтобы задушить петлю через Thread.Sleep
, вы должны ограничить максимальную степень параллелизма дальше, пока вы не получите загрузку процессора до нужного уровня.
Отказ от ответственности: все нижеследующее предназначено только для развлечения, хотя оно делает фактически работает.
Конечно, вы всегда можете пнуть его на ступеньку выше и производите полную на асинхронном Parallel.ForEach
реализации, которая позволяет обрабатывать ввод параллельно и сделать свое дросселирование асинхронно, без блокирования каких-либо пул потоков потоков.
async Task ParallelForEachAsync<TInput, TResult>(IEnumerable<TInput> input,
int maxDegreeOfParallelism,
Func<TInput, Task<TResult>> body,
Action<TResult> onCompleted)
{
Queue<TInput> queue = new Queue<TInput>(input);
if (queue.Count == 0) {
return;
}
List<Task<TResult>> tasksInFlight = new List<Task<TResult>>(maxDegreeOfParallelism);
do
{
while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
{
TInput item = queue.Dequeue();
Task<TResult> task = body(item);
tasksInFlight.Add(task);
}
Task<TResult> completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
tasksInFlight.Remove(completedTask);
TResult result = completedTask.GetAwaiter().GetResult(); // We know the task has completed. No need for await.
onCompleted(result);
}
while (queue.Count != 0 || tasksInFlight.Count != 0);
}
Usage (full Fiddle here):
async Task<DataTable> ProcessAllAsync(IEnumerable<InputObject> allData)
{
DataTable table = CreateTable();
int maxDegreeOfParallelism = Environment.ProcessorCount;
await ParallelForEachAsync(
allData,
maxDegreeOfParallelism,
// Loop body: these Tasks will run in parallel, up to {maxDegreeOfParallelism} at any given time.
async dataObj =>
{
FinalObject o = await Task.Run(() => Process(dataObj)).ConfigureAwait(false); // Thread pool processing.
await Task.Delay(100).ConfigureAwait(false); // Artificial throttling.
return o;
},
// Completion handler: these will be executed one at a time, and can safely mutate shared state.
moveObj => table.Rows.Add(moveObj.x)
);
return table;
}
struct InputObject
{
public int x;
}
struct FinalObject
{
public int x;
}
FinalObject Process(InputObject o)
{
// Simulate synchronous work.
Thread.Sleep(100);
return new FinalObject { x = o.x };
}
То же поведение, но без Thread.Sleep
и ConcurrentBag<T>
.
Вы можете преобразовать FinalObject в DataRow также в параллельный цикл, чтобы добавить еще большую производительность, делая сумку как Concurrent. –
Nemo
Значит, вы просто добавляете одно свойство базовых объектов в таблицу данных? Если у вас уже есть объекты в коллекции, зачем нужна таблица данных? Почему бы просто не заполнить таблицу данных в первую очередь? –
Я упростил это для этого примера. Используемые данные (я использую 9 из них) варьируются от 7 столбцов до 13 столбцов. – user2124871