2014-09-18 3 views
3

Я играю с std :: атомными структурами и написал эту многопользовательскую многопользовательскую очередь без блокировки, которую я здесь прикрепляю. Идея очереди состоит из двух стеков - стека производителей и потребителей, которые по существу связаны структурами списков. Узлы списков содержат индексы в массиве, в котором хранятся фактические данные, где вы будете читать или писать.Атомные операции многоточечной очереди

Идея состоит в том, что узлы для списков являются взаимоисключающими, то есть указатель на узел может существовать только в списке производителей или потребителей. Производитель попытается получить узел из списка производителей, потребитель из списка потребителей и всякий раз, когда указатель на узел будет получен либо производителем, либо потребителем, он должен быть из обоих списков, чтобы никто другой не смог его приобрести. Я использую функции std :: atomic_compare_exchange для вращения до тех пор, пока не выскочит узел.

Проблема заключается в том, что с логикой должно быть что-то не так, или операции не являются атомарными, поскольку я предполагаю, что это происходит потому, что даже с 1 производителем и 1 потребителем, учитывая достаточно времени, очередь будет жить и что я заметил заключается в том, что если вы утверждаете, что ячейка! = cell-> m_next, утверждение будет поражено! Так что его, вероятно, что-то, глядя мне в лицо, и я просто не вижу, поэтому мне интересно, если кто-то может браться.

Thx

#ifndef MTQueue_h 
#define MTQueue_h 

#include <atomic> 

template<typename Data, uint64_t queueSize> 
class MTQueue 
{ 
public: 

    MTQueue() : m_produceHead(0), m_consumeHead(0) 
    { 
     for(int i=0; i<queueSize-1; ++i) 
     { 
      m_nodes[i].m_idx = i; 
      m_nodes[i].m_next = &m_nodes[i+1]; 
     } 
     m_nodes[queueSize-1].m_idx = queueSize - 1; 
     m_nodes[queueSize-1].m_next = NULL; 

     m_produceHead = m_nodes; 
     m_consumeHead = NULL; 
    } 

    struct CellNode 
    { 
     uint64_t m_idx; 
     CellNode* m_next; 
    }; 

    bool push(const Data& data) 
    { 
     if(m_produceHead == NULL) 
      return false; 

     // Pop the producer list. 
     CellNode* cell = m_produceHead; 
     while(!std::atomic_compare_exchange_strong(&m_produceHead, 
                &cell, cell->m_next)) 
     { 
      cell = m_produceHead; 
      if(!cell) 
       return false; 
     } 

     // At this point cell should point to a node that is not in any of the lists 
     m_data[cell->m_idx] = data; 

     // Push that node as the new head of the consumer list 
     cell->m_next = m_consumeHead; 
     while (!std::atomic_compare_exchange_strong(&m_consumeHead, 
                &cell->m_next, cell)) 
     { 
      cell->m_next = m_consumeHead; 
     } 
     return true; 
    } 

    bool pop(Data& data) 
    { 
     if(m_consumeHead == NULL) 
      return false; 

     // Pop the consumer list 
     CellNode* cell = m_consumeHead; 
     while(!std::atomic_compare_exchange_strong(&m_consumeHead, 
                &cell, cell->m_next)) 
     { 
      cell = m_consumeHead; 
      if(!cell) 
       return false; 
     } 

     // At this point cell should point to a node that is not in any of the lists 
     data = m_data[cell->m_idx]; 

     // Push that node as the new head of the producer list 
     cell->m_next = m_produceHead; 
     while(!std::atomic_compare_exchange_strong(&m_produceHead, 
                &cell->m_next, cell)) 
     { 
      cell->m_next = m_produceHead; 
     } 
     return true; 
    }; 

private: 

    Data m_data[queueSize]; 

    // The nodes for the two lists 
    CellNode m_nodes[queueSize]; 

    volatile std::atomic<CellNode*> m_produceHead; 
    volatile std::atomic<CellNode*> m_consumeHead; 
}; 

#endif 
+2

Это выглядит как вопрос ABA мне: https://stackoverflow.com/questions/25628225/where-is-the-race/25628314#25628314 – dohashi

+0

Может быть ... Как бы один тест для этого? – bitwise

+0

ABA довольно сложно проверить. Я бы предложил действительно понять проблему ABA, а затем продумать ваш алгоритм, чтобы убедиться, что он уязвим. Модификация вашей структуры данных как конечного автомата также может помочь. Атомные структуры данных могут быть очень сложными для правильной работы. – dohashi

ответ

1

Я вижу несколько проблем с реализацией очереди:

  1. Это не очередь, это стек: самый последний элемент толкнул первый элемент выскочил. Не то, чтобы в стеках что-то не так, но это путается назвать его очередью. Фактически это два незакрепленных стека: один стек, изначально заполненный массивом узлов, и еще один стек, в котором хранятся фактические элементы данных, используя первый стек в виде списка свободных узлов.

  2. Существует раса данных по CellNode::m_next в обоих push и pop (неудивительно, так как они оба делают то же самое, то есть, поп узел из одного стека и нажать этот узел на другой). Произнесите два потока одновременно, например. pop и оба имеют одинаковое значение от m_consumeHead. Нить 1 гонка вперед успешно всплывает и устанавливает data. Затем Thread 1 записывает значение m_produceHead в cell->m_next, а Thread 2 одновременно считывает cell->m_next, чтобы перейти к std::atomic_compare_exchange_strong_explicit. Одновременная неатомическая запись и запись cell->m_next двумя потоками по определению представляет собой гонку данных.

    Это так называемая «доброкачественная» гонка в литературе параллелизма: пронусовое/недопустимое значение читается, но никогда не используется.Если вы уверены, что ваш код никогда не понадобится запускать в архитектуре, где он может вызвать пламенные взрывы, вы можете его игнорировать, но для строгого соответствия с моделью стандартной памяти вам необходимо сделать атомарным m_next и использовать по крайней мере memory_order_relaxed, чтобы устранить гонка данных.

  3. ABA. Правильность ваших циклов с обменом-обменом основана на предположении, что атомный указатель (например, и m_consumeHead), имеющий одинаковое значение как при начальной загрузке, так и в результате последующего сравнения, подразумевает, что объект-путевод должен также оставаться неизменным , Эта предпосылка не выполняется ни в какой конструкции, в которой можно быстрее перерабатывать объект, чем какой-либо поток, совершает поездку через цикл сравнения. Рассмотрим последовательность событий:

    1. тему 1 входит pop и считывает значение m_consumeHead и m_consumeHead->m_next но блоков перед вызовом сравнения-обмена.
    2. Тема 2 успешно удаляет этот узел с m_consumeHead и блокирует также.
    3. Резьба 3 толкает несколько узлов на m_consumeHead.
    4. Резьба 2 разблокирует и толкает исходный узел на m_produceHead.
    5. Резьба 3 выталкивает этот узел из m_produceHead и выталкивает его обратно на m_consumeHead.
    6. Thread 1, наконец, разблокирует и вызывает функцию обмена с обменом, которая преуспевает, так как значение m_consumeHead такое же. Он выталкивает узел, который хорошо и хорошо, но устанавливает m_consumeHead в значение m_next значения, которое оно считывает на шаге 1. Все узлы, нажатые на Thread 3, тем временем просачиваются.
+0

Отличные наблюдения! Спасибо, я считаю, что ты прав. Если позволит время, я буду пересматривать дизайн и обновлять этот поток. – bitwise

+0

3. Проблема ABA кажется. –

1

Я считаю, что я был в состоянии взломать этот. Нет livelock в 1000000 пишет/читает для очередей размером от 2 до 1024 и от 1 производителя и 1 потребителя до 100 производителей/100 потребителей.

Вот решение. Хитрость заключается не в том, чтобы использовать cell-> m_next непосредственно в сравнении и свопинге (кстати, это относится и к производителю кода) и требовать строгих правил порядка памяти:

Это, похоже, подтверждает мое подозрение, что это был компилятор переупорядочение чтения. Вот код:

bool push(const TData& data) 
{ 
    CellNode* cell = m_produceHead.load(std::memory_order_acquire); 
    if(cell == NULL) 
     return false; 

    while(!std::atomic_compare_exchange_strong_explicit(&m_produceHead, 
                 &cell, 
                 cell->m_next, 
                 std::memory_order_acquire, 
                 std::memory_order_release)) 
    { 
     if(!cell) 
      return false; 
    } 

    m_data[cell->m_idx] = data; 

    CellNode* curHead = m_consumeHead; 
    cell->m_next = curHead; 
    while (!std::atomic_compare_exchange_strong_explicit(&m_consumeHead, 
                 &curHead, 
                 cell, 
                 std::memory_order_acquire, 
                 std::memory_order_release)) 
    { 
     cell->m_next = curHead; 
    } 

    return true; 
} 

bool pop(TData& data) 
{ 
    CellNode* cell = m_consumeHead.load(std::memory_order_acquire); 
    if(cell == NULL) 
     return false; 

    while(!std::atomic_compare_exchange_strong_explicit(&m_consumeHead, 
                 &cell, 
                 cell->m_next, 
                 std::memory_order_acquire, 
                 std::memory_order_release)) 
    { 
     if(!cell) 
      return false; 
    } 

    data = m_data[cell->m_idx]; 

    CellNode* curHead = m_produceHead; 
    cell->m_next = curHead; 
    while(!std::atomic_compare_exchange_strong_explicit(&m_produceHead, 
                 &curHead, 
                 cell, 
                 std::memory_order_acquire, 
                 std::memory_order_release)) 
    { 
     cell->m_next = curHead; 
    } 

    return true; 
}; 
Смежные вопросы