2015-06-11 2 views
10

У меня есть x ускорение потоков, которые работают в одно и то же время. Один поток производителя заполняет синхронизированную очередь заданиями вычисления. Потребительские потоки выталкивают задачи и вычисляют их.Завершение поднять потоки правильно

Synchronised Queue Источник изображения: https://www.quantnet.com/threads/c-multithreading-in-boost.10028/

Пользователь может закончить программка во время этого процесса, так что мне нужно выключение моих тема правильно. Мой нынешний подход, похоже, не работает, поскольку выбрасываются исключения. Предполагается, что при выключении системы все процессы должны быть убиты и остановить их текущую задачу независимо от того, что они делают. Не могли бы вы показать мне, как вы будете убивать теки?

Инициализация Тема:

for (int i = 0; i < numberOfThreads; i++) 
    { 
     std::thread* thread = new std::thread(&MyManager::worker, this); 
     mThreads.push_back(thread); 
    } 

Destruction Тема:

void MyManager::shutdown() 
{ 
    for (int i = 0; i < numberOfThreads; i++) 
    { 
     mThreads.at(i)->join(); 
     delete mThreads.at(i); 
    } 
    mThreads.clear(); 
} 

работник:

void MyManager::worker() 
{ 
    while (true) 
    { 

     int current = waitingList.pop(); 
     Object * p = objects.at(current); 
     p->calculateMesh(); //this task is internally locked by a mutex 

     try 
     { 
      boost::this_thread::interruption_point(); 
     } 
     catch (const boost::thread_interrupted&) 
     { 
      // Thread interruption request received, break the loop 
      std::cout << "- Thread interrupted. Exiting thread." << std::endl; 
      break; 
     } 
    } 
} 

Синхронное Очередь:

#include <queue> 
#include <thread> 
#include <mutex> 
#include <condition_variable> 

template <typename T> 
class ThreadSafeQueue 
{ 
public: 

    T pop() 
    { 
     std::unique_lock<std::mutex> mlock(mutex_); 
     while (queue_.empty()) 
     { 
      cond_.wait(mlock); 
     } 
     auto item = queue_.front(); 
     queue_.pop(); 

     return item; 
    } 

    void push(const T& item) 
    { 
     std::unique_lock<std::mutex> mlock(mutex_); 
     queue_.push(item); 
     mlock.unlock(); 
     cond_.notify_one(); 
    } 


    int sizeIndicator() 
    { 
     std::unique_lock<std::mutex> mlock(mutex_); 
     return queue_.size(); 
    } 


private: 

    bool isEmpty() { 
     std::unique_lock<std::mutex> mlock(mutex_); 
     return queue_.empty(); 
    } 

    std::queue<T> queue_; 
    std::mutex mutex_; 
    std::condition_variable cond_; 
}; 

Заброшенная стек вызовов ошибка:

... std::_Mtx_lockX(_Mtx_internal_imp_t * * _Mtx) Line 68 C++ 
... std::_Mutex_base::lock() Line 42 C++ 
... std::unique_lock<std::mutex>::unique_lock<std::mutex>(std::mutex & _Mtx) Line 220 C++ 
... ThreadSafeQueue<int>::pop() Line 13 C++ 
... MyManager::worker() Zeile 178 C++ 
+0

две вещи: IsEmpty не закрывается, и размер() может иметь более простую реализацию: после того, как мьютекс заблокирован вы можете просто вернуть queue_.size() (и MLOCK деструктор освобождает мьютекс) – marom

+0

@marom спасибо, исправил мой код. Ошибка все еще существует. – Anthea

+0

Две вещи: isEmpty и размер не могут быть общедоступными. Независимо от того, что они сообщают, может быть недействительным при оценке вызывающего абонента. Если они не используются в частном порядке, они должны быть удалены. – stefan

ответ

0

Попробуйте переместить «попробовать» вверх (как в примере ниже). Если ваш поток ожидает данных (внутри waitList.pop()), он может ожидать внутри переменной условия .wait(). Это «точка прерывания», и поэтому может бросаться, когда поток прерывается.

void MyManager::worker() 
{ 
    while (true) 
    { 
     try 
     { 
      int current = waitingList.pop(); 
      Object * p = objects.at(current); 
      p->calculateMesh(); //this task is internally locked by a mutex 

      boost::this_thread::interruption_point(); 
     } 
     catch (const boost::thread_interrupted&) 
     { 
      // Thread interruption request received, break the loop 
      std::cout << "- Thread interrupted. Exiting thread." << std::endl; 
      break; 
     } 
    } 
} 
+0

попробовал, но ошибка все еще там. Я обновляю вопрос с помощью списка вызовов отладки. – Anthea

+1

Я бы обернул весь цикл в предложение try-catch. Здесь ничего не изменится, но сама точка исключения заключается в том, что они проходят через все структуры потока управления, такие как петли, так что вам не нужно делать это вручную. –

-2

Я думаю, что это классическая проблема чтения/записи, работающая на общем буфере. Одним из наиболее защищенных способов решения этой проблемы является использование мьютексов и сигналов. (Я не могу опубликовать код здесь. Пожалуйста, пришлите мне письмо, я отправлю вам код).

0

Возможно, вы поймаете неправильный класс исключения? Это означало бы, что его не поймают. Не слишком знакомы с потоками, но это смесь std :: threads и boost :: threads, которая вызывает это?

Попробуйте поймать самое низкое родительское исключение.

3

Из моего опыта работы с потоками как в Boost, так и в Java, попытка отключить потоки извне всегда беспорядочна. Я никогда не мог заставить это работать чисто.

Лучшее, что я получил, это иметь логическое значение, доступное для всех потребительских потоков, для которых установлено значение true. Когда вы установите значение false, потоки просто вернутся самостоятельно. В вашем случае это можно легко ввести в цикл while.

Кроме того, вам понадобится синхронизация, чтобы вы могли дождаться, пока потоки вернутся, прежде чем удалять их, в противном случае вы можете определить, как сложно определить поведение.

Пример из прошлого проекта шахты:

создания темы

barrier = new boost::barrier(numOfThreads + 1); 
threads = new detail::updater_thread*[numOfThreads]; 

for (unsigned int t = 0; t < numOfThreads; t++) { 
    //This object is just a wrapper class for the boost thread. 
    threads[t] = new detail::updater_thread(barrier, this); 
} 

разрушение резьбы

for (unsigned int i = 0; i < numOfThreads; i++) { 
    threads[i]->requestStop();//Notify all threads to stop. 
} 

barrier->wait();//The update request will allow the threads to get the message to shutdown. 

for (unsigned int i = 0; i < numOfThreads; i++) { 
    threads[i]->waitForStop();//Wait for all threads to stop. 
    delete threads[i];//Now we are safe to clean up. 
} 

Некоторые методы, которые могут представлять интерес для нити обертку.

//Constructor 
updater_thread::updater_thread(boost::barrier * barrier) 
{ 
    this->barrier = barrier; 
    running = true; 

    thread = boost::thread(&updater_thread::run, this); 
} 

void updater_thread::run() { 
    while (running) { 
     barrier->wait(); 
     if (!running) break; 

     //Do stuff 

     barrier->wait(); 
    } 
} 

void updater_thread::requestStop() { 
    running = false; 
} 

void updater_thread::waitForStop() { 
    thread.join(); 
} 

 

Смежные вопросы