2014-02-13 3 views
25

Я пытаюсь понять основные механизмы многопоточности в новом стандарте C++ 11. Самый простой пример, я могу думать о следующих:Stopping C++ 11 std :: threads ожидания на std :: condition_variable

  • Производитель и потребитель реализованы в отдельных потоках
  • продюсерского местами определенное количество элементов в очереди
  • Потребитель берет элементы из очередь, если есть какие-либо присутствующие

Этот пример также используется во многих школьных книгах о многопоточности, и все, что касается процесса коммуникации, отлично работает. Однако у меня есть проблема, когда дело доходит до остановки потребительского потока.

Я хочу, чтобы потребитель работал до тех пор, пока он не получил явный сигнал остановки (в большинстве случаев это означает, что я жду, пока продюсер закончит, чтобы я мог остановить пользователя до окончания программы). К сожалению, в потоках C++ 11 отсутствует механизм прерывания (который я знаю из многопоточности в Java, например). Таким образом, я должен использовать такие флаги, как isRunning, чтобы сигнализировать, что я хочу остановить поток.

Основная проблема заключается в следующем: после того, как я остановил поток производителя, очередь пуста и потребитель ждет на condition_variable, чтобы получить сигнал, когда очередь заполняется снова. Поэтому мне нужно разбудить поток, вызвав notify_all() над переменной перед выходом.

Я нашел рабочее решение, но оно кажется каким-то грязным. Пример кода приведен ниже (я извиняюсь, но я почему-то не может уменьшить размер кода любой furhter для «минимального» минимального примера):

Очередь Класс:

class Queue{ 
public: 
    Queue() : m_isProgramStopped{ false } { } 

    void push(int i){ 
     std::unique_lock<std::mutex> lock(m_mtx); 
     m_q.push(i); 
     m_cond.notify_one(); 
    } 

    int pop(){ 
     std::unique_lock<std::mutex> lock(m_mtx); 
     m_cond.wait(lock, [&](){ return !m_q.empty() || m_isProgramStopped; }); 

     if (m_isProgramStopped){ 
      throw std::exception("Program stopped!"); 
     } 

     int x = m_q.front(); 
     m_q.pop(); 

     std::cout << "Thread " << std::this_thread::get_id() << " popped " << x << "." << std::endl; 
     return x; 
    } 

    void stop(){ 
     m_isProgramStopped = true; 
     m_cond.notify_all(); 
    } 

private: 
    std::queue<int> m_q; 
    std::mutex m_mtx; 
    std::condition_variable m_cond; 
    bool m_isProgramStopped; 
}; 

Продюсер:

class Producer{ 
public: 
    Producer(Queue & q) : m_q{ q }, m_counter{ 1 } { } 

    void produce(){ 
     for (int i = 0; i < 5; i++){ 
      m_q.push(m_counter++); 
      std::this_thread::sleep_for(std::chrono::milliseconds{ 500 }); 
     } 
    } 

    void execute(){ 
     m_t = std::thread(&Producer::produce, this); 
    } 

    void join(){ 
     m_t.join(); 
    } 

private: 
    Queue & m_q; 
    std::thread m_t; 

    unsigned int m_counter; 
}; 

Потребитель:

class Consumer{ 
public: 
    Consumer(Queue & q) : m_q{ q }, m_takeCounter{ 0 }, m_isRunning{ true } 
    { } 

    ~Consumer(){ 
     std::cout << "KILL CONSUMER! - TOOK: " << m_takeCounter << "." << std::endl; 
    } 

    void consume(){ 
     while (m_isRunning){ 
      try{ 
       m_q.pop(); 
       m_takeCounter++; 
      } 
      catch (std::exception e){ 
       std::cout << "Program was stopped while waiting." << std::endl; 
      } 
     } 
    } 

    void execute(){ 
     m_t = std::thread(&Consumer::consume, this); 
    } 

    void join(){ 
     m_t.join(); 
    } 

    void stop(){ 
     m_isRunning = false; 
    } 

private: 
    Queue & m_q; 
    std::thread m_t; 

    unsigned int m_takeCounter; 
    bool m_isRunning; 
}; 

И, наконец, main():

int main(void){ 
    Queue q; 

    Consumer cons{ q }; 
    Producer prod{ q }; 

    cons.execute(); 
    prod.execute(); 

    prod.join(); 

    cons.stop(); 
    q.stop(); 

    cons.join(); 

    std::cout << "END" << std::endl; 

    return EXIT_SUCCESS; 
} 

Является ли это право способ закончить поток, который ожидает ее переменной условия или есть более эффективные методы? В настоящее время очередь должна знать, остановлена ​​ли программа (которая, на мой взгляд, уничтожает свободную связь компонентов), и мне нужно называть stop() в очереди явно, что кажется неправильным.

Дополнительная информация, переменная условия, которая должна использоваться только как сингл, если очередь пуста, теперь означает другое условие - если программа закончилась. Если я не ошибаюсь, каждый раз, когда поток ожидает переменную условия для какого-либо события, он также должен будет проверить, нужно ли остановить поток, прежде чем продолжить его выполнение (что также кажется неправильным).

Есть ли у меня эти проблемы, потому что вся моя конструкция неисправна или мне не хватает некоторых механизмов, которые можно использовать для чистого потока потоков?

+0

Это почти что мы делаем в нашем коде. Установите переменную «stopping» и сообщите переменную условия и проверите ее, как первое, что она делает. Кажется, что нет простого, более элегантного общего решения, которое мы нашли. – jcoder

+0

Вы можете вызвать 'stop()' из деструктора вашей очереди. См. Аналогичное решение http://stackoverflow.com/a/9711916/412080 –

+0

Если вы просто хотите реализовать шаблон «Продюсер/Потребитель», тогда есть другие подходы, которые решают эту проблему, без необходимости найти решение для неприятную проблему «аннулирования» или «отмены» примитива синхронизации для возобновления потока. – CouchDeveloper

ответ

5

Нет, в вашем дизайне нет ничего плохого, и это нормальный подход к решению этой проблемы.

Это совершенно верно для вас, чтобы иметь несколько условий (например, что-либо в очереди или остановке программы), привязанных к переменной условия. Главное, что бит в состоянии проверяется, когда возвращается wait.

Вместо того чтобы иметь флаг в Queue, чтобы указать, что программа останавливается, вы должны думать о том, что флаг «может я принимаю». Это лучшая общая парадигма и лучше работает в многопоточной среде.

Кроме того, вместо того, чтобы pop бросить исключение, если кто-то называет его и stop был назван вы могли бы заменить метод с bool try_pop(int &value) который будет возвращать true если значение было возвращено, иначе false. Таким образом, вызывающий может проверить, не удалось ли остановить очередь (добавьте метод bool is_stopped() const). Хотя обработка исключений работает здесь, она немного тяжелая, и на самом деле это не исключительный случай в многопоточной программе.

+0

Но если задача, выполняемая потоком, занимает очень много времени для обработки, тогда вам нужно подождать некоторое время до того, как вызов stop() вступит в силу. Итак, в этом плане, это лучший дизайн? – user18490

+0

Не могли бы вы рассказать о том, что вы подразумеваете под «Вместо того, чтобы иметь флаг в очереди, чтобы указать, что программа останавливается, вы должны думать о том, что флаг« могу ли я принять »? – ksl

1

wait можно вызвать с таймаутом. Элемент управления возвращается в поток и может быть проверен stop. В зависимости от этого значения он может wait на большее количество предметов, которые необходимо использовать или завершить. Хорошим введением в многопоточность с C++ является C++11 Concurrency.

Смежные вопросы