Я отделил проблему потребительского/продюсера от своего приложения, чтобы убедиться, что мои потоки работают так, как должны.Многопоточный производитель/потребитель с четырьмя очередями
У меня есть одна нить производителя и поток пулов потребителей: в моем приложении один поток принимает соединения и ставит их в очередь (в пределах моей собственной структуры) в одной из четырех очередей, а четыре потока выходят из очередей и обрабатывают с подключениями, стоящими перед ними; здесь мои очереди будут содержать случайные int
между 1 и 4, без настраиваемой структуры.
Четыре mutex
обеспечивают защиту данных для каждой очереди (плюс один мьютекс для достойного cout
на терминале при печати очередей); a priority_queue
используется для синхронизации удаления из четырех очередей. Строка-производитель нажимает новое значение int
в правой очереди, а затем толкает тоже в priority_queue
, так что когда поток хочет прочитать, ему сначала нужно pop()
от priority_queue
, чтобы понять, какая очередь была нажата (поскольку она сортируется, после некоторых случайное нажатие моего priority_queue
будет выглядеть как 1 1 1 2 3 3 3 3 4 4
, поэтому потребительский поток будет pop()
, см. значение 1
и поймите, что он должен удалить из очереди 1
).
Почему четыре очереди? Поскольку каждая очередь имеет свой собственный приоритет (1 = max, 4 = минимум), элементы из очереди 1 должны быть удалены перед удалением элементов из очереди 2; такое же рассуждение для всех остальных очередей. Поскольку здесь у меня случайное нажатие значения от 1 до 4, не должно быть голода.
Составлено с: g++ -std=c++11 -o producer-consumer-multiqueue producer-consumer-multiqueue.cpp -pthread
по Ubuntu 14.04 x86_64, gcc версия 4.8.4.
Проблема: помимо странный вывод из-планировщика, потребительские потоки не действуют, как я хочу, так как вы можете видеть на выходе ниже он не отдают приоритет удаления элементов из очереди 1, но удаление выполняется не в соответствии с приоритетом (очередь 1 макс, очередь 4 мин). Я хотел бы достичь своей цели, не используя внешние библиотеки, нет boost
et similia.
(0 0 1 0) // (elements in queue 1, in queue 2, in queue 3, in queue 4)
(1 0 1 0)
(1 1 1 0)
(0 0 0 0)
(0 0 0 0)
(0 0 0 0)
(1 0 0 0)
(2 0 0 0)
(2 1 0 0)
(1 1 0 1)
(1 0 0 1)
(1 0 0 0)
(1 0 0 0)
(0 0 0 0)
(1 0 0 0)
...CTRL+c
Код: это мой полный файл тестирования, скомпилированы и исполняемый файл, как это:
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <random>
using namespace std;
// modify this to modify the number of consumer threads
#define WORKERS_THREADS 4
// max size of each of four queues
#define MAX_QUEUE_SIZE 100
// debug
#define DEFAULTCOLOR "\033[0m"
#define RED "\033[22;31m"
#define YELLOW "\033[1;33m"
#define GREEN "\033[0;0;32m"
class MultiQueue {
public:
void initThreadPool(void);
void insert(int num);
void remove(void);
void insertPriorityQueue(int num);
int removePriorityQueue(void);
void printQueues(string what);
int getQueue1Size(void);
int getQueue2Size(void);
int getQueue3Size(void);
int getQueue4Size(void);
int getPrioQueueSize(void);
private:
vector<thread> workers;
queue<int>q1;
queue<int>q2;
queue<int>q3;
queue<int>q4;
priority_queue<int, vector<int>, greater<int>> prioq;
// mutex for push/pop in priority queue
mutex priority_queue_mutex;
// 4 mutexes for each queue
mutex m1, m2, m3, m4;
// mutex for printing 4 queues size
mutex print;
// mutex for push/pop to priority_queue
condition_variable prioq_cond;
// 4 conds for consumer threads
condition_variable w1, w2, w3, w4;
};
int MultiQueue::getQueue1Size() { return q1.size(); }
int MultiQueue::getQueue2Size() { return q2.size(); }
int MultiQueue::getQueue3Size() { return q3.size(); }
int MultiQueue::getQueue4Size() { return q4.size(); }
int MultiQueue::getPrioQueueSize() { return prioq.size(); }
void MultiQueue::initThreadPool(void) {
for (int i=0; i<WORKERS_THREADS; i++) {
workers.push_back(thread(&MultiQueue::remove, this));
workers[i].detach();
}
}
void MultiQueue::printQueues(string what) {
lock_guard<mutex> l(print);
if (what == "insert")
cout << GREEN << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << '\n' << flush;
else
cout << YELLOW << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << '\n' << flush;
}
// called from producer thread to tell consumer threads
// what queues to pop() from
void MultiQueue::insertPriorityQueue(int num) {
lock_guard<mutex> prio(priority_queue_mutex);
prioq.push(num);
prioq_cond.notify_one();
}
// called from consumer threads to see what queues
// have elements to pop() from
int MultiQueue::removePriorityQueue(void) {
int ret = 0;
unique_lock<mutex> prio(priority_queue_mutex);
prioq_cond.wait(prio, [this]() { return getPrioQueueSize() > 0; });
ret = prioq.top();
prioq.pop();
return ret;
}
// producer thread
void MultiQueue::insert(int num) {
switch(num) {
case 1: {
unique_lock<mutex> locker(m1);
w1.wait(locker, [this]() { return getQueue1Size() < MAX_QUEUE_SIZE; });
q1.push(num);
break;
}
case 2: {
unique_lock<mutex> locker(m2);
w2.wait(locker, [this]() { return getQueue2Size() < MAX_QUEUE_SIZE; });
q2.push(num);
break;
}
case 3: {
unique_lock<mutex> locker(m3);
w3.wait(locker, [this]() { return getQueue3Size() < MAX_QUEUE_SIZE; });
q3.push(num);
break;
}
case 4: {
unique_lock<mutex> locker(m4);
w4.wait(locker, [this]() { return getQueue4Size() < MAX_QUEUE_SIZE; });
q4.push(num);
break;
}
default: {
cout << "number not 1, 2, 3 nor 4: " << num << '\n' << flush;
break;
}
}
printQueues("insert");
insertPriorityQueue(num);
}
void MultiQueue::remove(void) {
int which_queue = 0;
while (true) {
which_queue = removePriorityQueue();
switch (which_queue) {
case 1: {
lock_guard<mutex> lock(m1);
int ret = q1.front();
q1.pop();
printQueues("remove");
break;
}
case 2: {
lock_guard<mutex> lock(m2);
int ret = q2.front();
q2.pop();
printQueues("remove");
break;
}
case 3: {
lock_guard<mutex> lock(m3);
int ret = q3.front();
q3.pop();
printQueues("remove");
break;
}
case 4: {
lock_guard<mutex> lock(m4);
int ret = q4.front();
q4.pop();
printQueues("remove");
break;
}
default: {
break;
}
}
}
}
int main(void) {
int random_num = 0;
MultiQueue mq;
mq.initThreadPool();
default_random_engine eng((random_device())());
uniform_int_distribution<int> idis(1, 4);
while (true) {
random_num = idis(eng);
mq.insert(random_num);
}
return 0;
}
_ "** Почему четыре очереди? ** Поскольку каждая очередь имеет свой собственный приоритет" _ Не является ли это скорее заданием для одного ['std :: priority_queue'] (http://en.cppreference.com/ж/CPP/контейнер/priority_queue)? –
Я бы просто использовал один 'std :: priority_queue', но сохранил в очереди приоритет data' std :: pair'. Вы можете сортировать очередь по приоритетной части пары, и вы гарантированно будете всегда использовать объект с наивысшим приоритетом. – NathanOliver