2016-03-23 5 views
0

Итак, вот сценарий:Лучший способ конвертировать потокобезопасную коллекцию в DataTable?

Мне нужно взять группу данных, обработать их, построить объект, а затем вставить эти объекты в базу данных.

Чтобы увеличить производительность, я многопоточно обрабатываю данные с помощью параллельного цикла и сохраняю объекты в списке CollectionBag.

Эта часть работает хорошо. Тем не менее, проблема здесь заключается в том, что мне теперь нужно взять этот список, преобразовать его в объект DataTable и вставить данные в базу данных. Это очень некрасиво, и я чувствую, что я делаю это не в лучшем виде (псевдо ниже):

ConcurrentBag<FinalObject> bag = new ConcurrentBag<FinalObject>(); 

ParallelOptions parallelOptions = new ParallelOptions(); 
parallelOptions.MaxDegreeOfParallelism = Environment.ProcessorCount; 

Parallel.ForEach(allData, parallelOptions, dataObj => 
{ 
    .... Process data .... 

    bag.Add(theData); 

    Thread.Sleep(100); 
}); 

DataTable table = createTable(); 
foreach(FinalObject moveObj in bag) { 
    table.Rows.Add(moveObj.x); 
} 
+1

Вы можете преобразовать FinalObject в DataRow также в параллельный цикл, чтобы добавить еще большую производительность, делая сумку как Concurrent . – Nemo

+0

Значит, вы просто добавляете одно свойство базовых объектов в таблицу данных? Если у вас уже есть объекты в коллекции, зачем нужна таблица данных? Почему бы просто не заполнить таблицу данных в первую очередь? –

+0

Я упростил это для этого примера. Используемые данные (я использую 9 из них) варьируются от 7 столбцов до 13 столбцов. – user2124871

ответ

1

Это является хорошим кандидатом для PLINQ (или Rx - я сосредоточусь на PLINQ, так как это часть библиотеки базового класса).

IEnumerable<FinalObject> bag = allData 
    .AsParallel() 
    .WithDegreeOfParallelism(Environment.ProcessorCount) 
    .Select(dataObj => 
    { 
     FinalObject theData = Process(dataObj); 

     Thread.Sleep(100); 

     return theData; 
    }); 

DataTable table = createTable(); 

foreach (FinalObject moveObj in bag) 
{ 
    table.Rows.Add(moveObj.x); 
} 

Реалистично, вместо того, чтобы задушить петлю через Thread.Sleep, вы должны ограничить максимальную степень параллелизма дальше, пока вы не получите загрузку процессора до нужного уровня.

Отказ от ответственности: все нижеследующее предназначено только для развлечения, хотя оно делает фактически работает.

Конечно, вы всегда можете пнуть его на ступеньку выше и производите полную на асинхронном Parallel.ForEach реализации, которая позволяет обрабатывать ввод параллельно и сделать свое дросселирование асинхронно, без блокирования каких-либо пул потоков потоков.

async Task ParallelForEachAsync<TInput, TResult>(IEnumerable<TInput> input, 
               int maxDegreeOfParallelism, 
               Func<TInput, Task<TResult>> body, 
               Action<TResult> onCompleted) 
{ 
    Queue<TInput> queue = new Queue<TInput>(input); 

    if (queue.Count == 0) { 
     return; 
    } 

    List<Task<TResult>> tasksInFlight = new List<Task<TResult>>(maxDegreeOfParallelism); 

    do 
    { 
     while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0) 
     { 
      TInput item = queue.Dequeue(); 
      Task<TResult> task = body(item); 

      tasksInFlight.Add(task); 
     } 

     Task<TResult> completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false); 

     tasksInFlight.Remove(completedTask); 

     TResult result = completedTask.GetAwaiter().GetResult(); // We know the task has completed. No need for await. 

     onCompleted(result); 
    } 
    while (queue.Count != 0 || tasksInFlight.Count != 0); 
} 

Usage (full Fiddle here):

async Task<DataTable> ProcessAllAsync(IEnumerable<InputObject> allData) 
{ 
    DataTable table = CreateTable(); 
    int maxDegreeOfParallelism = Environment.ProcessorCount; 

    await ParallelForEachAsync(
     allData, 
     maxDegreeOfParallelism, 
     // Loop body: these Tasks will run in parallel, up to {maxDegreeOfParallelism} at any given time. 
     async dataObj => 
     { 
      FinalObject o = await Task.Run(() => Process(dataObj)).ConfigureAwait(false); // Thread pool processing. 

      await Task.Delay(100).ConfigureAwait(false); // Artificial throttling. 

      return o; 
     }, 
     // Completion handler: these will be executed one at a time, and can safely mutate shared state. 
     moveObj => table.Rows.Add(moveObj.x) 
    ); 

    return table; 
} 

struct InputObject 
{ 
    public int x; 
} 

struct FinalObject 
{ 
    public int x; 
} 

FinalObject Process(InputObject o) 
{ 
    // Simulate synchronous work. 
    Thread.Sleep(100); 

    return new FinalObject { x = o.x }; 
} 

То же поведение, но без Thread.Sleep и ConcurrentBag<T>.

+0

Благодарим вас за предложение - я потратил время, чтобы опробовать его и посмотреть, что даст мне конечный результат. Это было примерно на 2 секунды быстрее, поэтому это не слишком много. Тем не менее, я смотрю на некоторые из ваших других предложений, чтобы увидеть, что я могу получить от них. Спасибо! – user2124871

+0

@ user2124871, если «быстрее» - это то, что вам нужно, тогда вы определенно не хотите «Thread.Sleep», «Task.Delay» или любую другую искусственную задержку. Если вы идете с вашим решением Parallel.ForEach или моей версией PLINQ, это не будет иметь особого значения, кроме как в стиле кода. Вы должны профилировать компонент обработки вашего решения (* «данные процесса» *) и оптимизировать его для сокращения использования ЦП. –

0

Я думаю, что что-то подобное должно дать лучшую производительность, похоже, что объект [] является лучшим вариантом, чем DataRow, поскольку вам нужен DataTable для получения объекта DataRow.

ConcurrentBag<object[]> bag = new ConcurrentBag<object[]>(); 

Parallel.ForEach(allData, 
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, 
    dataObj => 
{ 
    object[] row = new object[colCount]; 

    //do processing 

    bag.Add(row); 

    Thread.Sleep(100); 
}); 

DataTable table = createTable(); 
foreach (object[] row in bag) 
{ 
    table.Rows.Add(row); 
} 
0

Похоже, у Вас есть сложные вещи совсем немного, Тринг, чтобы сделать все, что работать параллельно, но если вы храните DataRow obejcts в вашей сумке вместо простых объектов, в конце концов, вы можете использовать DataTableExtensions создать DataTable из общей коллекции довольно легко:

var dataTable = bag.CopyToDataTable(); 

Просто добавьте ссылку на System.Data.DataSetExtensions в вашем проекте.

+1

'CopyToDataTable ' имеет общее ограничение 'T: DataRow', поэтому оно появляется только для небольшого подмножества общих коллекций (что приводит к обходным методам вроде этого: https://msdn.microsoft.com/en-us/library /bb669096(v=vs.110).aspx). Я что-то упускаю? –

+0

@KirillShlenskiy Вот почему я сказал: «Если вы храните объекты DataRow в сумке вместо простых объектов». Я все еще смущен тем, почему OP использует всю эту сложность для создания коллекции параллельно, только для того, чтобы развернуться и преобразовать ее в «DataTable» серийно. –

+0

Причина параллельной коллекции - это объем обработки, который происходит в цикле. Выполнение его последовательно занимает много времени, и это позволяет мне двигаться намного быстрее. Данной способ используется для вставки всех записей, которые мне нужны для размещения в БД. Так как это не потокобезопасная коллекция, я храню в потокобезопасной коллекции и конвертирую в среду для вставки в БД. Это может быть излишним, пытаясь понять это. – user2124871

Смежные вопросы