2016-07-07 6 views
0

У меня есть набор данных, которые необходимо обрабатывать одновременно, используя многопоточность, количество данных, предположительно, больше, чем количество потоков. Я решил поместить данные в какую-то очередь, поэтому каждый свободный поток может погладить свою часть и обрабатывать ее до тех пор, пока очередь не будет пустой. Я мог бы использовать простую STL-очередь и блокировать ее с помощью мьютекса, когда я хочу удалить из него элемент, но я бы хотел попробовать использовать блокировку. В то же время мой проект слишком мал, чтобы зависеть от какой-то сторонней библиотеки, которая предоставляет блокирующие структуры, на самом деле мне нужно только атомное удаление. Поэтому я решил осуществить свою собственную очередь на основе вектора с указателем на «голове» и увеличиваем этот указатель атомарно:Самый простой способ вывести из строя атомарно?

template <typename T> 
class AtomicDequeueable 
{ 
public: 

    // Assumption: data vector never changes 
    AtomicDequeueable(const std::vector<T>& data) : 
     m_data(data), 
     m_pointer(ATOMIC_VAR_INIT(0)) 
    {} 

    const T * const atomicDequeue() 
    { 
     if (std::atomic_load(&m_pointer) < m_data.size()) 
     { 
      return &m_data 
      [ 
       std::atomic_fetch_add(&m_pointer, std::size_t(1)) 
      ]; 
     } 

     return nullptr; 
    } 

private: 

    AtomicDequeueable(const AtomicDequeueable<T>&) {} 

    std::atomic_size_t m_pointer; 
    const std::vector<T>& m_data; 
}; 

функция темы выглядит следующим образом:

void f(AtomicDequeueable<Data>& queue) 
{ 
    while (auto dataPtr = queue.atomicDequeue()) 
    { 
     const Data& data = *dataPtr; 
     // processing data... 
     std::this_thread::sleep_for(std::chrono::milliseconds(1)); 
    } 
} 

Мой опыт использования незакрепленные структуры и примитивы действительно бедны, поэтому я задаюсь вопросом: будет ли мой подход работать правильно? Конечно, я тестировал его на Ideone, но я не знаю, как он будет вести себя с реальными данными.

+2

Без блокировки и просты? О, парень. В вашем коде есть гонка данных. Представьте, что ваша очередь размером 'n' заполнена до' n - 1', теперь два потока одновременно преуспевают в 'std :: atomic_load (& m_pointer) user2296177

+2

Ваша функция 'atomicDequeue' не выглядит атомной. Подумайте, что произойдет, если 2 потока пройдут через оператор if, когда 'm_pointer' является' m_data.size() - 1'. – Kevin

ответ

1

В настоящее время ваша функция atomicDequeue имеет гонку данных: возможно, для двух потоков оба выполняли первую команду atomic перед выполнением второго. Тем не менее, это может быть исправлено, как вам действительно нужно только 1 атомарной операции, в соответствии со следующим изменением:

const T * const atomicDequeue() 
{ 
    auto myIndex = std::atomic_fetch_add(&m_pointer, std::size_t(1)); 

    if(myIndex >= m_data.size()) 
     return nullptr; 

    return &m_data[myIndex]; 
} 

Это не работает при условии, ничего изменяет входной вектор во время работы потока.

-1

Ваш код очень глючный. Позвольте мне быть очень откровенным в моей рекомендации здесь:

  • «ACTUM Ne Agas:   Не делать вещи уже сделано.»   У вас есть много доступных существующих классов C++, которые могут надежно выполнять очередность и межпроцессную связь (IPC) в целом. Используйте один из них.

  • Не беспокойтесь о «свободе блокировки». Это то, что замки для, и они надежные, быстрые и дешевые.

Ваше понятие «с использованием очереди» звук, но вы собираетесь ненужной работы ... и создавать ошибки ... когда вы можете просто «захватить что-то с полки.» Вы знаете, что стандартная часть будет работать правильно, потому что другие люди проверяют ее до смерти.

Смежные вопросы