2009-09-10 3 views
3

У меня есть следующий код:Требуются: элегантное решение гонки состояние

class TimeOutException 
{}; 

template <typename T> 
class MultiThreadedBuffer 
{ 
public: 
    MultiThreadedBuffer() 
    { 
     InitializeCriticalSection(&m_csBuffer); 
     m_evtDataAvail = CreateEvent(NULL, TRUE, FALSE, NULL); 
    } 
    ~MultiThreadedBuffer() 
    { 
     CloseHandle(m_evtDataAvail); 
     DeleteCriticalSection(&m_csBuffer); 
    } 
    void LockBuffer() 
    { 
     EnterCriticalSection(&m_csBuffer); 
    } 
    void UnlockBuffer() 
    { 
     LeaveCriticalSection(&m_csBuffer); 
    } 
    void Add(T val) 
    { 
     LockBuffer(); 
     m_buffer.push_back(val); 
     SetEvent(m_evtDataAvail); 
     UnlockBuffer(); 
    } 
    T Get(DWORD timeout) 
    { 
     T val; 
     if (WaitForSingleObject(m_evtDataAvail, timeout) == WAIT_OBJECT_0) { 
      LockBuffer(); 

      if (!m_buffer.empty()) { 
       val = m_buffer.front(); 
       m_buffer.pop_front(); 
      } 

      if (m_buffer.empty()) { 
       ResetEvent(m_evtDataAvail); 
      } 

      UnlockBuffer(); 
     } else { 
      throw TimeOutException(); 
     } 
     return val; 
    } 
    bool IsDataAvail() 
    { 
     return (WaitForSingleObject(m_evtDataAvail, 0) == WAIT_OBJECT_0); 
    } 
    std::list<T> m_buffer; 
    CRITICAL_SECTION m_csBuffer; 
    HANDLE m_evtDataAvail; 
}; 

Модульное тестирование показывает, что этот код работает отлично при использовании на одном потоке, пока конструктор T по умолчанию и копировать операторы/присваивания не» т бросить. Поскольку я пишу T, это приемлемо.

Моя проблема - метод Get. Если данных нет (т. Е. M_evtDataAvail не установлен), то несколько потоков могут блокировать вызов WaitForSingleObject. Когда новые данные становятся доступными, все они попадают на вызов Lock(). Только один пройдет и может получить данные и двигаться дальше. После разблокировки() другой поток может перемещаться и обнаруживать, что данных нет. В настоящее время он вернет объект по умолчанию.

Что мне нужно для этого второго потока (и других), чтобы вернуться к вызову WaitForSingleObject. Я мог бы добавить блок else, который был разблокирован и сделал goto, но это просто чувствует зло.

Это решение также добавляет возможность бесконечного цикла, так как каждый откат будет перезапускать таймаут. Я мог бы добавить код для проверки часов на входе и настроить таймаут в каждой поездке назад, но затем этот простой метод Get начинает очень усложняться.

Любые идеи о том, как решить эти проблемы, сохраняя при этом проверяемость и простоту?

О, для кого-то интересно, функция IsDataAvail существует только для тестирования. Он не будет использоваться в производственном коде. Add и Get - это единственные методы, которые будут использоваться в среде без тестирования.

ответ

7

Вам необходимо создать событие автоматического перезапуска вместо события ручного сброса. Это гарантирует, что если несколько событий ждут в событии, и когда событие будет установлено, будет выпущен только один поток. Все остальные потоки останутся в состоянии ожидания. Вы можете создать событие автоматического сброса, передав FALSE во второй параметр API CreateEvent. Также обратите внимание, что этот код не является безопасным для исключения, т. Е. После блокировки буфера, если какой-либо оператор выдает исключение, ваш критический раздел не будет разблокирован. Используйте принцип RAII, чтобы ваш критический раздел был разблокирован даже в случае исключений.

+0

Во-первых, я исторически использовал RAII именно в этих обстоятельствах. В этом случае я решил не увеличивать площадь тестирования. Я просто попадаю в TDD. –

+0

Во-вторых, БРИЛЛИАНТ! Я изменил блок if-empty-reset на if-not-empty-set, и каждый путь, о котором я могу думать, обрабатывается должным образом. Благодаря! –

+0

+1, авторешетка будет делать. Существует один недостаток использования autoReset здесь. Если в m_buffer добавлено несколько элементов, используя метод «Добавить», существует вероятность того, что потоки ждут, но m_buffer все еще не отобрал некоторые элементы. –

5

Вы можете использовать объект Semaphore вместо общего объекта Event. Счет семафора должен быть инициализирован до 0 и сгенерирован 1 с помощью ReleaseSemaphore каждый раз, когда вызывается метод Add. Таким образом, WaitForSingleObject в Get никогда не будет выпускать больше потоков для чтения из буфера, чем в буфере.

+0

Ничего себе. Из левого поля появляется выдающаяся альтернатива. Это элегантно и просто. Фриггин красив. –

+0

+1, хорошо подходит решение –

3

Вам всегда нужно кодировать для случая, когда событие сигнализируется, но нет данных, даже с событиями автоматического сброса. Существует условие гонки с момента, когда WaitForsingleevent просыпается до тех пор, пока не будет вызван LockBuffer, и в этом интервале другой поток может вытолкнуть данные из буфера. Ваш код должен помещать WaitForSingleEvent в цикл. Уменьшите тайм-аут с временем, проведенным в каждой итерации цикла ...

В качестве альтернативы я могу заинтересовать вас более масштабируемыми и эффективными альтернативами? Interlocked Singly Linked Lists, OS поток пульт QueueUserWorkItem и idempotent обработка. Добавьте pushes запись в список и отправьте рабочий элемент. Рабочий элемент pops запись, а если не NULL, обработайте ее. Вы можете походить на увлечение и иметь дополнительную логику для процессора, чтобы зациклиться и сохранить состояние, обозначающее его «активное» присутствие, так что «Добавить» не требует лишних рабочих элементов, но это строго не требуется. Для еще более высокого уровня нагрузки и многоядерных/мульти-процессорных нагрузок я рекомендую использовать порты завершения очереди. Подробности описаны в статьях Рика Вичика, у меня есть запись в блоге, которая связывает все 3 сразу: High Performance Windows programs.

+0

Мне очень нравится идея списка нолоков, но насколько я могу судить, эти списки LIFO, и мне нужен FIFO. Или я чего-то не хватает? –

+0

Вы ничего не пропустили. Я не знал, что вы хотите, чтобы жесткий FIFO был принудительным. Списки свободного доступа FIFO Afaik недоступны, поскольку они не могут поддерживаться в одной операции interlockedexchange, например, в списках LIFO. –

Смежные вопросы