2015-01-13 2 views
5

OpenMPI: Я хочу прочитать файл на корневом узле и отправить содержимое этого файла всем остальным узлам. я обнаружил, что MPI_Bcast делает это:Отправить динамический массив с динамическим размером с помощью MPI_Bcast

int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype, 
    int root, MPI_Comm comm) 

Все примеры, которые я нашел имеют значение count уже известно, но в моем случае, значение счетчика в основном известна на корню. Другие examples говорят, что один и тот же вызов MPI_Bcast извлекает данные на других узлах.

Я добавил это:

typedef short Descriptor[128]; 
MPI_Datatype descriptorType; 
MPI_Type_contiguous(sizeof(Descriptor), MPI_SHORT, &descriptorType); 
MPI_Type_commit(&descriptorType); 



if(world_rank == 0) { 
    struct stat finfo; 

    if(stat(argv[1], &finfo) == 0) { 
     querySize = finfo.st_size/sizeof(Descriptor); 
    } 

{ 
    //read binary query 
    queryDescriptors = new Descriptor[querySize]; 
    fstream qFile(argv[1], ios::in | ios::binary); 
    qFile.read((char*)queryDescriptors, querySize*sizeof(Descriptor)); 
    qFile.close(); 

    } 
} 

    MPI_Bcast((void*)&querySize, 1, MPI_INT, 0, MPI_COMM_WORLD); 
    if (world_rank != 0) 
    { 
     queryDescriptors = new Descriptor[querySize]; 
    } 
    MPI_Bcast((void*)queryDescriptors, querySize, descriptorType, 0, MPI_COMM_WORLD); 

Когда я называю это так: mpirun -np 2 ./mpi_hello_world он работает нормально, но когда я называю его с более чем , я получаю это:

mpi_hello_world: malloc.c:3096: sYSMALLOc: Assertion `(old_top == (((mbinptr) (((char *) &((av)->bins[((1) - 1) * 2])) - __builtin_offsetof (struct malloc_chunk, fd)))) && old_size == 0) || ((unsigned long) (old_size) >= (unsigned long)((((__builtin_offsetof (struct malloc_chunk, fd_nextsize))+((2 * (sizeof(size_t))) - 1)) & ~((2 * (sizeof(size_t))) - 1))) && ((old_top)->size & 0x1) && ((unsigned long)old_end & pagemask) == 0)' failed. 
mpi_hello_world: malloc.c:3096: sYSMALLOc: Assertion `(old_top == (((mbinptr) (((char *) &((av)->bins[((1) - 1) * 2])) - __builtin_offsetof (struct malloc_chunk, fd)))) && old_size == 0) || ((unsigned long) (old_size) >= (unsigned long)((((__builtin_offsetof (struct malloc_chunk, fd_nextsize))+((2 * (sizeof(size_t))) - 1)) & ~((2 * (sizeof(size_t))) - 1))) && ((old_top)->size & 0x1) && ((unsigned long)old_end & pagemask) == 0)' failed. 
+2

Так выпустить два передач, первые с графом, вторыми с содержимым буфера. –

+0

Вы правы, это решение. Мне было интересно, есть ли в MPI механизм для таких ситуаций. – AlexandruC

+0

Не то, что я знаю, но мой MPI становится немного ржавым. –

ответ

2

Если qFile.read(...) не содержится в тесте if(rank==0){}, все процессы прочитают файл. И queryDescriptors = new Descriptor[querySize]; следует вызывать после первого MPI_Bcast() для всех процессов, кроме 0: до, querySize не имеет смысла в этих процессах.

Процесс 0 должен:

  • читать количество элементов
  • выделяют
  • читать массив
  • широковещательного количество элементов
  • транслируемых массив

Другое процессы должны:

  • получить количество элементов
  • выделить
  • получить массив

Вот пример того, как читать массив поплавка и использовать динамическое распределение:

#include <stdio.h> 
#include <iostream> 
#include <fstream> 

#include <mpi.h> 
using namespace std; 

int main (int argc, char *argv[]) 
{ 
    int rank; 
    int size; 

    MPI_Init(&argc, &argv); 

    MPI_Comm_size(MPI_COMM_WORLD, &size); 
    MPI_Comm_rank(MPI_COMM_WORLD, &rank); 

    if(rank == 0) 
    { 
     //creating the file 
     ofstream myfile; 
     myfile.open ("example.txt", ios::out |ios::binary); 
     int nbitem=42; 
     myfile.write((char*)&nbitem,sizeof(int)); 

     float a=0; 
     for(int i=0;i<nbitem;i++){ 
      myfile.write((char*)&a,sizeof(float)); 
      a+=2; 
     } 
     myfile.close();  
    } 


    //now reading the file 
    int nbitemread=0; 
    float* buffer; 
    if(rank==0){ 
     ifstream file ("example.txt", ios::in |ios::binary); 
     file.read ((char*)&nbitemread, sizeof(int)); 
     buffer=new float[nbitemread]; 
     file.read ((char*)buffer,nbitemread* sizeof(float)); 
     file.close(); 
     //communication 
     MPI_Bcast(&nbitemread, 1, MPI_INT, 0, MPI_COMM_WORLD); 
     MPI_Bcast(buffer, nbitemread, MPI_FLOAT, 0, MPI_COMM_WORLD); 
    }else{ 

     MPI_Bcast(&nbitemread, 1, MPI_INT, 0, MPI_COMM_WORLD); 
     //nbitemread is meaningfull now 
     buffer=new float[nbitemread]; 
     MPI_Bcast(buffer, nbitemread, MPI_FLOAT, 0, MPI_COMM_WORLD); 

    } 

    //printing... 
    cout<<"on rank "<<rank<<" rode "<<buffer[nbitemread/2]<<" on position "<<nbitemread/2<<endl; 

    delete[] buffer; 
    MPI_Finalize(); 

    return 0; 
} 

Скомпилируйте его с помощью mpiCC main.cpp -o main и введите mpirun -np 2 main

Еще одна проблема в вашем коде: MPI_Type_contiguous(sizeof(Descriptor), MPI_SHORT, &descriptorType);. Он должен быть MPI_Type_contiguous(sizeof(Descriptor), MPI_CHAR, &descriptorType); Вот кусок кода, основанный на ваших, что следует сделать трюк:

#include <stdio.h> 
#include <iostream> 
#include <fstream> 

#include <sys/types.h> 
#include <sys/stat.h> 
#include <unistd.h> 

#include <mpi.h> 
using namespace std; 

int main (int argc, char *argv[]) 
{ 
    int world_rank; 
    int size; 

    MPI_Init(&argc, &argv); 

    MPI_Comm_size(MPI_COMM_WORLD, &size); 
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); 

    int querySize; 


    typedef short Descriptor[128]; 
    MPI_Datatype descriptorType; 
    MPI_Type_contiguous(sizeof(Descriptor), MPI_CHAR, &descriptorType); 
    MPI_Type_commit(&descriptorType); 


    Descriptor* queryDescriptors; 


    if(world_rank == 0) { 
     struct stat finfo; 

     if(stat(argv[1], &finfo) == 0) { 
      cout<<"st_size "<<finfo.st_size<<" descriptor "<<sizeof(Descriptor)<< endl; 
      querySize = finfo.st_size/sizeof(Descriptor); 
      cout<<"querySize "<<querySize<<endl; 
     }else{ 
      cout<<"stat error"<<endl; 
     } 

     { 
      //read binary query 
      queryDescriptors = new Descriptor[querySize]; 
      fstream qFile(argv[1], ios::in | ios::binary); 
      qFile.read((char*)queryDescriptors, querySize*sizeof(Descriptor)); 
      qFile.close(); 

     } 
    } 

    MPI_Bcast((void*)&querySize, 1, MPI_INT, 0, MPI_COMM_WORLD); 
    if (world_rank != 0) 
    { 
     queryDescriptors = new Descriptor[querySize]; 
    } 
    MPI_Bcast((void*)queryDescriptors, querySize, descriptorType, 0, MPI_COMM_WORLD); 

    cout<<"on rank "<<world_rank<<" rode "<<queryDescriptors[querySize/2][12]<<" on position "<<querySize/2<<endl; 

    delete[] queryDescriptors; 

    MPI_Finalize(); 

    return 0; 
} 
+0

Простите, что не упомянули об этом, но я в основном делаю то же самое. Я использую mpiC++ и mpirun -np 3 main – AlexandruC

+0

Я добавил еще один код в свой вопрос. Виве-ла-Франс! – AlexandruC

+0

Интересно, если это проблема, потому что я запускаю это на одном узле? – AlexandruC

Смежные вопросы