2

У меня есть сценарий, в котором я должен обрабатывать несколько файлов (например, 30) параллельно на основе процессорных ядер. Я должен назначить эти файлы для разделения задач на основе отсутствия процессорных ядер. Я не знаю, как сделать начальный и конечный предел каждой задачи для обработки. Например, каждая задача знает, сколько файлов она должна обрабатывать.Как обрабатывать файлы каталога в параллельной библиотеке задач?

private void ProcessFiles(object e) 
    { 
     try 
     { 
      var diectoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value; 

      var FilePaths = Directory.EnumerateFiles(diectoryPath); 
      int numCores = System.Environment.ProcessorCount; 
      int NoOfTasks = FilePaths.Count() > numCores ? (FilePaths.Count()/ numCores) : FilePaths.Count(); 


      for (int i = 0; i < NoOfTasks; i++) 
      { 
       Task.Factory.StartNew(
         () => 
         { 
          int startIndex = 0, endIndex = 0; 
          for (int Count = startIndex; Count < endIndex; Count++) 
          { 
           this.ProcessFile(FilePaths); 
          } 
         }); 

      } 
     } 
     catch (Exception ex) 
     { 
      throw; 
     } 
    } 
+2

Задача-параллельная библиотека будет посвящена многоядерной архитектуре под капотом. Нам не нужно заботиться о доступных системных ядрах при создании задач. – William

+2

Я определенно не эксперт в параллельной библиотеке задач, но не TPL должен сам обрабатывать количество ядер процессора и определять лучший способ «разделить» рабочую нагрузку? –

+3

Здесь может быть проблема, если в каталоге 100 файлов, не будет хорошей идеей создать 100 задач. Таким образом, вы можете использовать цикл Parallel.For. Он будет внутренне создавать разделы и будет устанавливать параллельную обработку, полагаясь на собственный разделитель. – Usman

ответ

2

Для таких проблем, как у тебя, есть параллельные структуры данных доступно на C#. Вы хотите использовать BlockingCollection и хранить в нем все имена файлов.

Ваша идея расчета количества задач с использованием количества ядер, доступных на машине, не очень хороша. Зачем? Потому что ProcessFile() может не принимать одинаковое время для каждого файла. Итак, было бы лучше начать количество задач, как количество ядер, которые у вас есть. Затем пусть каждая задача будет читать имя файла один за другим из BlockingCollection и затем обрабатывать файл, пока BlockingCollection не будет пустым.

try 
{ 
    var directoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value; 

    var filePaths = CreateBlockingCollection(directoryPath); 
    //Start the same #tasks as the #cores (Assuming that #files > #cores) 
    int taskCount = System.Environment.ProcessorCount; 

    for (int i = 0; i < taskCount; i++) 
    { 
     Task.Factory.StartNew(
       () => 
       { 
        string fileName; 
        while (!filePaths.IsCompleted) 
        { 
         if (!filePaths.TryTake(out fileName)) continue; 
         this.ProcessFile(fileName); 
        } 
       }); 
    } 
} 

И CreateBlockingCollection() будет выглядеть следующим образом:

private BlockingCollection<string> CreateBlockingCollection(string path) 
{ 
    var allFiles = Directory.EnumerateFiles(path); 
    var filePaths = new BlockingCollection<string>(allFiles.Count); 
    foreach(var fileName in allFiles) 
    { 
     filePaths.Add(fileName); 
    } 
    filePaths.CompleteAdding(); 
    return filePaths; 
} 

Вы должны изменить ваш ProcessFile() получить имя файла в настоящее время вместо того, чтобы все пути к файлам и обработки его кусок.

Преимущество этого подхода заключается в том, что теперь ваш процессор не будет находиться под подпиской или не будет подписан, и нагрузка будет равномерно сбалансирована.


Я не запускать код сам, так что может быть какая-то ошибка синтаксиса в моем коде. Не стесняйтесь исправить ошибку, если вы столкнетесь с ней.

+0

Спасибо, но я могу сделать заказ синхронным, потому что я должен обрабатывать файл в порядке, в котором он приходит к процессу. Также в случае исключения, как я буду обрабатывать неисправные файлы. Также я должен передать обработанные файлы в поток пользовательского интерфейса, чтобы обновить графический интерфейс с содержимым файлов. – ehafeez

+1

Вы можете сохранить другое, перейдя в очередь «BlockingCollection», как в [этом случае] [http://stackoverflow.com/a/3825322/213550]. Вы можете проверить свойство «Исключение» для каждой задачи для каждого файла и посмотреть, не является ли оно нулевым. Вы можете 'ContinueWith' или' WhenAny' методы для обновления пользовательского интерфейса. – VMAtm

+0

@ehafeez: предложения VMAtm верны. Попробовать их. – displayName

2

Основываясь на моем заведомо ограниченное понимании TPL, я думаю, что ваш код может быть переписан так:

private void ProcessFiles(object e) 
{ 
    try 
    { 
     var diectoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value; 

     var FilePaths = Directory.EnumerateFiles(diectoryPath); 

     Parallel.ForEach(FilePaths, path => this.ProcessFile(path)); 

    } 
    catch (Exception ex) 
    { 
     throw; 
    } 
} 

приветы

+0

Файлы могут быть 1000 за один раз, поэтому я не могу использовать parallel.foreach, потому что мне нужно обновить GUI в реальном времени после обработки файла. – ehafeez

+0

Это не было в вашем ОП. Как вы можете себе представить, у нас нет хрустальных шаров, чтобы читать все ваши требования. В следующий раз, пожалуйста, включите ВСЕ ваши требования в свой вопрос, вместо того, чтобы добавлять их по одному после получения ответов. Спасибо. –

Смежные вопросы