Я работаю над проблемой производителя/потребителя с модификацией. Тем не менее, есть состояние гонки, и я обсуждаю лучший способ этого. Могут быть более чистые способы, и мне интересно, сделал ли кто-нибудь что-то подобное и, если возможно, поделился лучшим решением.модифицированный алгоритм потребительского потребителя
он запускается как обычный производитель/потребитель, используя очередь. Один поток производителя считывает элементы с диска и помещает их в общую очередь. Затем несколько потребительских потоков пытаются удалить объекты для обработки. Однако каждый элемент имеет тег (например, идентификатор потока), который ДОЛЖЕН соответствовать тегу потребителя. Потребительский поток смотрит на фронт очереди и проверяет тег элемента. Если он не совпадает с тегом потребительского потока, потребитель должен заснуть и ждать, пока на передней части очереди не будет элемент, соответствующий его тегу. Немного сбивает с толку, но псевдокод ниже, надеюсь, уточняет алгоритм:
struct item
{
// This is unique tag that only a specific consumer can consumer
int consumerTag;
// data for the consumer to consume
void *pData;
}
///////////////////////////////
// PRODUCER THREAD -> only 1
///////////////////////////////
// producer reads items
// each item has a tag to a specific consumer
while (item = read())
{
lock(queue)
if (queueNotFull)
{
enqueue(item);
}
else
{
// check front of the queue, notify worker.
Sleep(); // Releases Queue Mutex upon entering
// requires the mutex after it has been awaken
}
unlock(queue);
wakeUpAllConsumers();
}
-------------------------------------------------------
///////////////////////////////
// CONSUMER THREAD -> many threads
///////////////////////////////
// my tag is it like at thread id,
// each consumer has a unique tag
myTag = getThreadTAG()
while (true)
{
lock (queue);
if (queueNotEmpty)
{
item = queueFront()
if (myTag == item->id)
{
// this item is for me, let's dequeue and process
item = dequeue();
process();
}
else
{
// This is not for me let's go to sleep
Sleep(); // Releases Queue Mutex
// re-acquire mutex
}
}
else
{
Sleep(); // Releases Queue Mutex
// re-acquire mutex
}
unlock (queue);
wakeUpProducer();
}
Однако существуют проблемы, связанные с выше алгоритма. Рассмотрим следующие события и предположим следующее:
item.tag = 1 означает, что этот элемент должен потребляться только потребителем с тем же самым тегом. Я буду представлять это как consumer.tag = 1
- Производителя читает
item.tag=1
и ставит в очередь - Производитель будит все потребительские потоки (
consumer.tag=1
,consumer.tag=2
и т.д ... все спят сейчас и проверить фронт очереди) - Производитель читает
item.tag=2
и ставит в очередь - Производитель будит все потребительские темы
- очередь теперь имеет
[item.tag=1, item.tag=2]
consumer.tag=2 wakes up and peek at the front of the queue, but
item.tag = 1 который не соответствуетconsumer.tag=1
; поэтому он ложится спать.consumer.tag=2
спал.consumer.tag=1
просыпается и смотрит в передней части очереди, иitem.tag=1
, что соответствуетconsumer.tag=1
. Удаляет и уведомляет производителя, что он может потреблять больше.- Производитель заканчивает считывание данных и выходов. Теперь очередь
item.tag=2
иconsumer.tag=2
спать и никогда не потребляет эти данные. Обратите внимание, что может быть много потребителей. Таким образом, в конце концов многие потребители могут закончить спать, а очередь
Я думал, что просто добавьте в конце нити производителя петлю, которая просыпает все спящие потоки, пока очередь не станет пустой.
// PRODUCER THREAD
// Process the rest
while (queueIsNotEmpty)
{
WakeUpAllConsumer();
Sleep();
}
Но я считаю, что должен быть более элегантный способ справиться с этой проблемой. Любые идеи, дайте мне знать
thx!
Нет смысла ставить всю работу в одной очереди. Таким образом, каждый потребитель должен иметь собственную независимую очередь, защищенную переменной условия. Он будет масштабироваться намного лучше. – piotrekg2
Заслуживает упоминания, что заказчик обрабатывает данные так же, как производитель читает с диска (это список команд). Если я использую несколько очередей, порядок, в котором будут обрабатываться элементы, будет не таким, как заказ, который был сохранен на диске. – gmmo
Текущее решение не использует параллелизм, поскольку очередь блокируется во время обработки. Я думаю, что единственный производитель/единственный потребитель - это путь. – piotrekg2