Я переписываю старую блокировку бесплатной очереди, я начал с использования memory_order_relaxed для всего, с намерением ужесточить семантику памяти и добавить автономные заборы и т. Д. Позже. Но, как ни странно, он работает. Я пробовал компиляцию с XCode и VS2015 с настройками оптимизации maxxed out. У меня был код, очень похожий на этот провал около 1-1,5 лет назад, в последний раз, когда я это написал.Почему эта свободная очередь блокировки работает?
Вот моя очередь:
#ifndef __LOCKFREEMPMCQUEUE_H__
#define __LOCKFREEMPMCQUEUE_H__
#include <atomic>
template <typename T>
class LockFreeMPMCQueue
{
public:
explicit LockFreeMPMCQueue(size_t size)
: m_data(new T[size])
, m_size(size)
, m_head_1(0)
, m_head_2(0)
, m_tail_1(0)
, m_tail_2(0)
{
}
virtual ~LockFreeMPMCQueue() { delete m_data; }
bool try_enqueue(const T& value)
{
size_t tail = m_tail_1.load(std::memory_order_relaxed);
const size_t head = m_head_2.load(std::memory_order_relaxed);
const size_t count = tail - head;
if (count == m_size)
{
return false;
}
if (std::atomic_compare_exchange_weak_explicit(&m_tail_1, &tail, (tail + 1), std::memory_order_relaxed,
std::memory_order_relaxed) == false)
{
return false;
}
m_data[tail % m_size] = value;
while (m_tail_2.load(std::memory_order_relaxed) != tail)
{
std::this_thread::yield();
}
m_tail_2.store(tail + 1, std::memory_order_relaxed);
return true;
}
bool try_dequeue(T& out)
{
size_t head = m_head_1.load(std::memory_order_relaxed);
const size_t tail = m_tail_2.load(std::memory_order_relaxed);
if (head == tail)
{
return false;
}
if (std::atomic_compare_exchange_weak_explicit(&m_head_1, &head, (head + 1), std::memory_order_relaxed,
std::memory_order_relaxed) == false)
{
return false;
}
out = m_data[head % m_size];
while (m_head_2.load(std::memory_order_relaxed) != head)
{
std::this_thread::yield();
}
m_head_2.store(head + 1, std::memory_order_relaxed);
return true;
}
size_t capacity() const { return m_size; }
private:
T* m_data;
size_t m_size;
std::atomic<size_t> m_head_1;
std::atomic<size_t> m_head_2;
std::atomic<size_t> m_tail_1;
std::atomic<size_t> m_tail_2;
};
#endif
А вот тест я написал:
#include <chrono>
#include <thread>
#include <vector>
#include "LockFreeMPMCQueue.h"
std::chrono::microseconds::rep test(LockFreeMPMCQueue<size_t>& queue, char* memory, const size_t num_threads, const size_t num_values)
{
memset(memory, 0, sizeof(char) * num_values);
const size_t num_values_per_thread = num_values/num_threads;
std::thread* reader_threads = new std::thread[num_threads];
std::thread* writer_threads = new std::thread[num_threads];
auto start = std::chrono::high_resolution_clock::now();
for (size_t i = 0; i < num_threads; ++i)
{
reader_threads[i] = std::thread([i, &queue, memory, num_values_per_thread]()
{
for (size_t x = 0; x < num_values_per_thread; ++x)
{
size_t value;
while (!queue.try_dequeue(value))
{
}
memory[value] = 1;
}
});
}
for (size_t i = 0; i < num_threads; ++i)
{
writer_threads[i] = std::thread([i, &queue, num_values_per_thread]()
{
const size_t offset = i * num_values_per_thread;
for (size_t x = 0; x < num_values_per_thread; ++x)
{
const size_t value = offset + x;
while (!queue.try_enqueue(value))
{
}
}
});
}
for (size_t i = 0; i < num_threads; ++i)
{
reader_threads[i].join();
writer_threads[i].join();
}
auto time_taken = std::chrono::high_resolution_clock::now() - start;
delete[] reader_threads;
delete[] writer_threads;
bool fail = false;
for (size_t i = 0; i < num_values; ++i)
{
if (memory[i] == 0)
{
printf("%u = 0\n", i);
fail = true;
}
}
if (fail)
{
printf("FAIL!\n");
}
return std::chrono::duration_cast<std::chrono::milliseconds>(time_taken).count();
}
int main(int argc, char* argv[])
{
const size_t num_threads_max = 16;
const size_t num_values = 1 << 12;
const size_t queue_size = 128;
const size_t num_samples = 128;
LockFreeMPMCQueue<size_t> queue(queue_size);
char* memory = new char[num_values];
const double inv_num_samples = 1.0/double(num_samples);
for(size_t num_threads = 1; num_threads <= num_threads_max; num_threads *= 2)
{
double avg_time_taken = 0.0;
for(size_t i = 0; i < num_samples; ++i)
{
avg_time_taken += test(queue, memory, num_threads, num_values) * inv_num_samples;
}
printf("%u threads, %u ms\n", num_threads, avg_time_taken);
}
delete[] memory;
char c;
scanf("%c", &c);
return 0;
}
Любая помощь очень ценится!
Код с ошибками часто делает что-то отличное от ожидаемого. Вот почему мы так стараемся, чтобы избежать ошибок. –
К сожалению, я не проверял ваш код, но: 1. всегда есть проблема с параллельным кодом: он может показаться сработавшим ... пока он не сработает. Рекомендация: напишите больше тестов (особенно с большим количеством протекторов и бегом в течение длительного времени). 2. Архитектура x86 имеет сильную модель памяти, скрывающую некоторые ошибки. (например, неправильный код может работать на x86, но сбой в ARM). 3. Используйте Sanitizer Thread. –
Имена, начинающиеся с символа подчеркивания, за которым следует большая буква ('__LOCKFREEMPMCQUEUE_H__'), и имена, содержащие два последовательных символа подчеркивания, зарезервированы для реализации. Не используйте их. –