2015-03-05 2 views
0

Я пытаюсь отправить шаблон master-slave реализации, в котором мастер имеет массив (действует как очередь заданий) и отправляет данные на подчиненные процессоры. На основе данных, полученных от мастера, ведомые вычисления вычисляют результаты и возвращают ответы на мастер. Мастер получает результаты, узнает ранг ранга, из которого был получен msg, а затем отправляет следующее задание этому ведомому.Программа застряла где-то

Это скелет кода, который я реализовал:

 if (my_rank != 0) 
     { 
      MPI_Recv(&seed, 1, MPI_FLOAT, 0, tag, MPI_COMM_WORLD, &status); 

        //.. some processing 

      MPI_Send(&message, 100, MPI_FLOAT, 0, my_rank, MPI_COMM_WORLD); 
     } 
     else 
     { 
      for (i = 1; i < p; i++) { 
       MPI_Send(&A[i], 1, MPI_FLOAT, i, tag, MPI_COMM_WORLD); 
      } 

      for (i = p; i <= S; i++) { 
       MPI_Recv(&buf, 100, MPI_FLOAT, MPI_ANY_SOURCE, MPI_ANY_TAG, 
         MPI_COMM_WORLD, &status); 
       //.. processing to find out free slave rank from which above msg was received (y) 
       MPI_Send(&A[i], 1, MPI_FLOAT, y, tag, MPI_COMM_WORLD); 
      } 

      for (i = 1; i < p; i++) { 
       MPI_Recv(&buf, 100, MPI_FLOAT, MPI_ANY_SOURCE, MPI_ANY_TAG,MPI_COMM_WORLD, &status); 

       // .. more processing 
      } 

     } 

Если я использую 4 процессора; 1 является ведущим, а 3 - подчиненными; программа отправляет и получает сообщения для первых 3 заданий в очереди заданий, но после этого программа зависает. В чем может быть проблема?

+0

Звучит как один из процессов умирает, прежде чем он отправит ответ. Выясните, какой процесс не отправляет ответ на основной процесс. Некоторый код отладки был бы полезен здесь. – tdbeckett

+0

Это очень неполно. –

+0

^Это единственный код MPI, где я выполняю отправку и получение. другие вещи кажутся мне нормальными. – Kany

ответ

0

Если это всего-навсего ваш код на основе MPI, то похоже, что вам не хватает цикла while на внешней стороне кода клиента. Я делал это раньше, и я обычно разбить его как надсмотрщик и пеоны

в надсмотрщике:

for (int i = 0; i < commSize; ++i){ 
    if (i == commRank){ // commRank doesn't have to be 0 
     continue; 
    } 

    if (taskNum < taskCount){ 
     // tasks is vector<Task>, where I have crated a Task 
     // class and send it as a stream of bytes 
     toSend = tasks.at(taskNum); 
     jobList.at(i) = taskNum; // so we no which rank has which task 
     taskNum += 1; 
     activePeons += 1; 
    } else{ 
     // stopTask is a flag value to stop receiving peon 
     toSend = stopTask; 
     allTasksDistributed = true; 
    } 

    // send the task, with the size of the task as the tag 
    taskSize = sizeof(toSend); 
    MPI_Send(&toSend, taskSize, MPI_CHAR, i, taskSize, MPI_COMM_WORLD); 
} 

MPI_Status status; 

while (activePeons > 0){ 
    // get the results from a peon (but figure out who it is coming from and what the size is) 
    MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status); 
    MPI_Recv( &toSend,     // receive the incoming task (with result data) 
       status.MPI_TAG,    // Tag holds number of bytes 
       MPI_CHAR,     // type, may need to be more robust later 
       status.MPI_SOURCE,   // source of send 
       MPI_ANY_TAG,    // tag 
       MPI_COMM_WORLD,    // COMM 
       &status);     // status 

    // put the result from that task into the results vector 
    results[jobList[status.MPI_SOURCE]] = toSend.getResult(); 

    // if there are more tasks to send, distribute the next one 
    if (taskNum < taskCount){ 
     toSend = tasks.at(taskNum); 
     jobList[status.MPI_SOURCE] = taskNum; 
     taskNum += 1; 
    } else{ // otherwise send the stop task and decrement activePeons 
     toSend = stopTask; 
     activePeons -= 1; 
    } 

    // send the task, with the size of the task as the tag 
    taskSize = sizeof(toSend); 
    MPI_Send(&toSend, taskSize, MPI_CHAR, status.MPI_SOURCE, taskSize, MPI_COMM_WORLD); 
} 

в функции пеонов:

while (running){ 
    MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status); // tag holds data size 

    incoming = (Task *) malloc(status.MPI_TAG); 

    MPI_Recv( incoming,   // memory location of input 
       status.MPI_TAG,  // tag holds data size 
       MPI_CHAR,   // type of data 
       status.MPI_SOURCE, // source is from distributor 
       MPI_ANY_TAG,  // tag 
       MPI_COMM_WORLD,  // comm 
       &status);   // status 

    task = Task(*incoming); 

    if (task.getFlags() == STOP_FLAG){ 
     running = false; 
     continue; 
    } 

    task.run(); // my task class has a "run" method 
    MPI_Send( &task,     // string to send back 
       status.MPI_TAG,   // size in = size out 
       MPI_CHAR,    // data type 
       status.MPI_SOURCE,  // destination 
       status.MPI_TAG,   // tag doesn't matter 
       MPI_COMM_WORLD);  // comm 

    free(incoming); 
} 

Есть некоторые bool и int значения, которые должны быть назначены (и, как я уже сказал, у меня есть класс Task), но это дает основную структуру для того, что я думаю, что вы хотите сделать.

+0

Я, вероятно, занимаюсь здесь, но почему вы отправляете размер данных как тег, когда есть 'MPI_GET_COUNT', который может быть использован для получения количества элементов в (peeked) сообщении с объектом' MPI_Status'? –

+0

Честно говоря, я раньше не слышал о функции. Вероятно, я бы скорее использовал 'status.count' вместо того, чтобы делать вызов функции со статусом, который у меня уже есть, но это то, что хорошо сработало для меня. –

+0

Если вы это сделаете, вы должны иметь в виду, что для стандарта MPI требуется, чтобы максимальное значение тега составляло не менее 32767. Реализации могут предоставлять гораздо большее пространство тегов и большинство из них, но если вы неявно полагаетесь на предположение что это всегда верно и установить тег на значения больше 32767 (например, ваши сообщения длинны), ваши программы MPI не будут переносимыми. –

Смежные вопросы