2014-12-17 4 views
2

У меня есть класс очереди, данные которой хранятся в векторе:C++ очистка очереди и безопасность потока

std::vector<boost::shared_ptr<rxImage> > queue; 

Существует поток, который добавляет к очереди, которая основана на этом цикле:

while(runRxThread){ 
    this->rxImage(); 
} 

Где rxImage() определяется по формуле:

zmq::message_t img; 
imageSocket->recv(&img); 

//addToQueue is a push back: 
//queue.push_back(boost::shared_ptr<rxImage> (new rxImage(data, imgSize))); 
localQueue->addToQueue((unsigned char*) img.data()); 

изображения, полученные в порядке в этой теме (я тестировал с 10000 или так, и это кажется прекрасным).

переменная

runRxThread устанавливается через некоторые функции инкубационные в классе, что функция потока определяется в

При выполнении процесса в основном потоке, таких как:.

startRx(); //start the thread 

/*process to stimulate the sending of network data from another program*/ 

stopRX(); //stop the thread from accessing the queue 

queue.clear(); 

Существует segfault, вызванный clear(). Я проверил, что это определенно эта линия, а не внутренняя сантехника объектов, и это определенно есть.

Как представляется, проблема безопасности потоков, но я не знаю, как ее исправить, и, что более важно, я не знаю почему. Я понимаю, что два потока могут записываться в одну и ту же память, но не в одно и то же время. Разумеется, установив переменные runRxThread, я гарантирую, что этого не произойдет.

Мне очень понравилось бы решение, которое не связано с мьютексами или семафорами - я действительно не думаю, что они должны быть необходимы для такой проблемы.

Спасибо!

EDIT: runRXThread летуч и петля нити теперь:

while(1){ 
    if(runRxThread == 1){ 
     this->rxImage(); 
    } 
} 

EDIT2: «использовать семафор разделяемых объектов»

ОК, это явно проблема безопасности нити, мне нужно сделайте мои общие переменные потокобезопасными. Но ...

1) rxImage(); не прекращается, если не отправляются данные

2) Сегфакт происходит в пределах rxImage();

3) Если я заблокировать очереди с взаимной блокировкой, конечно, программа будет висеть в rxImage пока нет данных, потому что мьютекс не будет выпущен

4) Там не будет никаких данных, посланные, чтобы программа будет вечно вечно.

Возможно, это неправильно?

EDIT3:

Я изменил rxImage(), чтобы быть не блокировка:

zmq::message_t img; 
imageSocket->recv(&img,ZMQ_NOBLOCK); 
if((int)img.size() > 0){ 
    cout<<"in the thread conditional"<<endl;  
    localQueue->addToQueue((unsigned char*) img.data()); 
    cout<<"leaving thread conditional"<<endl; 
} 

Проблема раньше было очевидно, что localQueue писался, когда я убирала очередь. Теперь очередь может быть записана только в этой функции, когда есть данные для ее записи.Я могу гарантировать, что, когда я вызываю clear(), нет данных для записи, ((int) img.size()> 0) возвращает false, а очередь не обращается к потоку. Почему все еще существует segfault? Несомненно, это доказывает, что этот поток не вызывает segfault?

Вот терминальный вывод:

in the thread 
pushing back1 of size: 16000000 
Added image to queue. queue size: 650 
leaving thread conditional 

image server stopped 
stopping image server 
clearing vector 
Segmentation fault 

Можно видеть, что поток завершается с вектором, то сервер изображение останавливается, то вектор очищается. Именно в этом порядке без какого-либо непредсказуемого поведения. Но есть еще segfault.

+2

Re. Изменить: volatile не является потокобезопасным в C++. Вам нужны синхронизация/доступ к атомной памяти – sehe

+0

Да, вам нужно вызвать imageSocket-> recv (&img);), чтобы прервать его как-то. Обычно api связи имеет функцию Stop() или Abort(), которая является потокобезопасной и которую вы можете вызвать из stopRX() Это приведет к тому, что recv вызовет исключение или вернет код ошибки. Когда вы получите эту ошибку, вернитесь из rxImage(). –

+0

Спасибо, я добавил неблокирующую версию функции, чтобы остановить зависание, см. Edit three. – user2290362

ответ

2

Я ОП, я исправил проблему.

Проблема явно не проблема разногласий, как это было предложено другими пользователями. Это подтверждается в 3-м изначальном вопросе. Выход терминала имитирует, где мьютекс был заблокирован и выпущен, и доказывает, что они необходимы в этом случае - по мере того, как потоки синхронизируются по сети. Я согласен с тем, что это очень маленькое дело.

Я проследил проблему до деструктора класса изображения, который находится в очереди, переменная удалена, и это вызывает segfault.

4

При доступе к изменяемым общим данным из двух потоков вам необходимо защитить от гонок данных. Неважно, насколько простая может быть ваша проблема, вы не можете гарантировать правильность вашего кода, если у него есть гонка данных. Типичным решением является использование мьютекса или тому подобного, чтобы гарантировать, что только один поток одновременно обращается к совместно используемому состоянию. Вам не пришлось бы делать это вручную, если используемая вами очередь была потокобезопасной (std :: vector, очевидно, нет).

Ниже приведен пример потоковой безопасности, но, похоже, нет операции clear(): http://www.boost.org/doc/libs/1_53_0/doc/html/boost/lockfree/queue.html. Фактически, он не блокируется, поэтому он не использует мьютекс, но это не означает, что он проще, чем потоковая сеть с мьютексом. На самом деле это наоборот - трудно написать правильный код без блокировки.

+0

lockfree - это совсем другой бизнес, чем просто потокобезопасный.Блокировка бесплатно не является общеприменимой (если только для низкой пропускной способности низкой загрузки, когда вы можете позволить себе нагрузку на сервер) – sehe

+0

Я сказал что-то, что противоречит этому утверждению? –

+0

Да. Указывая - исключительно - на ресурс о беспорядочных очередях для очевидного новичка, который не сделает этого распознавания – sehe

5

Ваша раса данные здесь:

while(runRxThread){ 
    this->rxImage(); 
} 

Вы не проверять runTxThread() на протяжении цикла (не говоря уже о том, что если runRxThread не отмечен летучим, он не может даже быть прочитан из основной памяти, но «предполагается» неизменным в регистре.

(ПРИМЕЧАНИЕ даже с летучими гонка там я просто указываю, компилятор предполагает однопоточных абстрактную машину, если явные atomic режимы упорядочения памяти не используются)

Вам необходимо обоюдное исключение.

+0

Я не верю, что volatile (в компиляторе, совместимом с C++ 11) поможет. Некоторые упоминания о том, почему здесь: http://stackoverflow.com/questions/12878344/volatile-in-c11 –

+0

@ScottLangham Я никогда не говорил, что это поможет. Я упомянул об этом, потому что он не изменчивый, его даже не может читать! – sehe

+0

Я добавил флаг volatile и исправил цикл (см. Редактирование сообщения). Это не решает проблему. Почему определенно определенная гонка данных? Конечно, этот поток должен прекратить доступ к очереди ... runRxThread должен действовать как блокировка очереди – user2290362

3

Проблема в том, что даже если вы установили runRxThread в значение false, поток все еще может обрабатывать вещи в пределах this->rxImage() и может иметь доступ к вектору. Вам нужно дождаться, пока это закончится, и еще раз проверьте условие цикла, прежде чем разрешить основной поток очистить вектор. Было бы нехорошо «очищать» вектор, пока поток все равно обращается к нему.

Итак, вы должны дождаться завершения this->rxImage(), прежде чем разрешить основной поток очистить вектор.

Одним из решений было бы получить StopRx(), чтобы дождаться вашего «потока, который добавляется в очередь» для завершения, вызывая thread.join() после установки runRxThread false (при условии, что вы используете std :: thread).

Я бы посоветовал вам также изменить runRxThread на тип std :: atomic, чтобы обе потоки всегда имели последовательное представление о том, что это за значение.

+0

rxImage() не будет завершен, если не будут получены данные, и данные поступают из другой программы, которая контролируется этой программой - так что нет никаких данных, которые будут получены в функции, никогда не прекратятся. Есть ли способ обойти это? – user2290362

+0

Обычно вы подключаете api (imageSocket в вашем случае), чтобы иметь доступную функцию, которая может остановить или прервать любые текущие операции связи. StopRx() также должен вызвать эту функцию. Прерывание, надеюсь, приведет к тому, что imageSocket-> recv (&img);) вернет код ошибки или выдаст исключение. Вам нужно найти эту ошибку и выйти из rxImage(), когда это произойдет. –

3

Мое понимание состоит в том, что два потока могут записываться в одну и ту же память, но не в одно и то же время.

Если вы не добавили явной синхронизации с вашим кодом (например,используя мьютексы, семафоры или атомные операции), вы не можете осмысленно сказать, происходят ли два события «одновременно» или нет. Без синхронизации вы не можете сказать, что одно происходит даже до другого.

Мне очень понравилось решение, которое не связано с мьютексами или семафорами - я действительно не думаю, что они должны быть необходимы для такой проблемы.

Вы ошибаетесь. Вам либо нужно что-то вроде мьютекса, либо что-то много сложнее, как блокировка очереди с использованием атомных операций.

Поскольку вы не являетесь экспертом в этой области, просто используйте мьютекс, чтобы защитить все общие данные, к которым вы обращаетесь, из нескольких потоков (если только все обращений только для чтения и записи нет).

+0

Ну да, если вы просто заблокируете мьютексы и подождите, пока он будет заблокирован навсегда, поэтому не делайте этого. Вам нужно заблокировать мьютексы, проверить, есть ли данные, затем разблокировать, чтобы дать другому потоку возможность создавать данные. И вместо того, чтобы просто вращаться в цикле, проверяя , вы можете использовать переменную условия, чтобы заставить пользователя ждать, пока производитель не сообщит, что есть доступные данные. Найдите учебник, это очень распространенные идиомы, вы не должны пытаться писать многопоточный код, гадая. –

0

В случае параллелизма с использованием Mutex замки являются хорошим вариантом. Вы можете использовать блокировки в методах push и pop вашего класса.

Когда вы нажимаете, выскакиваете или получаете доступ к данным Mutex замки обеспечат безопасность потока.

Позвольте мне продемонстрировать это -

Предположим, вы получаете доступ к очереди, делая операцию, предположим, размер очереди 5. Если заблокировать блок этой операции, используя тот же мьютекса замок, который в настоящее время используется в то время как push, pop и другие операции в очереди. Тогда другие операции в очереди не будут выполняться до тех пор, пока не будет выполнен рабочий блок.

Для примера см этого простого псевдокода пример

CQUEUE::Push(element) 
{ 
    Mutex lock; // this is a pseudo code please ignore syntax error. You can find the exact syntax in any where through the web :P 
    AddToQUEUE(element); 

    //other operations under the same mutex will not be executed if the mutex lock variable is same 
} 

Пожалуйста, обратите внимание, что при использовании мьютекса замков могут вызвать тупики, если он не используется должным образом. Поэтому, пожалуйста, используйте Mutex с большой осторожностью.

Смежные вопросы