В распределенной системе заданий, написанной на C++ 11, я реализовал забор (т. Е. Поток за пределами пула рабочих потоков может запросить блокировку до тех пор, пока все текущие запланированные задания сделано), используя следующую структуру:Синхронизация ненадежна с использованием std :: atomic и std :: condition_variable
struct fence
{
std::atomic<size_t> counter;
std::mutex resume_mutex;
std::condition_variable resume;
fence(size_t num_threads)
: counter(num_threads)
{}
};
код реализации забор выглядит следующим образом:
void task_pool::fence_impl(void *arg)
{
auto f = (fence *)arg;
if (--f->counter == 0) // (1)
// we have zeroed this fence's counter, wake up everyone that waits
f->resume.notify_all(); // (2)
else
{
unique_lock<mutex> lock(f->resume_mutex);
f->resume.wait(lock); // (3)
}
}
Это работает очень хорошо, если нити ввести забор в течение определенного периода времени. Однако, если они пытаются сделать это почти одновременно, иногда случается, что между атомным декрементом (1) и началом ожидания условного var (3) поток дает процессорное время, а другой поток уменьшает счетчик до нуля (1) и запускает конд. var (2). Это приводит к тому, что предыдущий поток ожидает в (3), потому что он начинает ждать его после того, как он уже был уведомлен.
Рукав, чтобы сделать вещь работоспособной, нужно поставить 10 мс сна как раз перед (2), но это неприемлемо по очевидным причинам.
Любые предложения о том, как исправить это по-исполнительски?
'unique_lock' отсутствует необходимый ее аргумент шаблона:' 'unique_lock, как в вопросе и ответе. Но в остальном я согласен, +1. –
О, в классе есть 'typedef' (' fence' - это вложенная структура), но я добавил аргумент шаблона для удобочитаемости, спасибо. – IneQuation
@Maxim Егорушкин, почему вы добавили оператор 'while (f-> counter)' в этот последний блок? – IneQuation