2012-06-12 5 views
4

Я бы хотел использовать boost::asio для создания пула потоков. Мой вопрос: как я могу прикрепить конкретные данные к каждому из созданных потоков и как я могу управлять отдельными выходами?Использование пула потоков для моделирования: boost-thread и boost-asio

Чтобы быть более конкретным, я написал класс Simulation, который выполняет моделирование с помощью метода, принимающего некоторые параметры на входе. Этот класс содержит все данные, необходимые для расчета. Поскольку данные не слишком большие, я хотел бы их дублировать, чтобы использовать в каждом потоке пула другой экземпляр класса Simulation.

Я хотел бы сделать что-то вроде этого: (Настройка пула потоков объясняется здесь: SO и Asio recipes)

class ParallelSimulation 
{ 
    public: 
    static const std::size_t N = 10; 

    protected: 
    std::vector< boost::shared_ptr<Simulation> > simuInst; // N copy of a reference instance. 

    public: 

    ... 

    // Simulation with a large (>>N) number of inputs 
    void eval(std::vector<SimulationInput> inputs) 
    { 
     // Creation of the pool using N threads 
     asio::io_service io_service; 
     asio::io_service::work work(io_service); 
     boost::thread_group threads; 
     for (std::size_t i = 0; i < N; ++i) 
     threads.create_thread(boost::bind(&asio::io_service::run, &io_service)); 

     // Here ? Attaching the duplicates instances of class Simulation ? 

     // Adding tasks 
     for(std::size_t i = 0, i_end = inputs.size(); i<i_end; ++i) 
     io_service.post(...); // add simulation with inputs[i] to the queue 

     // How to deal with outputs ? 

     // End of the tasks 
     io_service.stop(); 
     threads.join_all(); 
    } 
}; 

Может быть, метод, используемый для создания пула потоков (с помощью boost::asio) является не адаптированный к моей проблеме. У вас есть предложения? Спасибо.

ответ

1

Вот результаты моих исследований!

Распределенное моделирование основано на основном классе DistributedSimulation с использованием двух классов реализации: impl::m_io_service и impl::dispatcher.

boost::asio нить бассейн основан на прикреплении io_service::run() способ к другим темам.
Идея состоит в том, чтобы переопределить этот метод и включить механизм для идентификации текущего потока. Нижеприведенное решение основано на локальном хранилище потоков boost::thread_specific_ptr от boost::uuid. После прочтения комментария Tres, я считаю, что определение потока с использованием boost::thread::id является лучшим решением (но эквивалентным и не слишком разным).
Наконец, еще один класс используется для отправки входных данных в экземпляры класса Simulation. Этот класс создает несколько экземпляров одного и того же класса Simulation и использует их для вычисления результатов в каждом потоке.

namespace impl { 

    // Create a derived class of io_service including thread specific data (a unique identifier of the thread) 
    struct m_io_service : public boost::asio::io_service 
    { 
    static boost::thread_specific_ptr<boost::uuids::uuid> ptrSpec_; 

    std::size_t run() 
    { 
     if(ptrSpec_.get() == 0) 
     ptrSpec_.reset(new boost::uuids::uuid(boost::uuids::random_generator()()) ); 

     return boost::asio::io_service::run(); 
    } 
    }; 


    // Create a class that dispatches the input data over the N instances of the class Simulation 
    template <class Simulation> 
    class dispatcher 
    { 
    public: 
     static const std::size_t N = 6; 

     typedef Simulation::input_t input_t; 
     typedef Simulation::output_t output_t; 

     friend DistributedSimulation; 

    protected: 
     std::vector< boost::shared_ptr<Simulation> > simuInst; 
     std::vector<boost::uuids::uuid>   map; 

    public: 

     // Constructor, creating the N instances of class Simulation 
     dispatcher(const Simulation& simuRef) 
     { 
     simuInst.resize(N); 
     for(std::size_t i=0; i<N; ++i) 
      simuInst[i].reset(simuRef.clone()); 
     } 

     // Record the unique identifiers and do the calculation using the right instance of class Simulation 
     void dispatch(const Simulation::input_t& in ) 
     { 
     if(map.size() == 0) { 
      map.push_back(*m_io_service::ptrSpec_); 
      simuInst[0]->eval(in, *m_io_service::ptrSpec_); 
     }  
     else { 
      if(map.size() < N) { 
      map.push_back(*m_io_service::ptrSpec_); 
      simuInst[map.size()-1]->eval(in, *m_io_service::ptrSpec_); 
      } 
      else { 
      for(size_t i=0; i<N;++i) { 
       if(map[i] == *m_io_service::ptrSpec_) { 
       simuInst[i]->eval(in, *m_io_service::ptrSpec_); 
       return; 
       } 
      } 
      } 
     } 
     } 
    }; 

    boost::thread_specific_ptr<boost::uuids::uuid> m_io_service::ptrSpec_; 
} 


    // Main class, create a distributed simulation based on a class Simulation 
    template <class Simulation> 
    class DistributedSimulation 
    { 
    public: 
    static const std::size_t N = impl::dispatcher::N; 

    protected: 
    impl::dispatcher _disp; 

    public: 
    DistributedSimulation() : _disp(Simulation()) {} 

    DistributedSimulation(Simulation& simuRef) 
    : _disp(simuRef) { } 


    // Simulation with a large (>>N) number of inputs 
    void eval(const std::vector<Simulation::input_t>& inputs, std::vector<Simulation::output_t>& outputs) 
    { 

     // Clear the results from a previous calculation (and stored in instances of class Simulation) 
     ... 

     // Creation of the pool using N threads 
     impl::m_io_service io_service; 
     boost::asio::io_service::work work(io_service); 
     boost::thread_group threads; 
     for (std::size_t i = 0; i < N; ++i) 
     threads.create_thread(boost::bind(&impl::m_io_service::run, &io_service)); 

     // Adding tasks 
     for(std::size_t i = 0, i_end = inputs.size(); i<i_end; ++i) 
     io_service.post(boost::bind(&impl::dispatcher::dispatch, &_disp, inputs[i])); 

     // End of the tasks 
     io_service.stop(); 
     threads.join_all(); 

     // Gather the results iterating through instances of class simulation 
     ... 
    } 
    }; 

Редактировать

ниже код является обновлением моего предыдущего решения, принимая во внимание замечание Tres. Как я уже говорил, это намного проще читать!

template <class Simulation> 
    class DistributedSimulation 
    { 
    public: 
     typedef typename Simulation::input_t input_t; 
     typedef typename Simulation::output_t output_t; 

     typedef boost::shared_ptr<Simulation> SimulationSPtr_t; 
     typedef boost::thread::id    id_t;  
     typedef std::map< id_t, std::size_t >::iterator IDMapIterator_t; 

    protected: 
     unsigned int     _NThreads; // Number of threads 
     std::vector<SimulationSPtr_t> _simuInst; // Instances of class Simulation 
     std::map< id_t, std::size_t > _IDMap;  // Map between thread id and instance index. 

    private: 
     boost::mutex _mutex; 

    public: 

     DistributedSimulation() {} 

     DistributedSimulation(const Simulation& simuRef, const unsigned int NThreads = boost::thread::hardware_concurrency()) 
     { init(simuRef, NThreads); } 

     DistributedSimulation(const DistributedSimulation& simuDistrib) 
     { init(simuRef, NThreads); } 

     virtual ~DistributedSimulation() {} 

     void init(const Simulation& simuRef, const unsigned int NThreads = boost::thread::hardware_concurrency()) 
     { 
     _NThreads = (NThreads == 0) ? 1 : NThreads; 
     _simuInst.resize(_NThreads); 
     for(std::size_t i=0; i<_NThreads; ++i) 
      _simuInst[i].reset(simuRef.clone()); 
     _IDMap.clear(); 
     } 


     void dispatch(const input_t& input) 
     { 
     // Get current thread id 
     boost::thread::id id0 = boost::this_thread::get_id(); 

     // Get the right instance 
     Simulation* sim = NULL;   
     { 
      boost::mutex::scoped_lock scoped_lock(_mutex); 
      IDMapIterator_t it = _IDMap.find(id0); 
      if(it != _IDMap.end()) 
      sim = _simuInst[it->second].get(); 
     } 

     // Simulation 
     if(NULL != sim) 
      sim->eval(input); 
     } 


     // Distributed evaluation. 
     void eval(const std::vector<input_t>& inputs, std::vector<output_t>& outputs) 
     { 
     //--Initialisation 
     const std::size_t NInputs = inputs.size(); 

     // Clear the ouptuts f(contained in instances of class Simulation) from a previous run 
     ... 

     // Create thread pool and save ids 
     boost::asio::io_service io_service; 
     boost::asio::io_service::work work(io_service); 
     boost::thread_group threads; 
     for (std::size_t i = 0; i < _NThreads; ++i) 
     { 
      boost::thread* thread_ptr = threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service)); 
      _IDMap[ thread_ptr->get_id() ] = i; 
     } 

     // Add tasks 
     for(std::size_t i = 0; i < NInputs; ++i) 
      io_service.post(boost::bind(&DistributedSimulation::dispatch, this, inputs[i])); 

     // Stop the service 
     io_service.stop(); 
     threads.join_all(); 

     // Gather results (contained in each instances of class Simulation) 
     ... 
     } 
    }; 
0

Это должно хорошо работать для вашего приложения. Когда вы делаете звонок до io_service.post, в качестве параметра вы передадите функцию моделирования с inputs[i]. В этой функции (предположительно, функция-член от Simulation) просто сохраните результат вычисления в объекте Simulation, а затем перейдите по объектам после того, как вы присоедините потоки для сбора вывода.

Вы также можете передать i в качестве параметра, если вам нужно определить конкретный поток, который выполнял работу. Это предполагает, что сбор данных после завершения моделирования будет в порядке.

Если вам нужен доступ к выходу, поскольку он работает, просто выполните функцию post задание вывода на io_service при необходимости. Обязательно защитите любые общие структуры данных с помощью мьютекса!

+0

Благодарим за ответ. Но я не вижу, как вы используете N экземпляров класса Simulation, чтобы гарантировать, что поток 'i' вычисляет результаты с экземпляром' i' (и поэтому использует свои собственные данные для выполнения работы). Способ борьбы с выходами кажется прекрасным, спасибо! –

+0

@ gleeen.gould Можете ли вы пояснить, почему важно, чтобы вы использовали thread 'i' для вычисления работы для моделирования' i'? Это возможно (вызовите 'thread-> get_id()', когда вы его создадите, передайте это 'simulation [i]', а затем проверьте работу функции моделирования [i] ', когда он будет выполнен, что' boost :: this_thread :: get_id() 'match, if not return, else keep going), но я не думаю, что это необходимо. – Tres

+0

Посмотрите на мое решение, я надеюсь, что это прояснит мои цели. –

Смежные вопросы