2014-01-23 4 views
1

Ниже приведен код, который я пытаюсь решить проблему с производителем, используя потоки posix. В моем коде я использую переменную условия для ожидания и мьютекса сигнала. Мой код позволяет мне настроить количество потоков производителей и потребителей.Posix Thread - (mutex and condition variable) issue,

Но когда количество потребительских потоков больше, чем количество потоков производителей, моя программа зависает и не заканчивается.

Я думаю, что функция потока производителей завершила работу с потребительскими потоками, поставив максимальные значения в очередь. И ожидающие потребительские потоки не получают сигнал. Хотя до завершения продюсерский поток генерировал сигнал для каждого значения, добавленного в очередь.

Нужна помощь в выявлении проблемы в коде, чтобы она выполнялась для любой комбинации количества производителей и потребительских потоков.

#include<iostream> 
#include<queue> 
#include<pthread.h> 

using namespace std; 

pthread_cond_t c=PTHREAD_COND_INITIALIZER; 
pthread_mutex_t m=PTHREAD_MUTEX_INITIALIZER; 

long max_item=50000; 
long produced=0; 
long consumed=0; 

const long producer_count=5; 
const long consumer_count=15; 

queue<long>q; 

void * producer(void *) 
{ 
    long i=1; 
    while(true) 
    { 
     pthread_mutex_lock(&m); 
     if(produced==max_item) 
     {pthread_mutex_unlock(&m);break;} 

     q.push(i++);++produced; 
     pthread_cond_signal(&c); 
     pthread_mutex_unlock(&m); 
    } 
    pthread_mutex_lock(&m); 
    cout<<"Producer produced:"<<i-1<<endl; 
    pthread_mutex_unlock(&m); 

} 
void *consumer(void *) 
{ 
    long i=1; 
    long val=0; 
    while(true) 
    { 
     pthread_mutex_lock(&m); 
     if(consumed==max_item){pthread_mutex_unlock(&m);break;}  
     while(q.empty()) 
      {pthread_cond_wait(&c,&m);} 
     val=q.front();q.pop(); 
     ++i,consumed++; 
     pthread_mutex_unlock(&m); 
    } 
    pthread_mutex_lock(&m); 
    cout<<"Consumer consumed:"<<i-1<<endl; 
    pthread_mutex_unlock(&m); 
} 
int main() 
{ 
    pthread_t p[producer_count],c[consumer_count]; 

    for(int i=0;i<producer_count;++i) 
    pthread_create(&p[i],NULL,producer,NULL); 

    for(int i=0;i<consumer_count;++i) 
    pthread_create(&c[i],NULL,consumer,NULL); 

    for(int i=0;i<producer_count;++i) 
    pthread_join(p[i],NULL); 

    for(int i=0;i<consumer_count;++i) 
    pthread_join(c[i],NULL); 

    cout<<"Total produced:"<<produced<<endl; 
    cout<<"Total consumed:"<<consumed<<endl; 
    cout<<"Queue size at end:"<<q.size()<<endl; 

    pthread_mutex_destroy(&m); 
    return 0; 
} 


Output: 
$ ./a.out 
Producer produced:12740 
Consumer consumed:3351 
Producer produced:16948 
Consumer consumed:2512 
Consumer consumed:3383 
Producer produced:4892 
Producer produced:7417 
Consumer consumed:5374 
Consumer consumed:4550 
Producer produced:8003 
Consumer consumed:5229 
Consumer consumed:2023 
Consumer consumed:1366 
Consumer consumed:3346 
Consumer consumed:2231 

--Program hangs here----- 


Output from different run when both producer and consumer thread count are set to 15. 
$ ./a.out 
Producer produced:1276 
Producer produced:2162 
Producer produced:1401 
Producer produced:505 
Producer produced:455 
Producer produced:1900 
Producer produced:2522 
Producer produced:901 
Producer produced:308 
Producer produced:1461 
Producer produced:755 
Producer produced:102 
Producer produced:21332 
Consumer consumed:514 
Consumer consumed:1335 
Consumer consumed:4219 
Consumer consumed:1422 
Consumer consumed:644 
Consumer consumed:231 
Producer produced:14191 
Consumer consumed:621 
Consumer consumed:541 
Consumer consumed:1505 
Consumer consumed:32234 
Consumer consumed:1985 
Producer produced:729 
Consumer consumed:2723 
Consumer consumed:1012 
Consumer consumed:69 
Consumer consumed:945 
Total produced:50000 
Total consumed:50000 
Queue size at end:0 

---Program completes here after showing produced and consumed count are equal and queue is fully consumed. 

ответ

1

Потребитель петли до тех пор, пока он получает все произведенные предметы, но с большим количеством потребителей, элементы будут разделены, так что вам нужно еще проверить, чтобы увидеть, когда потребитель закончил. Производитель может установить готовое логическое значение и сигнализировать, что с той же переменной условия с pthread_cond_broadcast, поэтому потребители должны проверить это в заблокированном мьютексе перед ожиданием в пустой очереди.

+0

В потребительской цепочке сразу после блокировки мьютекса код проверяет, используется ли (потребляется == max_item), т.е. все значения потребляются из очереди. Если вы не выбираете значение из очереди, если очередь пуста, тогда выведите условие ожидания. Как потребляется == max_item сразу после блокировки мьютекса, он должен возвращать true, если все элементы потребляются и прерываются от цикла после разблокировки мьютекса. – user3227537

+0

Потребляемый никогда не будет max_item, потому что элементы разделены над потребителями, поэтому потребители будут ждать пустую очередь – stefaanv

+0

Кажется, я присвоил неправильное имя переменной, я проверил код, (i) является локальной переменной подсчета для потребительский поток и (потребляемый) - это глобальный счет, используемый всеми потребительскими потоками. Поскольку (потребляемый) доступен и проверяется в мьютексе, первая строка после блокировки должна прерываться, когда цикл потребления (общий от всего потребителя) равен max_item, а потребительский поток не должен перейти к следующему шагу (т.е. ждать, когда очередь пуста). – user3227537