2014-01-07 2 views
9

В распределенной системе заданий, написанной на C++ 11, я реализовал забор (т. Е. Поток за пределами пула рабочих потоков может запросить блокировку до тех пор, пока все текущие запланированные задания сделано), используя следующую структуру:Синхронизация ненадежна с использованием std :: atomic и std :: condition_variable

struct fence 
{ 
    std::atomic<size_t>      counter; 
    std::mutex        resume_mutex; 
    std::condition_variable     resume; 

    fence(size_t num_threads) 
     : counter(num_threads) 
    {} 
}; 

код реализации забор выглядит следующим образом:

void task_pool::fence_impl(void *arg) 
{ 
    auto f = (fence *)arg; 
    if (--f->counter == 0)  // (1) 
     // we have zeroed this fence's counter, wake up everyone that waits 
     f->resume.notify_all(); // (2) 
    else 
    { 
     unique_lock<mutex> lock(f->resume_mutex); 
     f->resume.wait(lock); // (3) 
    } 
} 

Это работает очень хорошо, если нити ввести забор в течение определенного периода времени. Однако, если они пытаются сделать это почти одновременно, иногда случается, что между атомным декрементом (1) и началом ожидания условного var (3) поток дает процессорное время, а другой поток уменьшает счетчик до нуля (1) и запускает конд. var (2). Это приводит к тому, что предыдущий поток ожидает в (3), потому что он начинает ждать его после того, как он уже был уведомлен.

Рукав, чтобы сделать вещь работоспособной, нужно поставить 10 мс сна как раз перед (2), но это неприемлемо по очевидным причинам.

Любые предложения о том, как исправить это по-исполнительски?

ответ

11

Ваша диагностика верна, этот код подвержен потерям уведомлений о состоянии, описанных вами. То есть после того, как один поток заблокировал мьютекс, но перед ожиданием переменной условия другой поток может вызвать notify_all(), чтобы первый поток пропускал это уведомление.

Простое исправление для блокировки мьютекса до того декремента счетчика и во время уведомления:

void task_pool::fence_impl(void *arg) 
{ 
    auto f = static_cast<fence*>(arg); 
    std::unique_lock<std::mutex> lock(f->resume_mutex); 
    if (--f->counter == 0) { 
     f->resume.notify_all(); 
    } 
    else do { 
     f->resume.wait(lock); 
    } while(f->counter); 
} 

В этом случае счетчик не должен быть атомарным.

дополнительный бонус (или штраф, в зависимости от точки зрения) запирающих взаимную блокировку перед уведомлением является (от here):

В pthread_cond_broadcast() или pthread_cond_signal() функции могут быть вызваны нить, независимо от того, владеет ли он в настоящее время мьютезом, что потоки, вызывающие pthread_cond_wait() или pthread_cond_timedwait(), связаны с переменной условия во время их ожидания; однако , если требуется предсказуемое поведение при планировании, то мьютекс должен быть заблокирован потоком, вызывающим pthread_cond_broadcast() или pthread_cond_signal().

Что касается while цикла (от here):

Ложные пробуждений от pthread_cond_timedwait() или pthread_cond_wait() может иметь место функции. Поскольку возврат из pthread_cond_timedwait() или pthread_cond_wait() не означает ничего о значении этого предиката, предикат должен быть переоценен при таком возврате.

+0

'unique_lock' отсутствует необходимый ее аргумент шаблона:' 'unique_lock , как в вопросе и ответе. Но в остальном я согласен, +1. –

+0

О, в классе есть 'typedef' (' fence' - это вложенная структура), но я добавил аргумент шаблона для удобочитаемости, спасибо. – IneQuation

+0

@Maxim Егорушкин, почему вы добавили оператор 'while (f-> counter)' в этот последний блок? – IneQuation

-1

Чтобы сохранить более высокую производительность атомной операции вместо полного мьютекса, вы должны изменить условие ожидания на блокировку, проверку и цикл.

Все условия ожидания должны быть выполнены таким образом. Переменная условия имеет второй аргумент для ожидания, который является предикатной функцией или лямбдой.

Код может выглядеть следующим образом:

void task_pool::fence_impl(void *arg) 
{ 
    auto f = (fence *)arg; 
    if (--f->counter == 0)  // (1) 
     // we have zeroed this fence's counter, wake up everyone that waits 
     f->resume.notify_all(); // (2) 
    else 
    { 
     unique_lock<mutex> lock(f->resume_mutex); 
     while(f->counter) { 
      f->resume.wait(lock); // (3) 
     } 
    } 
} 
+2

Насколько я могу судить, вы только добавили петлю, чтобы поймать ложные пробуждения. Это не устраняет проблему, которую я изначально описал - поток, идущий для (3), может по-прежнему давать, позволяя другому потоку достичь (2) до того, как исходный окончательно доберется до (3). – IneQuation

+0

Кроме того, по "lock, check and loop" вы имеете в виду спин-блокировку? – IneQuation

Смежные вопросы