0

Я отделил проблему потребительского/продюсера от своего приложения, чтобы убедиться, что мои потоки работают так, как должны.Многопоточный производитель/потребитель с четырьмя очередями

У меня есть одна нить производителя и поток пулов потребителей: в моем приложении один поток принимает соединения и ставит их в очередь (в пределах моей собственной структуры) в одной из четырех очередей, а четыре потока выходят из очередей и обрабатывают с подключениями, стоящими перед ними; здесь мои очереди будут содержать случайные int между 1 и 4, без настраиваемой структуры.

Четыре mutex обеспечивают защиту данных для каждой очереди (плюс один мьютекс для достойного cout на терминале при печати очередей); a priority_queue используется для синхронизации удаления из четырех очередей. Строка-производитель нажимает новое значение int в правой очереди, а затем толкает тоже в priority_queue, так что когда поток хочет прочитать, ему сначала нужно pop() от priority_queue, чтобы понять, какая очередь была нажата (поскольку она сортируется, после некоторых случайное нажатие моего priority_queue будет выглядеть как 1 1 1 2 3 3 3 3 4 4, поэтому потребительский поток будет pop(), см. значение 1 и поймите, что он должен удалить из очереди 1).

Почему четыре очереди? Поскольку каждая очередь имеет свой собственный приоритет (1 = max, 4 = минимум), элементы из очереди 1 должны быть удалены перед удалением элементов из очереди 2; такое же рассуждение для всех остальных очередей. Поскольку здесь у меня случайное нажатие значения от 1 до 4, не должно быть голода.

Составлено с: g++ -std=c++11 -o producer-consumer-multiqueue producer-consumer-multiqueue.cpp -pthread по Ubuntu 14.04 x86_64, gcc версия 4.8.4.

Проблема: помимо странный вывод из-планировщика, потребительские потоки не действуют, как я хочу, так как вы можете видеть на выходе ниже он не отдают приоритет удаления элементов из очереди 1, но удаление выполняется не в соответствии с приоритетом (очередь 1 макс, очередь 4 мин). Я хотел бы достичь своей цели, не используя внешние библиотеки, нет boostet similia.

(0 0 1 0) // (elements in queue 1, in queue 2, in queue 3, in queue 4) 
(1 0 1 0) 
(1 1 1 0) 
(0 0 0 0) 
(0 0 0 0) 
(0 0 0 0) 
(1 0 0 0) 
(2 0 0 0) 
(2 1 0 0) 
(1 1 0 1) 
(1 0 0 1) 
(1 0 0 0) 
(1 0 0 0) 
(0 0 0 0) 
(1 0 0 0) 
...CTRL+c 

Код: это мой полный файл тестирования, скомпилированы и исполняемый файл, как это:

#include <iostream> 
#include <queue> 
#include <mutex> 
#include <condition_variable> 
#include <thread> 
#include <random> 

using namespace std; 

// modify this to modify the number of consumer threads 
#define WORKERS_THREADS  4 
// max size of each of four queues 
#define MAX_QUEUE_SIZE  100 
// debug 
#define DEFAULTCOLOR  "\033[0m" 
#define RED     "\033[22;31m" 
#define YELLOW    "\033[1;33m" 
#define GREEN    "\033[0;0;32m" 

class MultiQueue { 
    public: 
     void initThreadPool(void); 
     void insert(int num); 
     void remove(void); 
     void insertPriorityQueue(int num); 
     int removePriorityQueue(void); 
     void printQueues(string what); 
     int getQueue1Size(void); 
     int getQueue2Size(void); 
     int getQueue3Size(void); 
     int getQueue4Size(void); 
     int getPrioQueueSize(void); 
    private: 
     vector<thread> workers; 

     queue<int>q1; 
     queue<int>q2; 
     queue<int>q3; 
     queue<int>q4; 

     priority_queue<int, vector<int>, greater<int>> prioq; 

     // mutex for push/pop in priority queue 
     mutex priority_queue_mutex; 
     // 4 mutexes for each queue 
     mutex m1, m2, m3, m4; 
     // mutex for printing 4 queues size 
     mutex print; 

     // mutex for push/pop to priority_queue 
     condition_variable prioq_cond; 
     // 4 conds for consumer threads 
     condition_variable w1, w2, w3, w4; 
}; 

int MultiQueue::getQueue1Size() { return q1.size(); } 

int MultiQueue::getQueue2Size() { return q2.size(); } 

int MultiQueue::getQueue3Size() { return q3.size(); } 

int MultiQueue::getQueue4Size() { return q4.size(); } 

int MultiQueue::getPrioQueueSize() { return prioq.size(); } 

void MultiQueue::initThreadPool(void) { 
    for (int i=0; i<WORKERS_THREADS; i++) { 
     workers.push_back(thread(&MultiQueue::remove, this)); 
     workers[i].detach(); 
    } 
} 

void MultiQueue::printQueues(string what) { 
    lock_guard<mutex> l(print); 
    if (what == "insert") 
     cout << GREEN << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << '\n' << flush; 
    else 
     cout << YELLOW << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << '\n' << flush; 
} 

// called from producer thread to tell consumer threads 
// what queues to pop() from 
void MultiQueue::insertPriorityQueue(int num) { 
    lock_guard<mutex> prio(priority_queue_mutex); 
    prioq.push(num); 
    prioq_cond.notify_one(); 
} 

// called from consumer threads to see what queues 
// have elements to pop() from 
int MultiQueue::removePriorityQueue(void) { 
    int ret = 0; 
    unique_lock<mutex> prio(priority_queue_mutex); 
    prioq_cond.wait(prio, [this]() { return getPrioQueueSize() > 0; }); 
    ret = prioq.top(); 
    prioq.pop(); 
    return ret; 
} 

// producer thread 
void MultiQueue::insert(int num) { 
    switch(num) { 
     case 1: { 
      unique_lock<mutex> locker(m1); 
      w1.wait(locker, [this]() { return getQueue1Size() < MAX_QUEUE_SIZE; }); 
      q1.push(num); 
      break; 
     } 
     case 2: { 
      unique_lock<mutex> locker(m2); 
      w2.wait(locker, [this]() { return getQueue2Size() < MAX_QUEUE_SIZE; }); 
      q2.push(num); 
      break; 
     } 
     case 3: { 
      unique_lock<mutex> locker(m3); 
      w3.wait(locker, [this]() { return getQueue3Size() < MAX_QUEUE_SIZE; }); 
      q3.push(num); 
      break;  
     } 
     case 4: { 
      unique_lock<mutex> locker(m4); 
      w4.wait(locker, [this]() { return getQueue4Size() < MAX_QUEUE_SIZE; }); 
      q4.push(num); 
      break; 
     } 
     default: { 
      cout << "number not 1, 2, 3 nor 4: " << num << '\n' << flush; 
      break; 
     } 
    } 
    printQueues("insert"); 
    insertPriorityQueue(num); 
} 

void MultiQueue::remove(void) { 
    int which_queue = 0; 
    while (true) { 
     which_queue = removePriorityQueue(); 
     switch (which_queue) { 
      case 1: { 
       lock_guard<mutex> lock(m1); 
       int ret = q1.front(); 
       q1.pop(); 
       printQueues("remove"); 
       break; 
      } 
      case 2: { 
       lock_guard<mutex> lock(m2); 
       int ret = q2.front(); 
       q2.pop(); 
       printQueues("remove"); 
       break; 
      } 
      case 3: { 
       lock_guard<mutex> lock(m3); 
       int ret = q3.front(); 
       q3.pop(); 
       printQueues("remove"); 
       break; 
      } 
      case 4: { 
       lock_guard<mutex> lock(m4); 
       int ret = q4.front(); 
       q4.pop(); 
       printQueues("remove"); 
       break; 
      } 
      default: { 
       break; 
      } 
     } 
    } 
} 

int main(void) { 
    int random_num = 0; 

    MultiQueue mq; 
    mq.initThreadPool(); 

    default_random_engine eng((random_device())()); 
    uniform_int_distribution<int> idis(1, 4); 
    while (true) { 
     random_num = idis(eng); 
     mq.insert(random_num); 
    } 

    return 0; 
} 
+1

_ "** Почему четыре очереди? ** Поскольку каждая очередь имеет свой собственный приоритет" _ Не является ли это скорее заданием для одного ['std :: priority_queue'] (http://en.cppreference.com/ж/CPP/контейнер/priority_queue)? –

+1

Я бы просто использовал один 'std :: priority_queue', но сохранил в очереди приоритет data' std :: pair'. Вы можете сортировать очередь по приоритетной части пары, и вы гарантированно будете всегда использовать объект с наивысшим приоритетом. – NathanOliver

ответ

0

Я вижу следующие проблемы в вашем коде:

  1. печать не обязательно отражают порядок ввода элементов. Один элемент извлечения потока из очереди, а затем может ждать print блокировки в течение длительного времени, а другой поток, который получил элемент после, может быть первым, который получает print lock.
  2. Аналогичная проблема с приоритетной очередью. Может быть такая ситуация: первый поток получил элемент из очереди приоритетов и знает, что он должен появиться queue1, а затем первый поток отключается планировщиком, и второй поток начинает работать. Он также выдает очередь приоритетов, а затем переходит в поп queue2 (в то время как первый поток отключен).

Я хотел бы последовать совету от комментариев и использовать единый priority_queue<std::pair<int,int>>, где первым элементом std::pair<int,int> является приоритетом, а второй из них является полезной. Это поможет вам разобраться с проблемой 2. Что касается проблемы 1, вы должны печатать материал под тем же замком, что и вы, pop.

+0

Выполнено с 'priority_queue >' предоставление ему моего собственного класса 'Compare', с помощью только одного мьютекса и двух переменных условия гораздо проще обрабатывать. – elmazzun