2015-07-15 3 views
3

Я программирую незащищенную однопользовательскую однопроцессорную растущую очередь на C++ для системы реального времени. Внутренняя очередь работает, но она должна быть устойчивой. Нить производителя в режиме реального времени, поэтому любая операция должна быть детерминированной (так что нет ожиданий, блокировок, распределения памяти), а потребительский поток - нет.Определение безопасности удаления параллельной очереди

Таким образом, идея состоит в том, что потребительский поток иногда увеличивает размер очереди, если это необходимо. Реализация очереди такова, что потребительский конец не может расти. Поэтому фактическая очередь косвенно обернута внутри объекта, который отправляет вызовы, а фактический рост реализуется путем замены ссылки на внутреннюю очередь на новую, сохраняя при этом старую ветку, если поток производителя использует ее.

Проблема заключается в том, что я не могу понять, как доказать, когда нить производителя перестает использовать старую очередь, и поэтому ее можно безопасно удалить, не прибегая к блокировкам. Вот псевдо-представление кода:

template<typename T> 
class queue 
{ 
public: 

    queue() 
     : old(nullptr) 
    { 
     current.store(nullptr); 
     grow(); 
    } 

    bool produce(const T & data) 
    { 
     qimpl * q = current.load(); 
     return q->produce(data); 
    } 

    bool consume(T & data) 
    { 
     // the queue has grown? if so, a new and an old queue exists. consume the old firstly. 
     if (old) 
     { 
      // here is the problem. we never really know when the producer thread stops using 
      // the old queue and starts using the new. it could be concurrently halfway-through inserting items 
      // now, while the following consume call fails meanwhile. 
      // thus, it is not safe yet to delete the old queue. 
      // we know however, that it will take at most one call to produce() after we called grow() 
      // before the producer thread starts using the new queue. 

      if (old->consume(data)) 
      { 
       return true; 
      } 
      else 
      { 
       delete old; 
       old = nullptr; 
      } 
     } 

     if (current.load()->consume(data)) 
     { 
      return true; 
     } 
     return false; 
    } 
    // consumer only as well 
    void grow() 
    { 
     old = current.load(); 
     current.store(new qimlp()); 
    } 
private: 

    class qimpl 
    { 
    public: 
     bool produce(const T & data); 
     bool consume(const T & data); 
    }; 

    std::atomic<qimpl *> current; 
    qimpl * old; 
}; 

Обратите внимание, что ATOMIC_POINTER_LOCK_FREE == 2 является условием для кода для компиляции. Единственное доказуемое условие, которое я вижу, заключается в том, что при вызове grow() следующий вызов production() будет использовать новую внутреннюю очередь. Таким образом, если количество атомов внутри продукта увеличивается каждый раз, то его безопасно удалять старую очередь в N + 1, где N - это счет во время вызова grow(). Проблема, однако, в том, что вам тогда необходимо атомизировать swap новый указатель и хранить счет, который кажется невозможным.

Любые идеи можно только приветствовать, и для справки, это то, как система будет работать:

queue<int> q; 

void consumer() 
{ 
    while (true) 
    { 
     int data; 

     if (q.consume(data)) 
     { 
      // .. 
     } 
    } 

} 

void producer() 
{ 
    while (true) 
    { 
     q.produce(std::rand()); 
    } 
} 

int main() 
{ 
    std::thread p(producer); std::thread c(consumer); 
    p.detach(); c.detach(); 
    std::this_thread::sleep_for(std::chrono::milliseconds(1000)); 
} 

EDIT: Хорошо, проблема решается в настоящее время. Мне стало ясно, что старая очередь оказывается устаревшей, когда предмет переносится в новую очередь. Таким образом, фрагмент кода теперь выглядит следующим образом:

bool pop(T & data) 
{ 
    if (old) 
    { 
     if (old->consume(data)) 
     { 
      return true; 
     } 

    } 
    // note that if the old queue is empty, and the new has an enqueued element, we can conclusively 
    // prove that it is safe to delete the old queue since it is (a) empty and (b) the thread state 
    // for the producer is updated such that it uses all the new entities and will never use the old again. 
    // if we successfully dequeue an element, we can delete the old (if it exists). 
    if (current.load()->consume(data)) 
    { 
     if (old) 
     { 
      delete old; 
      old = nullptr; 
     } 
     return true; 
    } 
    return false; 
} 

ответ

0

Я не совсем понимаю, использование grow() в вашем алгоритме, но мне кажется, что вам нужно какое-то чтение Прописи Update (РКО) механизм для safetly удаления очереди когда это не требуется.

Этот article описывает различные варианты этого механизма, связанные с Linux, но вы можете использовать google RCU, подходящие для других платформ.

+0

Рост на самом деле не реализован. Он просто расширяет очередь (путем замены старой очереди на новую, более крупную). Я прочитал большую часть статьи, но я не смог определить применимый шаблон или прецедент для rcu, это в основном барьеры памяти? Кроме того, этот проект является кросс-платформенным. – Shaggi

+0

@Shaggi: Замена старого объекта на новый - фактически основной прецедент RCU. И проблема, когда безопасно удалять старый объект, также является основной проблемой реализации RCU. Обычно эта проблема решается с использованием intervension в некоторые состояния * quiescent *, где гарантируется, что данный поток/процессор не использует объект и которые являются обычными. – Tsyvarev

Смежные вопросы