2016-07-08 4 views
0

Я решил опубликовать это после нескольких часов, чтобы опробовать решения подобных проблем без успеха. Я пишу код C++ MPI + OpenMP, где один узел (сервер) MPI отправляет двойные массивы другим узлам. Сервер отправляет потоки для одновременного отправки нескольким клиентам. Серийная версия (только с MPI) работает очень хорошо, а также однопоточная версия. Многопоточная версия (openmp) продолжает бросать ошибку ошибки сегментации после случайного числа итераций. Строка printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N) выводит значения на каждой итерации. Непредсказуемость - это количество итераций (в одном случае код успешно работал только для того, чтобы сбросить ошибку сбоя seg, когда я попытался запустить его снова сразу после). Однако он всегда завершается num_threads = 1. getData возвращает вектор structs, структура которого определяется как (int, int, double *).Ошибка MPI + OpenMP сегментации и непредсказуемое поведение

Вот код

double *tStatistics=new double[8], tmp_time; // wall clock time 
double SY, Sto; 
int a_tasks=0, file_p=0; 
vector<myDataType *> d = getData(); 

int idx=0; opt_k.k=1; opt_k.proc_files=0; opt_k.p=this->node_sz; 
opt_k.proc_files=0; SY=0; Sto=0; 
std::fill(header,header+SZ_HEADER,-1); 

omp_set_num_threads(5);// for now 
// parallel region 

#pragma omp parallel default(none) shared(d,idx,SY,Sto) private(a_tasks) 
{ 
    double *myHeader=new double[SZ_HEADER]; 
    std::fill(myHeader,myHeader+SZ_HEADER,0); 
    int tid = omp_get_thread_num(), cur_idx, cur_k; int N; 
    //#pragma omp atomic 
     N=d.size(); 
    while (idx<N) { 
     // Assign tasks and fetch results where available 
     cur_idx=N; 
     #pragma omp critical(update__idx) 
     { 
      if (idx<N) { 
       cur_idx=idx; cur_k=opt_k.k; idx+=cur_k; 
      } 
     } 
     if (cur_idx<N) { 
      printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N); 
      MPI_Recv(myHeader,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat); 
      if(this->Stat->MPI_TAG == TAG_HEADER){ // serve tasks 
       while (cur_k && cur_idx<N) { 
        myHeader[1]=d[cur_idx]->nRows; myHeader[2]=d[cur_idx]->nCols; myHeader[3]=cur_idx; myHeader[9]=--cur_k; 
        MPI_Send(myHeader,SZ_HEADER,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP); 
        MPI_Send(d[cur_idx]->data,d[cur_idx]->nRows*d[cur_idx]->nCols,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP); 
        delete[] d[cur_idx]->data; ++cur_idx; 
       } 
      }else if(this->Stat->MPI_TAG == TAG_RESULT){ // collect results 
       printf("%d - 4\n", tid); 
      } 

     } //end if(loopmain) 
    } // end while(loopmain) 

} // end parallel section 

message("terminate slaves"); 
for(int i=1;i<node_sz;++i){ // terminate 
    MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat); 
    MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_TERMINATE,MY_COMM_GRP); 
} 
return 0; 

Другая функция соответствия является

void CMpifun::slave2() 
{ 
    double *Data; vector<myDataType> dataQ; vector<hist_type> resQ; 
    char out_opt='b'; // irrelevant 
    myDataType *out_im = new myDataType; hist_type *out_hist; CLdp ldp; 
    int file_cnt=0; double tmp_t; //local variables 

    while (true) { // main while loop 
     header[4]=myRank; MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP); 
     MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,MPI_ANY_TAG,MY_COMM_GRP,this->Stat); 
     if(this->Stat->MPI_TAG == TAG_TERMINATE) { 
      break; 
     } 
     //receive data 
     while(true) { 
      Data=new double[(int)(header[1]*header[2])]; 
      MPI_Recv(Data,(int)(header[1]*header[2]),MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat); 
      myDataType d; d.data=Data; d.nRows=(int)header[1]; d.nCols=(int)header[2]; 
      //dataQ.push_back(d); 
      delete[] Data; 
      file_cnt++; 
      if ((int)header[9]) { 
       MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat); 
      } else break; 
     } 
    } // end main while loop 
    message("terminating"); 

Я перепробовал все рекомендации, касающиеся подобных проблем. Вот мои настройки среды

export OMP_WAIT_POLICY="active" 
export OMP_NUM_THREADS=4 
export OMP_DYNAMIC=true # "true","false" 
export OMP_STACKSIZE=200M # 
export KMP_STACKSIZE=$OMP_STACKSIZE 
ulimit -s unlimited 

Большое спасибо всем, что есть сколы. Я все больше убеждаюсь, что это связано с выделением памяти как-то, но и не понимаю, почему. Теперь у меня есть следующий код:

double CMpifun::sendData2() 
{ 
double *tStatistics=new double[8], tmp_time; // wall clock time 
double SY, Sto; int a_tasks=0, file_p=0; 
vector<myDataType *> d = getData(); 

int idx=0; opt_k.k=1; opt_k.proc_files=0; opt_k.p=this->node_sz; 
opt_k.proc_files=0; SY=0; Sto=0; 
std::fill(header,header+SZ_HEADER,-1); 

omp_set_num_threads(224);// for now 
// parallel region 

#pragma omp parallel default(none) shared(idx,SY,Sto,d) private(a_tasks) 
{ 
    double *myHeader=new double[SZ_HEADER]; 
    std::fill(myHeader,myHeader+SZ_HEADER,0); 
    int tid = omp_get_thread_num(), cur_idx, cur_k; int N; 

    //#pragma omp critical(update__idx) 
    { 
     N=d.size(); 
    } 
    while (idx<N) { 
     // Assign tasks and fetch results where available 
     cur_idx=N; 
     #pragma omp critical(update__idx) 
     { 
      if (idx<N) { 
       cur_idx=idx; cur_k=opt_k.k; idx+=cur_k; 
      } 
     } 
     if (cur_idx<N) { 
      //printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N); 
      printf("%d: cur_idx:%d, N:%d \n", tid, cur_idx,N); 
      //#pragma omp critical(update__idx) 
      { 
       MPI_Recv(myHeader,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat); 
      } 
      if(this->Stat->MPI_TAG == TAG_HEADER){ // serve tasks 
       while (cur_k && cur_idx<N) { 
        //#pragma omp critical(update__idx) 
        { 
         myHeader[1]=d[cur_idx]->nRows; myHeader[2]=d[cur_idx]->nCols; myHeader[3]=cur_idx; 
         myHeader[9]=--cur_k; 
         MPI_Send(myHeader,SZ_HEADER,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP); 
         MPI_Send(d[cur_idx]->data,d[cur_idx]->nRows*d[cur_idx]->nCols,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP); 
         delete[] d[cur_idx]->data; 
        } 
        ++cur_idx; 
       } 
      }else if(this->Stat->MPI_TAG == TAG_RESULT){ // collect results 
       printf("%d - 4\n", tid); 
      } 

     } //end if(loopmain) 
    } // end while(loopmain) 

} // end parallel section 

message("terminate slaves"); 
for(int i=1;i<node_sz;++i){ // terminate 
    MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat); 
    MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_TERMINATE,MY_COMM_GRP); 
} 
return 0; 

И двойка

void CMpifun::slave2() 
{ 
double *Data; vector<myDataType> dataQ; vector<hist_type> resQ; 
char out_opt='b'; // irrelevant 
myDataType *out_im = new myDataType; hist_type *out_hist; CLdp ldp; 
int file_cnt=0; double tmp_t; //local variables 

while (true) { // main while loop 
header[4]=myRank; MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP); 
MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,MPI_ANY_TAG,MY_COMM_GRP,this->Stat); 
if(this->Stat->MPI_TAG == TAG_TERMINATE) { 
    break; 
} 
//receive data 
while(true) { 
    Data=new double[(int)(header[1]*header[2])]; 
    MPI_Recv(Data,(int)(header[1]*header[2]),MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat); 
    myDataType *d=new myDataType; d->data=Data; d->nRows=(int)header[1]; d->nCols=(int)header[2]; 
    dataQ.push_back(*d); 
    delete[] Data; 
    file_cnt++; 
    if ((int)header[9]) { 
     MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat); 
    } else break; 
} 

// Error section: Uncommenting next line causes seg fault 
/*while (dataQ.size()) { // process data 
    out_hist = new hist_type(); 
    myDataType d = dataQ.back(); dataQ.pop_back(); // critical section 
    ldp.process(d.data, d.nRows,d.nCols,out_opt,out_im, out_hist); 
    resQ.push_back(*out_hist); out_hist=0; 
    delete[] d.data; delete[] out_im->data; 
}*/ 

//time_arr[1] /= file_cnt; time_arr[2] /= file_cnt; 
//header[6]=time_arr[0]; header[7]=time_arr[1]; header[8]=time_arr[2]; 
//header[4]=myRank; header[9]=resQ.size(); 

} // end main while loop 

Обновление является то, что если я раскомментировать время цикла в функции Slave2(), то запуск не будет завершена. Я не понимаю, что эта функция (slave2) вообще не имеет openmp/threading, но она, похоже, имеет эффект. Кроме того, он не использует никаких переменных с помощью функции threaded. Если я прокомментирую неприятный раздел, тогда код запускается независимо от количества потоков, которые я установил (4, 18, 300). Мои переменные среды OpenMP остаются прежними. Выход limit -a выглядит следующим образом,

core file size   (blocks, -c) 0 
data seg size   (kbytes, -d) unlimited 
scheduling priority    (-e) 0 
file size    (blocks, -f) unlimited 
pending signals     (-i) 30473 
max locked memory  (kbytes, -l) 64 
max memory size   (kbytes, -m) unlimited 
open files      (-n) 1024 
pipe size   (512 bytes, -p) 8 
POSIX message queues  (bytes, -q) 819200 
real-time priority    (-r) 0 
stack size    (kbytes, -s) 37355 
cpu time    (seconds, -t) unlimited 
max user processes    (-u) 30473 
virtual memory   (kbytes, -v) unlimited 
file locks      (-x) unlimited 

Мой конструктор также вызывает mpi_init_thread. Чтобы устранить проблему @Tim, причина, по которой я использовал динамическую память (с new), заключается в том, чтобы не раздувать стек стек, следуя рекомендации от решения аналогичной проблемы. Ваша помощь приветствуется.

+0

Как вы называете mpi внутри параллельной области, вы должны установить mpi-режим на mpi_init_threads и следовать инструкциям по привязке к потоку вашего mpi. – tim18

+0

@ tim18 Спасибо. Уже существует вызов 'MPI_Init (argc, argv)' для инициализации MPI, если это то, что вы имеете в виду. Есть ли какие-либо признаки нарушений безопасности потока в моем коде, которые вы можете идентифицировать? Я отправил код для проверки другими, потому что я ударил по потолку своей способности отладки. – darel

+0

Начните с создания среды MPI в потоковом режиме. Инициализируйте библиотеку MPI с помощью 'MPI_Init_thread (NULL, NULL, MPI_THREAD_MULTIPLE и предоставленных);' и убедитесь, что значение, возвращаемое в 'provided', равно' MPI_THREAD_MULTIPLE'. Если нет, библиотека MPI не поддерживает одновременные вызовы MPI из нескольких потоков. –

ответ

2

Самая большая проблема, которую я вижу, - это многие условия гонки, которые демонстрирует ваш код. Несомненное поведение, которое вы видите, вызывает сомнения. Помните, что всякий раз, когда вы получаете доступ к общей переменной в OpenMP (объявленной с помощью ключевого слова shared или глобальной областью), вы получаете доступ к памяти, которая может быть прочитана или записана любым другим потоком в банде без каких-либо гарантий порядка. Например,

N = d.size(); 

условие гонки, потому что std::vector не потокобезопасна. Поскольку вы используете OpenMP внутри класса, то любые переменные-члены также считаются «глобальными» и, по умолчанию, не являются потокобезопасными.

Как отметил @ tim18, поскольку вы вызываете подпрограммы MPI из параллельных областей OpenMP, вы должны инициализировать среду выполнения MPI, чтобы быть потокобезопасной, используя функцию MPI_Init_thread.


Как и в стороне, ваш C++ нуждается в некоторой работе. Вы никогда не должны использовать new или delete в коде пользовательского уровня. Используйте RAII для управления временем жизни объекта и переноса больших структур данных в тонкие объекты, которые управляют временем жизни для вас. Например, эта линия

delete[] d[cur_idx]->data; 

говорит мне, что есть демоны, скрывающиеся в вашем коде, ожидая, чтобы быть развязана на ничего не подозревающий пользователь (который может быть вы!). Кстати, это тоже условие гонки. Много демонов!

+0

Спасибо @Tim. Итак, теперь у меня есть все общие переменные внутри критических разделов, в частности, инструкции: 'N = d.size()', а также все внутри цикла while (cur_k && cur_idx darel

+0

Не могли бы вы опубликовать полный код для урезанной версии? Если это долго, вы можете использовать пастебин или сущность. – Tim

+0

Я уже разместил последнюю версию в качестве редактирования исходного вопроса под исходным кодом. Я проследил проблему с помощью функции 'slave2()' в * receive data * ** while ** loop. Виновником является 'delete [] Data;', который в равной степени удаляет 'd-> Data', я незаконно пытаюсь разыменовать освобожденную память в _process data_ ** while ** loop оператором' ldp.process (d.data, d.nRows, d.nCols, out_opt, out_im, out_hist); '. Я все еще смущен тем, как код, казалось, заканчивался в некоторых случаях - я надеюсь, что где-то там нет тикающей бомбы замедленного действия. Большое спасибо – darel

Смежные вопросы