2016-07-16 3 views
0

Мой код зависает всякий раз, когда я пытаюсь запустить его со многими потоками. Вот это,Почему этот код openmp + mpi висит

double CMpifun::sendData2() 
{ 
    double *tStatistics=new double[8], tmp_time; // wall clock time 
    double SY, Sto, header[SZ_HEADER]; 
    int a_tasks=0, file_p=0; 
    vector<myDataType *> d = getData(); 

    int idx=0; 
    opt_k.k=1; opt_k.proc_files=0; opt_k.p=this->node_sz; SY=0; Sto=0; 
    std::fill(header,header+SZ_HEADER,-1); 

    omp_set_num_threads(4);// for now 
    // parallel region 
    #pragma omp parallel default(none) shared(idx,SY,Sto,d,a_tasks,stdout) firstprivate(header) //firstprivate(dat_dim,dat) 
    { 
     int tid = omp_get_thread_num(), cur_idx, cur_k; int N=d.size(); 
     while (idx<N) { // Assign tasks and fetch results where available 
      printf("-------------------------\n%d - 1\n", tid); fflush(stdout); 
      #pragma omp critical(update__a_task) 
      { 
       printf("%d - critique 1\n", tid); fflush(stdout); 
       if (idx<N) { 
        printf("%d - critique 2\n", tid); fflush(stdout); 
        if (a_tasks<node_sz-1){ // available nodes to assign 
         printf("%d - 2.1\n", tid); fflush(stdout); 
         MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,TAG_HEADER,MY_COMM_GRP,this->Stat); 
         cur_idx=idx; cur_k=opt_k.k; idx+=cur_k; 
         a_tasks+=cur_k; 
        } else {// all nodes assigned. only fetch result 
         printf("%d - 2.2\n", tid); fflush(stdout); 
         MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,TAG_RESULT,MY_COMM_GRP,this->Stat); 
        } 
      }else ;//printf("%d - done task assignment\n", tid); fflush(stdout); 

     } 

      printf("%d - 3\n", tid); fflush(stdout); 
      if (cur_idx<N) { 
      printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N); fflush(stdout); 
      if(this->Stat->MPI_TAG == TAG_HEADER){ // serve tasks 
       printf("%d - task %d being assigned to %d\n", tid,cur_idx,(int)header[4]); fflush(stdout); 
       while (cur_k && cur_idx<N) { 
        printf("%d - T1\n", tid); fflush(stdout); 
         header[1]=d[cur_idx]->nRows; header[2]=d[cur_idx]->nCols; header[3]=cur_idx; 
         header[9]=--cur_k; 
         MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_HEADER,MY_COMM_GRP); 
         printf("%d - T2 %d\n", tid,(int)header[4]); fflush(stdout); 
         MPI_Send(d[cur_idx]->data,d[cur_idx]->nRows*d[cur_idx]->nCols,MPI_DOUBLE,(int)header[4],TAG_DATA,MY_COMM_GRP); 
         printf("%d - T3 %d\n", tid,(int)header[4]); fflush(stdout); 
         delete[] d[cur_idx]->data; ++cur_idx; 
       } 
      } else if(this->Stat->MPI_TAG == TAG_RESULT){ // collect results 
       printf("%d - result from %d\n", tid,(int)header[4]); fflush(stdout); 
       while(true){ 
        printf("%d - R1\n", tid); fflush(stdout); 
        #pragma omp atomic 
         --a_tasks; 
        double *results = new double[(int)(header[1]*header[2])]; 
       MPI_Recv(results,(int)(header[1]*header[2]),MPI_DOUBLE,(int)header[4],TAG_DATA,MY_COMM_GRP,this->Stat); 
        printf("%d - R2 received result from %d\n", tid,(int)header[4]); fflush(stdout); 
       delete[] results; 
        if ((int)header[9]>0) { 
         MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_RESULT,MY_COMM_GRP,this->Stat); 
        } else break; 
       } //end while 
      } // end collect results 
     } //end if(loopmain) 
     printf("%d - NExt idx: %d\n", tid,idx); fflush(stdout); 
    } // end while(loopmain) 
    } // end parallel section] 

    printf("<<<<<<<<<<<<< MASTER - COLLECTING RESULTS >>>>>>>>>>>> "); fflush(stdout); 
    printf("MASTER - pending tasks:%d\n",a_tasks); fflush(stdout); 
    while (a_tasks>0) { 
     printf("MASTER - wait for slave result request... pending tasks:%d\n",a_tasks); fflush(stdout); 
     MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,TAG_RESULT,MY_COMM_GRP,this->Stat); 
     while (true) { 
      double *results = new double[(int)(header[1]*header[2])]; 
      printf("MASTER - wait for result from %d... pending tasks\n",(int)header[4]); fflush(stdout); 
      MPI_Recv(results,(int)(header[1]*header[2]),MPI_DOUBLE,(int)header[4],TAG_DATA,MY_COMM_GRP,this->Stat); 
      delete[] results; 
      --a_tasks; 
      if ((int)header[9]>0) { 
       printf("MASTER - result from slave .. some more\n"); fflush(stdout); 
       MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_RESULT,MY_COMM_GRP,this->Stat); 
      } else break; 
     } 
    } 

    message("<<<<<<<<<<<<<<<<<< MASTER - terminate slaves >>>>>>>>>>>>>>>>>"); 
    for(int i=1;i<node_sz;++i){ // terminate 
     MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,TAG_HEADER,MY_COMM_GRP,this->Stat); 
     printf("MASTER - terminate to signal %d\n",(int)header[4]); fflush(stdout); 
     MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_TERMINATE,MY_COMM_GRP); 
     printf("MASTER - done terminated %d\n",(int)header[4]); fflush(stdout); 
    } 
    printf("MASTER - bye\n"); fflush(stdout); 
    return 0; 

Функция ведомого устройства заключается в следующем,

void CMpifun::slave2() 
{ 
    double *Data; vector<myDataType> dataQ; vector<hist_type> resQ; 
    char out_opt='b'; // irrelevant 
    myDataType *out_im = new myDataType; hist_type *out_hist; CLdp ldp; 
    int file_cnt=0; double tmp_t; //local variables 
    double time_arr[3]={}; //1: task wait latency, 2: task set total send time, 3: taskset total process time 

    while (true) { // main while loop 
     printf("Slave: %d - ........... ready for task......\n",myRank); fflush(stdout); 
     header[4]=myRank; MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP); 
     time_arr[0] = MPI_Wtime(); 
     printf("Slave: %d - got master. waiting for task\n",myRank); fflush(stdout); 
     MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,MPI_ANY_TAG,MY_COMM_GRP,this->Stat); 
     time_arr[0] = MPI_Wtime() - time_arr[0]; // wait for task latency 
     if(this->Stat->MPI_TAG == TAG_TERMINATE) { 
      printf("Slave: %d - terminate signal received\n",myRank); fflush(stdout); 
      break; 
     } 
     printf("Slave: %d - got header. waiting for data\n",myRank); fflush(stdout); 
     //receive data 
     tmp_t = MPI_Wtime(); 
     while(true) { 
      Data=new double[(int)(header[1]*header[2])]; 
      MPI_Recv(Data,(int)(header[1]*header[2]),MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat); 
      myDataType d; d.data=Data; d.nRows=(int)header[1]; d.nCols=(int)header[2]; 
      dataQ.push_back(d); 
      file_cnt++; 
      if ((int)header[9]) { 
       MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP,this->Stat); 
      } else break; 
     } 
     time_arr[1] = (MPI_Wtime()-tmp_t); // Total bandwidth time for entire taskset 

     file_cnt = dataQ.size(); 
     tmp_t = MPI_Wtime(); 
     printf("Slave: %d - got data. processing\n",myRank); fflush(stdout); 
     while (dataQ.size()) { // process data 
      out_hist = new hist_type(); 
      myDataType d = dataQ.back(); dataQ.pop_back(); // critical section 
      ldp.process(d.data, d.nRows,d.nCols,out_opt,out_im, out_hist); 
      resQ.push_back(*out_hist); out_hist=0; 
      delete[] d.data; delete[] out_im->data; 
     } 
     time_arr[2] = (MPI_Wtime()-tmp_t); // Total processing time for entire taskset 

     // tuma results 
     //time_arr[1] /= file_cnt; time_arr[2] /= file_cnt; 
     printf("Slave: %d - sending results\n",myRank); fflush(stdout); 
     header[4]=myRank; header[6]=time_arr[0]; header[7]=time_arr[1]; header[8]=time_arr[2]; 
     for (size_t i = 0; i < resQ.size(); i++) { 
      header[1]=resQ[i].h_nHists; header[2]=resQ[i].h_binSz; header[9]=resQ.size()-i-1; 
      MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_RESULT,MY_COMM_GRP); 
      MPI_Send(resQ[i].hist_data,resQ[i].h_nHists*resQ[i].h_binSz,MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP); 
     } 
     resQ.clear(); 

    } // end main while loop 
    message("terminating"); 
} 

Он висит после случайного числа итераций цикла if (idx<N). Я занимаюсь этим в течение двух целых дней. Может кто-нибудь, пожалуйста, внимательно изучите код и дайте мне знать о том, что вызывает проблему? Все справка оценивается заранее

+2

У вас слишком длинный код, чтобы вручную просмотреть его (по крайней мере, для меня). Он также является неполным, поэтому мы не можем воспроизвести проблему. Пожалуйста, переустановите свою проблему до [mcve]. В этом процессе вы действительно сможете понять это сами. Также вы должны приложить больше усилий для описания того, как он висит. Используйте (параллельный) анализатор отладки/правильности, чтобы выяснить, где находится код. – Zulan

ответ

0

Я решил создать уникальные теги заголовков для каждой пары коммуникаций MPI - это действительно решило проблему. Есть ли способ удалить этот вопрос?

+0

Нажмите, чтобы удалить ... –

Смежные вопросы