Я программирую незащищенную однопользовательскую однопроцессорную растущую очередь на C++ для системы реального времени. Внутренняя очередь работает, но она должна быть устойчивой. Нить производителя в режиме реального времени, поэтому любая операция должна быть детерминированной (так что нет ожиданий, блокировок, распределения памяти), а потребительский поток - нет.Определение безопасности удаления параллельной очереди
Таким образом, идея состоит в том, что потребительский поток иногда увеличивает размер очереди, если это необходимо. Реализация очереди такова, что потребительский конец не может расти. Поэтому фактическая очередь косвенно обернута внутри объекта, который отправляет вызовы, а фактический рост реализуется путем замены ссылки на внутреннюю очередь на новую, сохраняя при этом старую ветку, если поток производителя использует ее.
Проблема заключается в том, что я не могу понять, как доказать, когда нить производителя перестает использовать старую очередь, и поэтому ее можно безопасно удалить, не прибегая к блокировкам. Вот псевдо-представление кода:
template<typename T>
class queue
{
public:
queue()
: old(nullptr)
{
current.store(nullptr);
grow();
}
bool produce(const T & data)
{
qimpl * q = current.load();
return q->produce(data);
}
bool consume(T & data)
{
// the queue has grown? if so, a new and an old queue exists. consume the old firstly.
if (old)
{
// here is the problem. we never really know when the producer thread stops using
// the old queue and starts using the new. it could be concurrently halfway-through inserting items
// now, while the following consume call fails meanwhile.
// thus, it is not safe yet to delete the old queue.
// we know however, that it will take at most one call to produce() after we called grow()
// before the producer thread starts using the new queue.
if (old->consume(data))
{
return true;
}
else
{
delete old;
old = nullptr;
}
}
if (current.load()->consume(data))
{
return true;
}
return false;
}
// consumer only as well
void grow()
{
old = current.load();
current.store(new qimlp());
}
private:
class qimpl
{
public:
bool produce(const T & data);
bool consume(const T & data);
};
std::atomic<qimpl *> current;
qimpl * old;
};
Обратите внимание, что ATOMIC_POINTER_LOCK_FREE == 2 является условием для кода для компиляции. Единственное доказуемое условие, которое я вижу, заключается в том, что при вызове grow() следующий вызов production() будет использовать новую внутреннюю очередь. Таким образом, если количество атомов внутри продукта увеличивается каждый раз, то его безопасно удалять старую очередь в N + 1, где N - это счет во время вызова grow(). Проблема, однако, в том, что вам тогда необходимо атомизировать swap новый указатель и хранить счет, который кажется невозможным.
Любые идеи можно только приветствовать, и для справки, это то, как система будет работать:
queue<int> q;
void consumer()
{
while (true)
{
int data;
if (q.consume(data))
{
// ..
}
}
}
void producer()
{
while (true)
{
q.produce(std::rand());
}
}
int main()
{
std::thread p(producer); std::thread c(consumer);
p.detach(); c.detach();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
EDIT: Хорошо, проблема решается в настоящее время. Мне стало ясно, что старая очередь оказывается устаревшей, когда предмет переносится в новую очередь. Таким образом, фрагмент кода теперь выглядит следующим образом:
bool pop(T & data)
{
if (old)
{
if (old->consume(data))
{
return true;
}
}
// note that if the old queue is empty, and the new has an enqueued element, we can conclusively
// prove that it is safe to delete the old queue since it is (a) empty and (b) the thread state
// for the producer is updated such that it uses all the new entities and will never use the old again.
// if we successfully dequeue an element, we can delete the old (if it exists).
if (current.load()->consume(data))
{
if (old)
{
delete old;
old = nullptr;
}
return true;
}
return false;
}
Рост на самом деле не реализован. Он просто расширяет очередь (путем замены старой очереди на новую, более крупную). Я прочитал большую часть статьи, но я не смог определить применимый шаблон или прецедент для rcu, это в основном барьеры памяти? Кроме того, этот проект является кросс-платформенным. – Shaggi
@Shaggi: Замена старого объекта на новый - фактически основной прецедент RCU. И проблема, когда безопасно удалять старый объект, также является основной проблемой реализации RCU. Обычно эта проблема решается с использованием intervension в некоторые состояния * quiescent *, где гарантируется, что данный поток/процессор не использует объект и которые являются обычными. – Tsyvarev