2014-01-05 3 views
12

У меня есть очень большие файлы, которые я должен читать и обрабатывать. Можно ли это сделать параллельно с использованием Threading?Параллельно чтение и обработка файлов C#

Вот немного кода, который я сделал. Но, похоже, он не получает более короткое время выполнения чтения и обработки файлов один за другим.

String[] files = openFileDialog1.FileNames; 

Parallel.ForEach(files, f => 
{ 
    readTraceFile(f); 
});   

private void readTraceFile(String file) 
{ 
    StreamReader reader = new StreamReader(file); 
    String line; 

    while ((line = reader.ReadLine()) != null) 
    { 
     String pattern = "\\s{4,}"; 

     foreach (String trace in Regex.Split(line, pattern)) 
     { 
      if (trace != String.Empty) 
      { 
       String[] details = Regex.Split(trace, "\\s+"); 

       Instruction instruction = new Instruction(details[0], 
        int.Parse(details[1]), 
        int.Parse(details[2])); 
       Console.WriteLine("computing..."); 
       instructions.Add(instruction); 
      } 
     } 
    } 
} 
+4

Вы связаны с CPU или связаны с IO? – SLaks

+1

Является ли «инструкциями» потокобезопасными? (ответ: нет) – SLaks

+2

Системы ввода-вывода не очень быстры, как ваш процессор, поэтому не стоит удивляться, чтобы не использовать преимущества нескольких потоков при подключении IO. –

ответ

15

Похоже, что производительность вашего приложения в основном ограничена IO. Тем не менее, у вас все еще есть часть работы, связанной с процессором, в вашем коде. Эти два бита работы взаимозависимы: ваша работа с процессором не может начаться до тех пор, пока IO не выполнит свою работу, и IO не перейдет к следующему рабочему элементу, пока ваш процессор не завершит предыдущий. Они оба держат друг друга. Поэтому можно (объяснено в самом низу), что вы будете видеть улучшение пропускной способности при выполнении вашего io- и CPU переплете работать параллельно, например, так:

void ReadAndProcessFiles(string[] filePaths) 
{ 
    // Our thread-safe collection used for the handover. 
    var lines = new BlockingCollection<string>(); 

    // Build the pipeline. 
    var stage1 = Task.Run(() => 
    { 
     try 
     { 
      foreach (var filePath in filePaths) 
      { 
       using (var reader = new StreamReader(filePath)) 
       { 
        string line; 

        while ((line = reader.ReadLine()) != null) 
        { 
         // Hand over to stage 2 and continue reading. 
         lines.Add(line); 
        } 
       } 
      } 
     } 
     finally 
     { 
      lines.CompleteAdding(); 
     } 
    }); 

    var stage2 = Task.Run(() => 
    { 
     // Process lines on a ThreadPool thread 
     // as soon as they become available. 
     foreach (var line in lines.GetConsumingEnumerable()) 
     { 
      String pattern = "\\s{4,}"; 

      foreach (String trace in Regex.Split(line, pattern)) 
      { 
       if (trace != String.Empty) 
       { 
        String[] details = Regex.Split(trace, "\\s+"); 

        Instruction instruction = new Instruction(details[0], 
         int.Parse(details[1]), 
         int.Parse(details[2])); 
        Console.WriteLine("computing..."); 
        instructions.Add(instruction); 
       } 
      } 
     } 
    }); 

    // Block until both tasks have completed. 
    // This makes this method prone to deadlocking. 
    // Consider using 'await Task.WhenAll' instead. 
    Task.WaitAll(stage1, stage2); 
} 

Я сильно сомневаюсь, что это ваша работа CPU держит вещи, но если это случается так, то вы можете также parallelise этап 2, как так:

var stage2 = Task.Run(() => 
    { 
     var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }; 

     Parallel.ForEach(lines.GetConsumingEnumerable(), parallelOptions, line => 
     { 
      String pattern = "\\s{4,}"; 

      foreach (String trace in Regex.Split(line, pattern)) 
      { 
       if (trace != String.Empty) 
       { 
        String[] details = Regex.Split(trace, "\\s+"); 

        Instruction instruction = new Instruction(details[0], 
         int.Parse(details[1]), 
         int.Parse(details[2])); 
        Console.WriteLine("computing..."); 
        instructions.Add(instruction); 
       } 
      } 
     }); 
    }); 

Имейте в виду, если ваша работа процессора компонент пренебрежимо мала по сравнению с компонентом IO, вы не увидите много ускорения. Чем больше рабочая нагрузка, тем лучше трубопровод будет работать по сравнению с последовательной обработкой.

Поскольку мы говорим об оценке эффективности, я не особенно волнуюсь о количестве блокирующих вызовов в приведенном выше коде. Если бы я делал это в своем собственном проекте, я бы пошел по маршруту асинхронного/ожидающего. Я решил не делать этого в этом случае, потому что я хотел, чтобы все было легко понять и легко интегрировать.

6

С точки зрения того, что вы пытаетесь сделать, вы почти наверняка связаны с I/O. Попытка параллельной обработки в случае не поможет и может фактически замедлить обработку из-за операций поиска сложения на дисках (если только вы не можете разделить данные на несколько шпинделей).

+0

И если я связан с I/O, может ли что-нибудь сделать для повышения производительности? – patentul

+0

@ user2936347 обычно делает много асинхронных вызовов лучше для ввода-вывода. взгляните на новый шаблон 'async-await' – i3arnon

+0

@ user2936347: Существует несколько стратегий, помогающих с проблемами ввода-вывода. Однако большинство из них требуют инвестиций в оборудование. Это означает один более быстрый диск (например, SSD), RAID 0 или 1 или даже просто разделение файлов на нескольких дисках, каждый со своими независимыми контроллерами или их комбинацией. – NotMe

0

Попробуйте обрабатывать линии параллельно. Например:

var q = from file in files 
     from line in File.ReadLines(file).AsParallel() // for smaller files File.ReadAllLines(file).AsParallel() might be faster 
     from trace in line.Split(new [] {" "}, StringSplitOptions.RemoveEmptyEntries) // split by 4 spaces and no need for trace != "" check 
     let details = trace.Split(null as char[], StringSplitOptions.RemoveEmptyEntries) // like Regex.Split(trace, "\\s+") but removes empty strings too 
     select new Instruction(details[0], int.Parse(details[1]), int.Parse(details[2])); 

List<Instruction> instructions = q.ToList(); // all of the file reads and work is done here with .ToList 

Произвольный доступ к жесткому диску, не SSD (при попытке чтения/записи различных файлов одновременно или фрагментированный файл), как правило, гораздо медленнее, чем последовательный доступ (например, чтении одного дефрагментированный файл), поэтому я ожидаю, что обработка одного файла параллельно будет быстрее с дефрагментированными файлами.

Кроме того, совместное использование ресурсов по потокам (например, Console.Write или добавление в коллекцию блокировки потоковой блокировки) может замедлить или заблокировать/заблокировать выполнение, потому что некоторым потокам придется ждать, пока другие потоки будут завершите доступ к этому ресурсу.

+0

Спасибо, но это двухгодичная тема, которая мне нужна для задания для школы :) – patentul

Смежные вопросы