2010-12-14 3 views
1

Я использую библиотеку Open MPI для реализации следующего алгоритма: у нас есть два процесса: p1 и p2. Они выполняют несколько итераций, и в конце каждой итерации они сообщают о своих результатах. Проблема заключается в том, что выполнение не обязательно сбалансировано, поэтому p1 может выполнить 10 итераций за p2. 1. Хотя я хочу, чтобы p2 читал последний результат с последней итерации, выполненной p1.MPI: Отмена неблокирующего отправления

Таким образом, моя идея состоит в том, что p1 отправляет результаты на каждой итерации. Но перед отправкой результата с итерации i он должен проверить, действительно ли p2 читает информацию с итерации i-1. Если нет, он должен отменить предыдущий отправьте, так что, когда p2 читает с p1, он будет читать последний результат.

К сожалению, я не уверен, как это сделать. Я попытался с помощью MPI_CANCEL, как показано в следующем коде:

int main (int argc, char *argv[]){ 

    int myrank, numprocs; 
    MPI_Status status; 
    MPI_Request request; 

    MPI_Init(&argc, &argv); 
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs); 
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank); 

    if(myrank == 0){ 
     int send_buf = 1, flag; 
     MPI_Isend(&send_buf, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, 
        &request); 
     MPI_Cancel(&request); 
     MPI_Wait(&request, &status); 
     MPI_Test_cancelled(&status, &flag); 
     if (flag) printf("Send cancelled\n"); 
     else printf("Send NOT cancelled\n"); 
     send_buf = 2; 
     MPI_Isend(&send_buf, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, 
        &request); 
    } 
    else { 
     sleep(5); 
     int msg; 
     MPI_Recv(&msg, 1, MPI_INT, 0, 123, 
       MPI_COMM_WORLD, &status); 
     printf("%d\n", msg); 
    } 
    MPI_Finalize(); 

    return 0; 
} 

Но когда я исполняю, это говорит о том, что отправить не может быть отменено, и p2 печатает 1 вместо 2.

Я хотел бы знать, есть ли способ достичь того, что я предлагаю, или если есть альтернатива кодированию поведения между p1 и p2.

+0

Отмена отправки - это зло, и вы не должны его использовать. И нет никакой гарантии, что вы действительно можете отменить отправку. Если вы считаете, что можете отказаться от письменного извещения удаленно с отменой, вы будете разочарованы. – Jeff

ответ

5

Я бы отказался от контроля связи. Вместо p1, отправляя ненужные сообщения, которые он должен отменить, p2 должен сообщить, что он готов принять сообщение, а p1 отправит только тогда. Тем временем p1 просто перезаписывает свой буфер отправки с последними результатами.

В (непроверенные) Код:

if (rank == 0) 
{ 
    int ready; 
    MPI_Request p2_request; 
    MPI_Status p2_status; 
    // initial request 
    MPI_Irecv(&ready, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &p2_request); 
    for (int i=0; true; i++) 
    { 
     sleep(1); 
     MPI_Test(&p2_request, &ready, &p2_status); 
     if (ready) 
     { 
      // blocking send: p2 is ready to receive 
      MPI_Send(&i, 1, MPI_INT, 1, 123, MPI_COMM_WORLD); 
      // post new request 
      MPI_Irecv(&ready, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &p2_request); 
     } 
    } 
} 
else 
{ 
    int msg; 
    MPI_Status status; 
    while (true) 
    { 
     sleep(5); 
     // actual message content doesn't matter, just let p1 know we're ready 
     MPI_Send(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD); 
     // receive message 
     MPI_Recv(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD, &status); 
    } 
} 

Теперь, как я сказал, что это непроверенный код, но вы, вероятно, можете увидеть, что я клоню там. MPI_Cancel следует использовать только в том случае, если все идет ужасно неправильно: ни одно сообщение не должно отменяться во время нормального выполнения.

5

Другим подходом было бы использование односторонней связи MPI (например, http://www.linux-mag.com/id/1793). Обратите внимание, однако, что выполнение пассивных сообщений, которые вы действительно хотите здесь, довольно сложно (хотя попарно, с mpi_win_post и mpi_win_start проще), и что односторонний материал, мы надеемся, все изменится в MPI-3, я знаю, как далеко по той дороге я бы посоветовал тебе идти.

Более непосредственно связано с тем, что вы в первую очередь пытаетесь здесь, вместо того, чтобы отменять сообщения (что, как было предложено выше, довольно резкое), вероятно, гораздо проще просто пройти через все сообщения в очереди (MPI гарантирует, что сообщения не будут обгонять друг друга - единственный нюанс, если вы используете MPI_THREAD_MULTIPLE и имеющие несколько потоков отправки в пределах одной задачи MPI, в этом случае порядок porly определена):

#include <stdio.h> 
#include <mpi.h> 
#include <stdlib.h> 
#include <unistd.h> 
#include <math.h> 

void compute() { 
    const int maxusecs=500; 
    unsigned long sleepytime=(unsigned long)round(((float)rand()/RAND_MAX)*maxusecs); 

    usleep(sleepytime); 
} 

int main(int argc, char** argv) 
{ 
    int rank, size, i; 
    int otherrank; 
    const int niters=10; 
    const int tag=5; 
    double newval; 
    double sentvals[niters+1]; 
    double othernewval; 
    MPI_Request reqs[niters+1]; 
    MPI_Status stat; 
    int ready; 

    MPI_Init(&argc, &argv); 
    MPI_Comm_rank(MPI_COMM_WORLD, &rank); 
    MPI_Comm_size(MPI_COMM_WORLD, &size); 
    if (size != 2) { 
    fprintf(stderr,"This assumes 2 processes\n"); 
    MPI_Finalize(); 
    exit(-1); 
    } 

    otherrank = (rank == 0 ? 1 : 0); 
    srand(rank); 

    compute(); 
    newval = rank * 100. + 0; 
    sentvals[0] = newval; 
    MPI_Isend(&(sentvals[0]), 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &(reqs[0])); 
    MPI_Recv (&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat); 
    for (i=0; i<niters; i++) { 

     MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat); 
     while (ready) { 
      MPI_Recv(&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat); 
      printf("%s[%d]: Reading queued data %lf:\n", 
        (rank == 0 ? "" : "\t\t\t\t"), rank, othernewval); 
      MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat); 
     } 

     printf("%s[%d]: Got data %lf, computing:\n", 
       (rank == 0 ? "" : "\t\t\t\t"), rank, othernewval); 
     compute(); 

     /* update my data */ 
     newval = rank * 100. + i + 1; 
     printf("%s[%d]: computed %lf, sending:\n", 
       (rank == 0 ? "" : "\t\t\t\t"), rank, newval); 
     sentvals[i+1] = newval; 
     MPI_Isend(&(sentvals[i+1]), 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &(reqs[0])); 
    } 


    MPI_Finalize(); 

    return 0; 
} 

Бег это дает вам (заметьте, что только потому, что данные передаются не означает его получение во время печати):

[0]: Got data 100.000000, computing: 
           [1]: Got data 0.000000, computing: 
[0]: computed 1.000000, sending: 
[0]: Got data 100.000000, computing: 
           [1]: computed 101.000000, sending: 
           [1]: Got data 0.000000, computing: 
[0]: computed 2.000000, sending: 
[0]: Got data 100.000000, computing: 
           [1]: computed 102.000000, sending: 
           [1]: Reading queued data 1.000000: 
           [1]: Got data 1.000000, computing: 
[0]: computed 3.000000, sending: 
[0]: Reading queued data 101.000000: 
[0]: Got data 101.000000, computing: 
           [1]: computed 103.000000, sending: 
           [1]: Reading queued data 2.000000: 
           [1]: Got data 2.000000, computing: 
[0]: computed 4.000000, sending: 
           [1]: computed 104.000000, sending: 
[0]: Reading queued data 102.000000: 
           [1]: Reading queued data 3.000000: 
           [1]: Got data 3.000000, computing: 
[0]: Got data 102.000000, computing: 
[0]: computed 5.000000, sending: 
[0]: Reading queued data 103.000000: 
[0]: Got data 103.000000, computing: 
           [1]: computed 105.000000, sending: 
           [1]: Reading queued data 4.000000: 
           [1]: Got data 4.000000, computing: 
[0]: computed 6.000000, sending: 
[0]: Reading queued data 104.000000: 
[0]: Got data 104.000000, computing: 
           [1]: computed 106.000000, sending: 
           [1]: Reading queued data 5.000000: 
           [1]: Got data 5.000000, computing: 
[0]: computed 7.000000, sending: 
[0]: Reading queued data 105.000000: 
[0]: Got data 105.000000, computing: 
           [1]: computed 107.000000, sending: 
           [1]: Reading queued data 6.000000: 
           [1]: Got data 6.000000, computing: 
[0]: computed 8.000000, sending: 
[0]: Reading queued data 106.000000: 
[0]: Got data 106.000000, computing: 
           [1]: computed 108.000000, sending: 
           [1]: Reading queued data 7.000000: 
           [1]: Got data 7.000000, computing: 
[0]: computed 9.000000, sending: 
[0]: Reading queued data 107.000000: 
[0]: Got data 107.000000, computing: 
           [1]: computed 109.000000, sending: 
           [1]: Reading queued data 8.000000: 
           [1]: Got data 8.000000, computing: 
[0]: computed 10.000000, sending: 
           [1]: computed 110.000000, sending: 

Обратите внимание, что это всего лишь демонстрационный код, окончательная версия действительно необходимо сделать waitalls и больше iprobes в конце, чтобы освободить любые ожидающие запросы и сбросить любые ожидающие сообщения.

0

Поддерживает ли ваша среда и распределение MPI многопоточность? Если это так, вы можете создать поток в P1, который вычисляет значение и сохраняет результат каждой итерации в переменной, разделяемой с основным потоком P1 (запись защищена с помощью семафора) Как было предложено суслитерпаттом выше, затем отправьте P2 «Я готов msgstr "сообщение на P1, и P1 отвечает значением от последней итерации.

Смежные вопросы