2009-05-22 4 views
5

Мне нужно создать систему рабочих (представленных в виде потоков) и (несколько) очередей. Отдельные задания ждут в одной из очередей и ждут, когда рабочий поток обработает их. Каждый рабочий может обрабатывать задания только из некоторых очередей. Нет ожидания ожидания. C/C++, pthreads, стандартный POSIX.C++ - потоки и несколько очередей

Проблема для меня - вещь с несколькими очередями. Я знаю, как реализовать это с помощью одной очереди. Рабочим нужно ждать на всех очередях, которые они могут обрабатывать (дождаться ЛЮБОГО из них).

В Windows я бы использовал WaitForMultipleObjects, но это должно быть многоплатформенным.

Мне не нужен какой-либо конкретный код для этого, только подсказка или описание модели, которую я должен использовать. Заранее спасибо.

+0

Можете ли вы использовать boost на всех? – PiNoYBoY82

ответ

4

Что вы можете сделать, это использовать condition variable. Ожидайте, чтобы ваши рабочие потоки зависели от переменной условия. Когда задание добавляется в любую из очередей заданий, сигнализируйте переменную условия. Затем, когда рабочий поток просыпается, он проверяет очереди, в которых он ждет. Если у кого-то из них есть задание, он снимает эту работу с очереди. В противном случае он возвращается к ожиданию переменной условия. Ожидание переменной условия помещает поток в режим сна, поэтому он не потребляет процессорное время.

Конечно, само собой разумеется, что вы должны защищать все обращения к очереди заданий с помощью мьютекса (например, pthread_mutex_t).

+0

Хорошая идея, но это пробудит все рабочие потоки при добавлении задания в пустую очередь. Полагаю, я все равно сделаю так, но есть ли способ пробудить только «правильную» нить (и в идеале только один из всех правильных потоков)? – 2009-05-22 17:13:55

+0

pthread_cond_signal() пробудит «хотя бы один» поток, ожидающий переменную условия, тогда как pthread_cond_broadcast() пробудит все потоки. Теоретически pthread_cond_signal() должен только просыпаться один поток большую часть времени, но я не знаю, правда ли это или нет на практике. –

+0

Но в вашем решении я * должен * просыпать все потоки, потому что задание может находиться в очереди, которую рабочий * one * не обрабатывает. – 2009-05-22 17:25:39

1

Если в каждой очереди не так много рабочих, вы можете сделать переменную условия для каждого рабочего.

0

Что бы я делал, это использовать boost :: asio, чтобы помещать в очередь данные, чтобы ваши несколько потоков могли идти по нему. Вы можете передать ref в очередь через команду post и соответствующим образом обработать поток.

0

Вместо того, чтобы иметь отдельную блокировку для каждой очереди, почему бы не иметь одну блокировку для всей очереди?

  1. Wait на замок
  2. Получить замок
  3. Dequeue из какой очереди
  4. Освободить замок
  5. Обработать из очереди элемент
  6. Goto шаг 1

Предполагая, что dequeue занимает незначительное время (поэтому блокировка проводится только в течение незначительного времени). может потребоваться не более одного замка.

+0

Я не думаю, что это сработает. Рабочий обрабатывает только задания из определенной очереди. Если в «плохой» очереди есть единственное задание, рабочий будет занят, пока другой работник не выполнит эту работу. – 2009-05-22 17:17:24

0

Вы можете сделать что-то вроде этого: каждое задание связано с «очередью». Например:

Скажем, у вас есть 2 очереди. Ваши рабочие места могут сказать:

job[0].queue = 1; /* That job is in queue 1 */ 
job[1].queue = 1; 
job[2].queue = 2; /* this job is in queue 2 */ 
... 
etc 

Итак, у вас есть «мешок с резьбой». Нить просто выбирает работу - скажем, работу [2]. Если этому потоку разрешено обрабатывать задания только из очереди 1, он возвращает это задание в готовую очередь и выбирает другое задание.

Таким образом, каждый поток знает, в каких очередях он может обрабатываться, и когда он выбирает задание, он обеспечивает соответствие поля «очередь» задания. Если это не так, он выбирает другую работу.

(во многих отношениях, как планирование процессов работает в Linux на нескольких ядрах.Каждый процесс имеет битмаск, говорящий о том, какие процессоры разрешено запускать, а затем процессор гарантирует, что он «разрешен» для запуска этого процесса прежде чем делать это)

5

Как насчет:.

  • все рабочие потоки ждать семафора
  • когда-либо добавляется в очередь семафора увеличивается, который будит одну нитку
  • нить chec кс очередь она интересуется, обрабатывает один из них и возвращается к ожиданию на семафоре

Вам потребуются дополнительный семафор (ы) для контроля фактического чтения & пишут в очередь.

+1

+1 Я пойду с Нилом на этом :) – ralphtheninja

0

Я думал об этом недавно, и единственная идея, которую я мог придумать, - это (при условии, что у вас есть потокобезопасные очереди), чтобы иметь только несколько потоков, обслуживающих одну очередь.

У вас может быть одна или несколько рабочих потоков, добавляющих задания в одну очередь и один или несколько рабочих потоков, блокирующих очередь, пока они не найдут что-то для обработки.

Если у вас когда-либо было несколько очередей, которые нужно выполнить нескольким рабочим потокам, то решением может быть добавить один дополнительный поток для каждой очереди и дополнительную очередь. Дополнительные потоки каждого блока в отдельной очереди, но просто перенаправляют задания на дополнительную очередь. Теперь существующий рабочий поток возвращается к блокировке в одной очереди.

Смежные вопросы