2012-04-17 3 views
0

Я понял. Глупая ошибка с моей стороны, я фактически не удалял элемент из очереди, я просто читал первый элемент. Я изменил код и код ниже никаких работ. Спасибо всем за помощь.производитель/потребитель, использующий форсированные потоки и зависающий круговой буфер

Я пытаюсь реализовать проблему потребителя-производителя с помощью boost, который на самом деле является частью более крупного проекта. Я реализовал программу из примеров в Интернете и даже какую-то помощь, которую я нашел здесь. Однако в настоящее время мой код просто зависает. Основываясь на некоторых хороших советах, я решил использовать буфер буфера ciruclar для хранения моих данных между производителем и потребителем. Много похожего кода, и я смог объединить идеи с ними и написать что-то самостоятельно. Тем не менее, я все еще, похоже, сталкиваюсь с той же проблемой, что и раньше (это моя программа просто зависает). Я думал, что я не сделал ту же ошибку, что и раньше.

Мой код приведен ниже, я вынул свой более ранний код, где я только свой собственный список ссылок.

Буфер заголовка:

#ifndef PCDBUFFER_H 
#define PCDBUFFER_H 

#include <pcl/io/pcd_io.h> 
#include <boost/thread/mutex.hpp> 
#include <boost/thread/condition.hpp> 
#include <boost/circular_buffer.hpp> 

class pcdBuffer 
{ 
    public: 
     pcdBuffer(int buffSize); 
     void put(int data); 
     int get(); 
     bool isFull(); 
     bool isEmpty(); 
     int getSize(); 
     int getCapacity(); 
    private: 
     boost::mutex bmutex; 
     boost::condition_variable buffEmpty; 
     boost::condition_variable buffFull; 
     boost::circular_buffer<int> buffer; 
}; 


#endif 

источник буфера (только соответствующие разделы):

#include "pcdBuffer.h" 
#include <iostream> 

//boost::mutex io_mutex; 

pcdBuffer::pcdBuffer(int buffSize) 
{ 
    buffer.set_capacity(buffSize); 
} 

void pcdBuffer::put(int data) 
{ 
    { 
     boost::mutex::scoped_lock buffLock(bmutex); 
     while(buffer.full()) 
     { 
      std::cout << "Buffer is full" << std::endl; 
      buffFull.wait(buffLock); 
     } 
     buffer.push_back(data); 
    } 
    buffEmpty.notify_one(); 
} 

int pcdBuffer::get() 
{ 
    int data; 
    { 
     boost::mutex::scoped_lock buffLock(bmutex); 
     while(buffer.empty()) 
     { 
      std::cout << "Buffer is empty" << std::endl; 
      buffEmpty.wait(buffLock); 
     } 
     data = buffer.front(); 
      buffer.pop_front(); 
    } 
    buffFull.notify_one(); 
    return data; 
} 

основной драйвер для кода:

#include <iostream> 
#include <boost/thread/mutex.hpp> 
#include <boost/thread/thread.hpp> 
#include <unistd.h> 
#include "pcdBuffer.h" 

pcdBuffer buff(100); 

void producer() 
{ 
    int i = 10; 
    while (true) 
    { 
     buff.put(i); 
     i++; 
    } 
} 

void consumer() 
{ 
    int i; 
    while(true) 
    { 
     i = buff.get(); 
     std::cout << "Data: " << i << std::endl; 
    } 
} 

int main(int argc, char** argv) 
{ 
    std::cout << "Starting main...." << std::endl; 
    std::cout << "Buffer Details: " << std::endl; 
    std::cout << "Capacity: " << buff.getCapacity() << ", isEmpty: " << buff.isEmpty() << ", isFull: " << buff.isFull() << std::endl; 
    boost::thread cons(consumer); 
    sleep(5); 
    boost::thread prod(producer); 
    prod.join(); 
    cons.join(); 
    return 0; 
} 

Моя буферная емкость правильно инициализированы до 100 . Потребительский поток ждет и сообщает, что «буфер пуст» в течение 5 секунд, но после этого я просто получаю «буфер заполнен» от метода put и «Data: 10» от функции пользователя, чередующейся на stdout. Как вы видите, 10 - это первый элемент, который я вложил. Кажется, что буфер заполняется и не уведомляет потребителя, но я проверял свои блокировки и думаю, что они правы. Любая помощь по этому поводу очень ценится.

Вот ссылка ссылок, из которых я писал этот код:

http://www.boost.org/doc/libs/1_49_0/libs/circular_buffer/doc/circular_buffer.html#classboost_1_1circular__buffer_19ba12c0142a21a7d960877c22fa3ea00

http://www.drdobbs.com/article/print?articleId=184401518&siteSectionName=cpp

Thread safe implementation of circular buffer

+0

Посмотрите на это:.. Http: // www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html – stefaanv

+0

Эта статья действительно интересна, однако, еще одна причина, по которой я хочу избежать очередей stl, - это скорость. структура данных, которую я буду делиться между потоками, представляет собой облако точек, представляющее захват кадра из камеры, подобной kinect. Я хочу, чтобы кадры захвата составляли 30 кадров в секунду и сохраняли его на диске. Поэтому я хотел бы как можно больше свести к минимуму мои накладные расходы. причина, по которой я избегал STL Очереди. Я попытался разблокировать свои блокировки, прежде чем уведомлять переменную условия, как предлагает статья, но она все еще зависает. Я обновил свой код здесь. – shaun

+5

Сначала заставьте его работать, а затем сделайте это быстро. Избегайте предположений о том, какие части вашего кода/дизайна будут медленными. Избегайте преждевременной оптимизации. Сосредоточьте свою работу на важной части, используя, по возможности, стандартные компоненты. Если это окажется слишком медленным, ** измерьте, какая часть медленно, ТОГДА оптимизируйте эту часть **. –

ответ

4

Прежде всего, вместо того чтобы писать свой собственный список, вы могли бы просто оберните std::list в pcdQueue вместо того, чтобы писать свои собственные. Правильно, что std::list не является потокобезопасным as-is, но вы все равно предоставляете необходимые примитивы синхронизации в своем классе.

Причина, по которой ваша программа зависает: Вы сохраняете блокировку и заполняете очередь до ее заполнения. Ваш недостаток потребителя через notify_one бесполезен, потому что ваш потребитель снова заблокируется, так как мьютекс уже занят (блокировкой производителя).

Когда вы окончательно освободите замок (когда очередь заполнена), ожидая на condition_variable, вы не пробудите своего потребителя, поэтому и ваш потребитель, и ваш производитель заблокированы и ваша программа зависает.

Изменить его:

void pcdQueue::produce() 
{ 
    int i=0; 
    while(true) 
    { 
     { 
      boost::mutex::scoped_lock lock(qmutex); 
      while(! qlen < buffSize) { 
       std::cout << "Queue is full" << std::endl; 
       full.wait(lock); 
      } 

      enqueue(i); // or myList.push_back(i) if you switch to std::list 
     } 

     empty.notify_one(); 


    } 
} 

У вас есть те же самые проблемы в вашем методе consume().Измените его на:

pcdFrame* pcdQueue::consume() 
{ 
    pcdFrame *frame; 

    { 
     boost::mutex::scoped_lock lock(qmutex); 
     while(qlen == 0) { 
      std::cout << "Queue is empty" << std::endl; 
      empty.wait(lock); 
     } 

     frame = dequeue(); 
    } 
    full.notify_one(); 

    return frame; 
} 

В целом, позаботьтесь и обратите внимание, что уведомления вступают в силу только в том случае, если кто-то ждет. В противном случае они «потеряны». Обратите внимание, что вам не нужно блокировать блокировку мьютекса при вызове notify_one (на самом деле это может привести к дополнительным издержкам контекстного переключателя, поскольку вы пробуждаете другой поток, который затем будет ждать мьютекса, который в настоящее время) заблокирован (вами). Итак, сначала отпустите мьютекс, затем скажите другой теме продолжить.

Обратите внимание, что оба потока выполняются бесконечно, поэтому ваша основная программа все равно будет висела на первом join() и никогда не выйдет. Вы можете включить флаг остановки, который вы проверите в своем while -loops, чтобы сообщить, что ваши потоки завершатся.

0

Boost теперь предлагает тип очереди производителей/потребителей, который в значительной степени свободен от блокировки, хотя он МОЖЕТ блокироваться, если заполнение очереди вверх.

Вы найдете документацию здесь:

http://www.boost.org/doc/libs/1_54_0/doc/html/lockfree.html

Это довольно недавнее дополнение, так что я не думаю, что это часть многих стандартных пакетов еще. С другой стороны, это похоже на большинство заголовков, все в зависимости от довольно низкоуровневого системного материала, который долгое время был в состоянии повышения.

0

Я столкнулся с тем же вопросом, что и висит. Благодаря ответу Йоханнеса, я смог заставить его работать полностью. Однако мне пришлось использовать full.wait_for (lock, boost :: chrono :: milliseconds (100)); как для потребителя, так и для производителя, чтобы предотвратить зависание при использовании очень короткого кольцевого буфера (3-4 элемента).

Кроме того, вместо того, чтобы некоторое время (истина), я в то время как (buffer-> пустой()

Наконец, все работает надежно и быстро

Смежные вопросы