2012-05-21 2 views
6

я считаю себя часто писать C++ кода вида:Идиоматический способ распараллеливания функции через строки файла в C++

while (getline(strm, line)) { 
    cout << computationally_intensive_function(line) << endl; 
} 

Я хотел бы распараллелить этот код. Лучшее решение, которое я придумал до сих пор построить вектор строк, чтобы держать большое (10000-100000) количество строк, а затем распараллеливание над этим вектором с

#pragma omp parallel for 

Затем опорожнить вектор и повторите время линии остаются. Однако для этого метода требуется много памяти, а остальные ядра неактивны, в то время как основной процесс - это буферизация строк. Есть ли способ лучше? Что-то вроде Python's multiprocessing.Pool.map или Hadoop? (Я хотел бы избежать с помощью C++ API Hadoop в однако, поскольку Hadoop довольно тяжеловесный и не может быть установлен везде мой код будет работать.)

+2

Если у вас есть доступ к компилятору C++ 11, вы можете использовать 'std :: async' или реализовать свой собственный пул потоков с помощью' std :: packaged_task'. – xDD

+0

Если каждая строка независима и доступен ваш файл, почему бы не рубить его раньше времени и не передать его нескольким экземплярам этого процесса? –

+0

длинный рассказ длинный, вам понадобятся потенциально более чем одна строка буферизации ядра. вы должны иметь возможность балансировать нагрузку между ядрами, так что вычислительно интенсивная функция выполняется как можно больше, а потоки, которые являются буферизацией, также идут на полную скорость. это не тривиальный ответ –

ответ

5

Там существует, что не очень хорошо известна особенность OpenMP 3.0 задач, что весьма неудачный, поскольку они были специально созданы для покрытия таких дел, как этот. Если ваш компилятор поддерживает эту стандартную версию, вы обязательно должны пойти на задачи OpenMP. Но имейте в виду, что запись в stdout (или std::cout) из нескольких потоков обычно смешивает их вывод плохо, и вы, скорее всего, хотят, чтобы синхронизировать на нем:

#pragma omp parallel 
{ 
    #pragma omp master 
    while (getline(strm, line)) 
    #pragma omp task 
    { 
     result_type result = computationally_intensive_function(line); 
     #pragma omp critical 
     { 
      cout << result << endl; 
      cout.flush(); 
     } 
    } 
    #pragma omp taskwait 
} 

Я оставляю это до вас, чтобы решить, какие переменные должны быть shared и что должно быть private.

+0

То, что мне нужно! Благодаря!! – gilesc

1

Вы должны накладывать свои вычисления на строки чтения из файла. Один из хороших способов сделать это - использовать алгоритм конвейерной сборки Threading Building Blocks. Что вы делаете, так это указать три (на основе того, что вы показываете в примере с псевдокодом), два последовательных и один параллельный. Последовательные фильтры являются входными и выходными. Первый считывает данные из файла по строкам и передает каждую строку второму фильтру, который является параллельным, и запускает вашу функцию вычисления/обработки в многопоточном режиме. Последний этап/фильтр также является серийным, и он делает вывод. Я копировать-вставить пример из Т учебника, который, кажется, делают именно то, что вы хотите достичь:

// Holds a slice of text. 
/** Instances *must* be allocated/freed using methods herein, because the 
C++ declaration 
represents only the header of a much larger object in memory. */ 
class TextSlice { 
    // Pointer to one past last character in sequence 
    char* logical_end; 
    // Pointer to one past last available byte in sequence. 
    char* physical_end; 
public: 
    // Allocate a TextSlice object that can hold up to max_size characters. 
    static TextSlice* allocate(size_t max_size) { 
     // +1 leaves room for a terminating null character. 
     TextSlice* t = (TextSlice*)tbb::tbb_allocator<char>().allocate(sizeof(TextSlice)+max_size+1); 
     t->logical_end = t->begin(); 
     t->physical_end = t->begin()+max_size; 
     return t; 
    } 
    // Free this TextSlice object 
    void free() { 
     tbb::tbb_allocator<char>().deallocate((char*)this, 
     sizeof(TextSlice)+(physical_end-begin())+1); 
    } 
    // Pointer to beginning of sequence 
    char* begin() {return (char*)(this+1);} 
    // Pointer to one past last character in sequence 
    char* end() {return logical_end;} 
    // Length of sequence 
    size_t size() const {return logical_end-(char*)(this+1);} 
    // Maximum number of characters that can be appended to sequence 
    size_t avail() const {return physical_end-logical_end;} 
    // Append sequence [first,last) to this sequence. 
    void append(char* first, char* last) { 
     memcpy(logical_end, first, last-first); 
     logical_end += last-first; 
    } 
    // Set end() to given value. 
    void set_end(char* p) {logical_end=p;} 
}; 

И функции, чтобы получить это Запускаемым:

void RunPipeline(int ntoken, FILE* input_file, FILE* output_file) { 
    tbb::parallel_pipeline(
    ntoken, 
    tbb::make_filter<void,TextSlice*>(
    tbb::filter::serial_in_order, MyInputFunc(input_file)) 
    & 
    tbb::make_filter<TextSlice*,TextSlice*>(
    tbb::filter::parallel, MyTransformFunc()) 
    & 
    tbb::make_filter<TextSlice*,void>(
    tbb::filter::serial_in_order, MyOutputFunc(output_file))); 
} 
Смежные вопросы