2015-09-16 5 views
1

У меня есть одна нить производителя и несколько потребителей, каждый потребитель имеет: собственную очередь с данными и уникальный идентификатор. Я использую std :: map, чтобы идентифицировать каждую очередь для потока.Один поток производителей, несколько потребителей

Каждый потребительский поток работает с данными в своей очереди, если очередь пуста, поток должен ждать данных. Если я хочу сделать это только с одним потоком, я могу использовать std :: condition_variable с std :: unique_lock, но у меня есть несколько пользователей, поэтому мне нужно несколько std :: condition_variable, но я не могу их сохранить в контейнерах (копирование/назначение удаляются). Так я использовать код, как этот

while(q.empty()) { 
    std::cout << "waiting...\n"; 
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); 
} 

где д ссылка на очереди. Но как я могу синхронизировать его с лучшим способом? Спасибо заранее. P.S. Очередь всегда будет иметь данные, последние данные должны указывать «exit».

+3

Отсутствие копии/назначение не означает, что вы не можете положить вещи в контейнер. Только некоторые из операций будут недоступны. Вы все равно можете использовать 'emplace' и' at'. – zch

+1

Пожалуйста, взгляните на [это] (http://codereview.stackexchange.com/questions/18094/boost-threads-producer-consumer-threads-with-synchronization). –

+1

_ "но у меня есть несколько потребителей, поэтому мне нужно несколько std :: condition_variable" _ not true. –

ответ

0

вызова ООП для решения этой задачи:

class ConcurentTaskQueue{ 
    std::mutex lock; 
    std::condition_variable m_ConditionVariable; 
    std::queue<Task> m_TaskQueue; 


public: 
    Task getTask(){ 
     std::unique_lock<std::mutex> synchLock (lock); //NOTE: consider doing this with a while(programIsRunning){} loop 
     while(m_TaskQueue.empty()){ 
      m_ConditionVariable.wait(synchLock); 
     } 
     Task task(std::move(m_TaskQueue.front())); 
     m_TaskQueue.pop(); 
     return task; 
    } 

    void addTask (Task task){ 
     std::unique_lock<std::mutex> synchLock (lock); 
     m_TaskQueue.push(std::move(task)); 
     m_ConditionVariable.notify_one(); 
    } 
}; 

и теперь просто:

std::map<size_t,ConcurentTaskQueue> inputQueue; 
std::thread producer ([&]{ 
     Task task = produceTask() 
     inputQueue[ID].addTask(task); 
}); 

std::thread consumer1([&]{ 
     Task task = inputQueue[ID].getTask(); 
}); 


std::thread consumer2([&]{ 
     Task task = inputQueue[ID].getTask(); 
}); 

EDIT2: использовать ThreadPool

+1

Ваша реализация не рассматривает ложные пробуждения на 'wait', что может привести к неопределенному поведению. – ComicSansMS

+0

Вы правы, но это только направление, а не полная реализация. –

+0

Тогда, пожалуйста, включите ваши предположения в ответ. Сломанный код хуже, чем никакого кода. – ComicSansMS

1

Одного std::conditional_variable с одним std::mutex должно быть достаточно.

Task t; 
{ 
    std::unique_lock<std::mutex> lock(mtx); 
    while (q.empty()) 
    cond_var.wait(lock); 
    t = std::move(q.front()); 
    q.pop_front(); 
} 

и основной поток будет делать

{ 
    std::lock_guard<std::mutex> lock(mtx); 
    q.emplace_front(/*...*/); 
    cond_var.notify_all(); 
} 

Главной нити проснется все темы, но большинство из них будет идти спать, так как их очередь еще пуста.

+0

Но как насчет производительности? Пробуйте много потоков, это операция на волнах, не так ли? – user3365834

+0

Так что используйте 'notify_one()' и только разбудите одного потребителя, который возьмет на себя задачу и поработает над ней. –

+0

Вы не можете передать 'std :: mutex' в' condition_variable :: wait() ', и этот пример не является исключением. Вы не должны использовать 'mtx.lock()' и 'mtx.unlock()', вместо этого используйте 'unique_lock'. –

2

Поскольку каждый потребитель имеет свою собственную очередь, и для всех потребителей существует только один производитель, это, по сути, сценарий один производитель-один-потребитель.

Другими словами, у вас нет единой очереди, разделяемой между всеми потребителями.

+0

Производитель производит данные для всех потребителей. В потоке производителя у меня есть данные и идентификатор пользователя, поэтому я помещаю эти данные в очередь с идентификатором. – user3365834

+0

@ user3365834 Тем не менее, до тех пор, пока потребители не используют * такую ​​же очередь для получения своих данных, это, по сути, один производитель-один потребитель. Тот факт, что существует более одного потребителя и что ваш продюсер играет одного производителя для более чем одного потребителя, не меняет того факта, что потоки синхронизируют друг друга в режиме одного производителя-одного потребителя. – ComicSansMS

+0

@Maxim Егорушкин, насколько я понимаю, вопрос один поток заполняет очереди, и многие потоки могут потреблять одну очередь –

0

Если вы не делаете это для изучения/обучения, но и для производственный код, я бы рекомендовал использовать одну из многих реализаций. Реализация этого правильно, эффективно и масштабируемо не так просто, но проблемы были решены другими людьми. Существует много вариантов, например.

+0

Учитывая ужасный сломанный код, предлагаемый в ответах здесь, я хотел бы согласиться. –

+0

@ JonathanWakely jee, спасибо. –

Смежные вопросы