2016-07-20 2 views
1

Я пытаюсь сделать это с поддержкой параллелизма C++ 11.Правильный способ ожидания переменной состояния, которая уведомляется несколькими потоками

У меня есть пул потоков рабочих потоков, которые все делают то же самое, где главный поток имеет массив переменных условия (по одному для каждого потока, им нужно «запускать» синхронизированный, т. Е. Не запускать один цикл их цикла).

for (auto &worker_cond : cond_arr) { 
     worker_cond.notify_one(); 
    } 

0Показать эту новость от этого потока, чтобы оживить уведомление о каждом потоке пула, чтобы снова запустить его цикл. Каков правильный способ сделать это? Есть ли одна переменная условия и ждать какого-то целого, каждый поток, который не является мастером, будет увеличиваться? что-то вроде (до сих пор в главном потоке)

unique_lock<std::mutex> lock(workers_mtx); 
    workers_finished.wait(lock, [&workers] { return workers = cond_arr.size(); }); 
+1

Быстрый поиск: вы считали ['std :: experimental :: барьер'] (http://en.cppreference.com/w/cpp/experimental/barrier)? –

ответ

1

Я вижу два варианта здесь:

Вариант 1: join()

В принципе вместо использования переменного условия для начала вычислений в ваших потоках, вам создайте новый поток для каждой итерации и используйте join(), чтобы дождаться завершения. Затем вы создаете новые потоки для следующей итерации и так далее.

Вариант 2: блокирует

Вы не хотите, главное-нить уведомить, пока один из потоков до сих пор работает. Таким образом, каждый поток получает свой собственный замок, который он блокирует перед выполнением вычислений и затем разблокируется. Ваш основной поток блокирует все из них перед вызовом notify() и затем разблокирует их.

+0

Для первого не было бы проще использовать. Второй не работает, потому что может случиться так, что в то время как главный поток ждет некоторую блокировку другого потока, который имеет другую блокировку, может цикл больше, чем один раз, чего я не хочу! – Aram

+0

Нет, если после каждого цикла рабочие потоки освобождают блокировку, а затем ждут переменную условия и только если оповещенный о блокировке и выполняет ровно один цикл. – Anedar

1

Я не вижу ничего принципиально неправильного в вашем решении.

Охрана workers с workers_mtx и сделано.

Мы можем абстрагировать это с помощью семафора подсчета.

struct counting_semaphore { 
    std::unique_ptr<std::mutex> m=std::make_unique<std::mutex>(); 
    std::ptrdiff_t count = 0; 
    std::unique_ptr<std::condition_variable> cv=std::make_unique<std::condition_variable>(); 

    counting_semaphore(std::ptrdiff_t c=0):count(c) {} 
    counting_semaphore(counting_semaphore&&)=default; 

    void take(std::size_t n = 1) { 
    std::unique_lock<std::mutex> lock(*m); 
    cv->wait(lock, [&]{ if (count-std::ptrdiff_t(n) < 0) return false; count-=n; return true; }); 
    } 
    void give(std::size_t n = 1) { 
    { 
     std::unique_lock<std::mutex> lock(*m); 
     count += n; 
     if (count <= 0) return; 
    } 
    cv->notify_all(); 
    } 
}; 

take принимает count прочь, и блоки, если не хватает.

give добавляет к count и уведомляет, есть ли положительная сумма.

Теперь рабочие потоки паромные жетоны между двумя семафорами.

std::vector<counting_semaphore> m_worker_start{count}; 
counting_semaphore m_worker_done{0}; // not count, zero 
std::atomic<bool> m_shutdown = false; 

// master controller: 
for (each step) { 
    for (auto&& starts:m_worker_start) 
    starts.give(); 
    m_worker_done.take(count); 
} 

// master shutdown: 
m_shutdown = true; 
// wake up forever: 
for (auto&& starts:m_worker_start) 
    starts.give(std::size_t(-1)/2); 

// worker thread: 
while (true) { 
    master->m_worker_start[my_id].take(); 
    if (master->m_shutdown) return; 
    // do work 
    master->m_worker_done.give(); 
} 

или somesuch.

live example.

Смежные вопросы