2014-01-28 4 views
8

У меня есть приложение C++ 11 с высокоприоритетным потоком, который производит данные, и поток с низким приоритетом, который его потребляет (в моем случае, записывая его на диск) , Я хотел бы удостовериться, что поток приоритетных производителей никогда не блокируется, т. Е. Он использует только блокирующие алгоритмы.C++ 11 неблокирующий производитель/потребитель

С заблокированной очередью я могу передавать данные в очередь из потока производителей и опросить ее из потребительского потока, тем самым выполняя мои цели выше. Я бы хотел изменить мою программу, чтобы потребительский поток блокировался, когда он неактивен, вместо опроса.

Кажется, что переменная условия C++ 11 может быть полезна для блокировки потребительского потока. Может ли кто-нибудь показать мне пример того, как его использовать, избегая при этом возможности, что потребитель спит с данными, находящимися в очереди? В частности, я хочу убедиться, что потребитель всегда пробуждается в течение некоторого конечного времени после того, как производитель нажимает последний элемент в очередь. Также важно, чтобы производитель оставался неблокирующим.

+0

См. Пример использования переменной условия в http: //codereview.stackexchange.com/questions/39199/multi-producer-consumer-queue-without-boost-in-c11 – GabiMe

+0

Это невозможно сделать без блокировки с переменной условия, потому что ждать или сигнализировать на одном вы должны были получить блокировку , На практике это не делает реальной разницы (большинство блокировок будет иметь короткую оживленную стратегию ожидания, которой должно быть достаточно, чтобы избежать любых вызовов ядра или иначе, так как критический раздел для читателя невероятно короткий) – Voo

+1

@Voo Нет необходимости получить блокировку, чтобы сигнализировать переменную условия. – Casey

ответ

1

Кажется, что переменная условия C++ 11 может быть полезна для блокировки потребительского потока. Может ли кто-нибудь показать мне пример того, как его использовать, избегая при этом возможности, что потребитель спит с данными, находящимися в очереди?

Для использования переменной условия вам необходимо мьютекс и условие. В вашем случае условие будет «есть данные, доступные в очереди». Поскольку производитель будет использовать обновления без блокировки для создания работы, потребитель должен использовать одну и ту же форму синхронизации для использования этой работы, поэтому мьютекс фактически не будет использоваться для синхронизации и нужен только потребительскому потоку, потому что нет другой способ ожидания по переменной условия.

// these variables are members or otherwise shared between threads 
std::mutex m_mutex; 
std::condition_variable m_cv; 
lockfree_queue m_data; 

// ... 

// in producer thread: 
while (true) 
{ 
    // add work to queue 
    m_data.push(x); 
    m_cv.notify_one(); 
} 

// in consumer thread: 
while (true) 
{ 
    std::unique_lock<std::mutex> lock(m_mutex); 
    m_cv.wait(lock, []{ return !m_data.empty(); }); 
    // remove data from queue and process it 
    auto x = m_data.pop(); 
} 

Переменная условие будет блокировать только в wait вызова, если очередь пуста перед ожиданием. Переменная условия может просыпаться ложно или потому, что она была уведомлена производителем, но в любом случае будет возвращаться только из вызова wait (а не спать снова), если очередь не является пустой. Это гарантируется с помощью перегрузки condition_variable::wait, которая берет предикат, потому что переменная условия всегда перепроверяет предикат для вас.

Поскольку мьютекс используется только потребительской нитью, он может быть локальным для этого потока (если у вас есть только один потребитель, с несколькими из них все они должны использовать один и тот же мьютекс, чтобы ждать на одном и том же condvar).

+0

Если производитель вызывает notify_one() перед тем, как потребитель вызывает wait(), не будет ли потребитель спать с данными в очереди (и таким образом спать вечно, если эти данные являются последними данными, когда-либо выпущенными)? – nonagon

+0

Нет, потому что вызов 'wait' проверяет его предикат перед сном, поэтому, если' m_data_available' уже верно, он не будет спать. Однако в реальном коде вы, вероятно, не хотите использовать простой «атомный », чтобы указать, что данные доступны, вы бы хотели проверить «m_queue.size()» или «m_available_count», иначе потребитель не знает, должен установить 'm_data_available = false' после потребления единственного фрагмента данных, поскольку он может быть более доступным. –

+0

Я отредактировал ответ, чтобы заменить 'atomic ' на операции с объектом без блокировки, чтобы решить проблему, описанную в комментарии выше, и пояснил, что CV не будет блокироваться, если в очереди уже есть данные –

0

Одним из решений, которое я нашел в прошлом, было использование событий Windows (http://msdn.microsoft.com/en-us/library/windows/desktop/ms682396(v=vs.85).aspx). В этом случае событие остается сигналом до тех пор, пока он не просыпает ожидающий поток, и если ни один нить не ждет, он остается сигнализируемым. Поэтому продюсер просто должен сигнализировать о событии после нажатия данных в очередь. Затем мы гарантируем, что потребитель будет просыпаться через некоторое конечное время после этого.

Я не смог найти способ реализовать это, используя стандартную библиотеку (хотя бы не блокируя поток производителя).

0

Я думаю, что семафоры могут быть использованы для решения этой проблемы безопасности:

// in producer thread: 
while (true) 
{ 
    m_data.push(); 
    m_semaphore.release(); 
} 

// in consumer thread: 
while (true) 
{ 
    m_semaphore.wait(); 
    m_data.pop(); 
} 

К сожалению, я не думаю, что C++ 11 включает в себя семафор? Я также не смог подтвердить, что освобождение семафора является неблокирующей операцией. Конечно, реализации с мьютексами, например. C++0x has no semaphores? How to synchronize threads? не допускает неблокирующий поток производителей.

Смежные вопросы