2016-09-15 2 views
0

Я использую этот класс для установки производителя-потребитель в C++:C++ отключение Потокобезопасных очередей

#pragma once 

#include <queue> 
#include <mutex> 
#include <condition_variable> 
#include <memory> 
#include <atomic> 

template <typename T> class SafeQueue 
{ 
public: 
    SafeQueue() : 
    _shutdown(false) 
    { 

    } 

    void Enqueue(T item) 
    { 
     std::unique_lock<std::mutex> lock(_queue_mutex); 
     bool was_empty = _queue.empty(); 
     _queue.push(std::move(item)); 
     lock.unlock(); 

     if (was_empty) 
      _condition_variable.notify_one(); 
    } 

    bool Dequeue(T& item) 
    { 
     std::unique_lock<std::mutex> lock(_queue_mutex); 

     while (!_shutdown && _queue.empty()) 
      _condition_variable.wait(lock); 

     if(!_shutdown) 
     { 
      item = std::move(_queue.front()); 
      _queue.pop(); 

      return true; 
     } 

     return false; 
    } 

    bool IsEmpty() 
    { 
     std::lock_guard<std::mutex> lock(_queue_mutex); 
     return _queue.empty(); 
    } 

    void Shutdown() 
    { 
     _shutdown = true; 
     _condition_variable.notify_all(); 
    } 

private: 
    std::mutex _queue_mutex; 
    std::condition_variable _condition_variable; 
    std::queue<T> _queue; 
    std::atomic<bool> _shutdown; 
}; 

И я использую это так:

class Producer 
{ 
public: 
    Producer() : 
     _running(true), 
     _t(std::bind(&Producer::ProduceThread, this)) 
    { } 

    ~Producer() 
    { 
     _running = false; 
     _incoming_packets.Shutdown(); 
     _t.join(); 
    } 

    SafeQueue<Packet> _incoming_packets; 

private: 
    void ProduceThread() 
    { 
     while(_running) 
     { 
      Packet p = GetNewPacket(); 
      _incoming_packets.Enqueue(p); 
     } 
    } 

    std::atomic<bool> _running; 
    std::thread _t; 
} 

class Consumer 
{ 
    Consumer(Producer* producer) : 
     _producer(producer), 
     _t(std::bind(&Consumer::WorkerThread, this)) 
    { } 

    ~Consumer() 
    { 
     _t.join(); 
    } 

private: 
    void WorkerThread() 
    { 
     Packet p; 

     while(producer->_incoming_packets.Dequeue(p)) 
      ProcessPacket(p); 
    } 

    std::thread _t; 
    Producer* _producer; 
} 

Это работает наиболее из время. Но раз в то время, когда я удаляю производитель (и заставляя его Разрушитель называть SafeQueue::Shutdown, то _t.join() блоков навсегда

Моего предположением является, что проблема здесь (в SafeQueue::Dequeue).

while (!_shutdown && _queue.empty()) 
     _condition_variable.wait(lock); 

SafeQueue::Shutdown от нити # 1 вызывается в то время как поток # 2 По окончании проверки _shutdown, но прежде чем он выполнен _condition_variable.wait(lock), поэтому он «промахивается» notify_all(). Может ли это случиться?

Если это проблема, что это лучший способ ее решить?

+0

Включите ли вы свои предупреждения на максимальном уровне? У вас тонкая ошибка в вашем классе Consumer. Порядок ваших членов данных и порядок, который вы собираетесь инициализировать в конфликтах вашего конструктора ... Проверьте его. Нить будет создана до того, как назначен '_producer' – WhiZTiM

+0

Опять вы используете неверную переменную условия ... Ее не должно быть в тактическом цикле. У этого есть перегрузка, которая допускает такое тестирование. – WhiZTiM

+0

@WhiZTiM Я знаю, что он имеет такую ​​перегрузку, но это эквивалентно циклу while: http://en.cppreference.com/w/cpp/thread/condition_variable/wait. – UnTraDe

ответ

1

Поскольку объект SafeQueue принадлежит производителю, удаление продюсера вызывает условие гонки между уведомляемым пользователем и удалением SafeQueue из-под него при завершении ~ продюсера.

Я предлагаю, чтобы общий ресурс принадлежал ни производителю, ни потребителю, но передавался как ссылка на конструктор каждого из них.

Измените конструкторы производителя и потребителя;

Producer(SafeQueue<Packet> & queue) : 
    _running(false), _incoming_packets(queue) {} 


Consumer(SafeQueue<Packet> & queue) : 
    _running(false), _incoming_packets(queue) {} 

Используйте ваши экземпляры таким образом;

SafeQueue<Packet> queue; 
Producer producer(queue); 
Consumer consumer(queue); 

...do stuff... 

queue.shutdown(); 

Это также устраняет плохую проблему дизайна, которая у вас в классе потребителей, так тесно связана с классом Producer.

Кроме того, это, вероятно, плохая идея убить и объединить потоки в деструкторе, как вы это делаете для ~ Продюсера. Лучше добавить метод Shutdown() для каждого класса потоков и вызвать их явно;

producer.shutdown(); 
consumer.shutdown(); 
queue.shutdown(); 

Shutdown заказ на самом деле не имеет значения, если вы не обеспокоены потерей необработанных пакетов, которые все еще находятся в очереди, когда вы перестанете потребителя.

+0

Почему это плохой проект для объединения потоков в деконструктор? Разве это не противоречит RAII? – UnTraDe

+0

@UnTraDe, создание и уничтожение будет RAII, запуск и остановка - это программный контроль. Но, кроме того, 'join' может генерировать исключения, что означает, что правильная обработка требует еще большей логики в деструкторе, что определенно конфликтует с RAII. – CAB

0

В вашем SafeQueue::Dequeue вы, вероятно, используете std::condition_variable неправильным способом ...Изменить это:

bool Dequeue(T& item) 
{ 
    std::unique_lock<std::mutex> lock(_queue_mutex); 

    while (!_shutdown && _queue.empty()) 
     _condition_variable.wait(lock); 
    if(!_shutdown) 
    { 
     item = std::move(_queue.front()); 
     _queue.pop(); 
     return true; 
    } 
    return false; 
} 

в

bool Dequeue(T& item) 
{ 
    std::unique_lock<std::mutex> lock(_queue_mutex); 
    _condition_variable.wait(lock, []{ return _shutdown || !_queue.empty() }); 
    if(!_shutdown) 
    { 
     item = std::move(_queue.front()); 
     _queue.pop(); 
     return true; 
    } 

    return false; 
} 

Во-вторых, порядок инициализации элементов данных в Consumer не прав в отношении своего конструктора

class Consumer 
{ 
    Consumer(Producer* producer) : 
     _producer(producer), 
     _t(std::bind(&Consumer::WorkerThread, this)) 
    { } 
    ...... 
    // _t will be constructed first, regardless of your constructor initializer list 
    // Meaning, the thread can even start running using an unintialized _producer 
    std::thread _t;  
    Producer* _producer; 
} 

Она должна быть заказана до:

class Consumer 
{ 
    Consumer(Producer* producer) : 
     _producer(producer), 
     _t(std::bind(&Consumer::WorkerThread, this)) 
    { } 
    ...... 
    Producer* _producer; 
    std::thread _t;  
} 

Другая часть вашей проблемы покрыта CAB's answer

+0

Исходный 'while (! Pred) condvar.wait (lock);' code правильный, и ваше изменение неверно, потому что при передаче предиката ожидание останавливается, когда предикат является истинным, поэтому он должен быть '_shutdown || ! _queue.empty() ' – stefaanv

+0

@stefaanv, Упс. исправленный. большое спасибо – WhiZTiM

Смежные вопросы