2015-04-13 4 views
0

У меня есть функция, которая устанавливает значение двух переменных и ожидает, что два потока печатают значение. Затем обновите значение и так далее ...Boost thread condition variable three thread

С основной и одной нитью он работает, но с двумя нет. Это мой код:

void t(int id){ 

    bool block_me = true; 
    while(1) 
    { 
     { 
      boost::mutex::scoped_lock lock(m); 
      while(!start_t || thread_executed > 0) 
       start_thread.wait(lock); 
     } 

     // Print myself 
     cout<<id<<endl; 

     { 
      boost::mutex::scoped_lock lock(m); 
      thread_executed++; 
      if(thread_executed == 2){ 
       start_main.notify_one(); 
      } 
     }  
    } 
} 

int main(){ 

    thread_executed = 0; 
    start_t = false; 

    boost::thread t1(boost::bind(&t, 1)); 
    boost::thread t2(boost::bind(&t, 2)); 

    for(int i = 1; i < 10; i++){ 
     cout<<"i = "<<i<<endl; 
     { 
      boost::mutex::scoped_lock lock(m); 
      start_t = true; 
      thread_executed = 0; 
      start_thread.notify_all(); 

      while(thread_executed != 2){ 
       start_main.wait(lock); 
      } 
      start_t = false; 
      thread_executed = 0; 
      start_thread.notify_all(); 
     }  
    } 
    return 0; 
} 
+0

Если вы хотите, чтобы сигнализировать производитель обратно, это costumary иметь второе переменное условие. Кроме того, доступ ** все * общие переменные только под блокировкой (вы нарушаете это слева и справа) – sehe

+0

Похоже, вы ищете [барьер] (http://www.boost.org/doc/libs/ 1_57_0/doc/html/thread/synchronization.html # thread.synchronization.barriers) – sehe

+0

Я не знаю ... Мне нужно условие для потока, чтобы запустить печать Simpe и условие для основного изменить значение «один» и «два» ... – gepeppe

ответ

1

Я изменил свой исходный код ... ммм, очередь за ??

Давайте продемонстрируем это!

Я обобщил немного, потому что у вас в основном были две одноэлементные очереди с общей переменной условия.

Гораздо проще рассуждать о двух отдельных очередях с отдельными условиями и замками. Это сразу же распутывает их для синхронизации, и если вы определяете емкость, чтобы быть> 1, у рабочих может быть отставание нескольких элементов в очереди, прежде чем основной поток должен замедлить работу.

Live On Coliru

#include <thread> 
#include <queue> 
#include <mutex> 
#include <condition_variable> 
#include <iostream> 

static constexpr size_t queue_capacity = 1; 

struct Processor { 

    Processor(int id) : id_(id) {} 

    void work() { 
     while (running) { 
      int value; 
      { // pop under lock 
       std::unique_lock<std::mutex> lk(mx_); 
       cv_.wait(lk, [this] { return !running || !queue_.empty(); }); 

       if (!running) 
        break; 

       // invariant: queue cannot be empty here 
       value = queue_.front(); 
       queue_.pop(); 
       cv_.notify_one(); 
      } 
      std::cout << "work " << id_ << ": " << value << "\n"; 
     } 
    } 

    void enqueue(int value) { 
     std::unique_lock<std::mutex> lk(mx_); 
     cv_.wait(lk, [this] { return !running || queue_.size() < queue_capacity; }); 

     if (running) { 
      queue_.push(value); 
      cv_.notify_one(); 
     } 
    } 

    ~Processor() { 
     { 
      std::unique_lock<std::mutex> lk(mx_); 
      cv_.notify_one(); 
      running = false; 
     } 
     if (th_.joinable()) 
      th_.join(); 
    } 
private: 
    bool running = true; 
    std::mutex mx_; 
    std::condition_variable cv_; 
    std::thread th_ {std::bind(&Processor::work, this)}; 
    int id_; 

    std::queue<int> queue_; 
}; 

int main() { 
    Processor w1(1), w2(2); 

    for (int i = 1; i < 10; ++i) 
    { 
     w1.enqueue(i*10); 
     w2.enqueue(i*20); 

     std::this_thread::sleep_for(std::chrono::milliseconds(150)); 
    } 

    std::this_thread::sleep_for(std::chrono::seconds(4)); 
} 

печати, например

work work 1: 10 
2: 20 
work work 2: 40 
1: 20 
work 2: 60 
work 1: 30 
work 2: 80 
work 1: 40 
work 2: 100 
work 1: 50 
work 2: 120 
work 1: 60 
work 2: 140 
work 1: 70 
work 2: 160 
work 1: 80 
work 2: 180 
work 1: 90 
1

Что, скорее всего, происходит здесь в том, что первый запуск потоков и изменить значение переменной «опубликованной», а потом сидит и ждать, второй поток просто сидит и ждет опубликован снова будет 0, но этого никогда не будет, так как основной поток ожидает, что он будет равен 2, чтобы вернуть его обратно на 0.

Есть несколько вещей, которые можно было бы сделать, чтобы реализовать его таким образом, чтобы работы:

  1. Позвольте потокам e nqueue, что вам нужно для печати, а затем другой поток выберет вещи из очереди и распечатает их (или напишет их на диск или что вам действительно нужно делать с ними). Это довольно распространенная картина и упростит общую реализацию. Там может быть ценой, и тогда вы не сможете реализовать ее таким образом.

  2. Добавить «состояния» в ваши потоки, так что, когда вы находитесь в состоянии «PRINT», поток будет просыпаться и печатать значения, когда вы находитесь в состоянии «ОБРАБОТКА», поток будет ждать в состоянии переменная, а основной поток будет «обрабатывать» значения. Вы можете использовать boost barriers, чтобы подождать, пока все мьютексы не выполнили свою работу до изменения состояния.

Пар внушение:

  • Не изменяйте переменные вне защиты мьютекса, если вы читаете их в резьбе, переменные «опубликованы» и «termiante_thread» должны быть защищен когда вы меняете их, чтобы остановить поток, например.

  • Не используйте сон, вы не нуждаетесь в них для достижения рабочего примера, и это признак того, что что-то не так в реализации.

+0

Я изменяю свой исходный код ... но, разве нет решения без использования boost барьера? – gepeppe

+0

Да, использование барьера - это лишь один из способов сделать это, вы можете посмотреть один из ответов, который показывает, как вы можете использовать очередь для этого и не использовать барьеры. – Giordano