2015-08-24 2 views
2

Я работаю над проблемой производителя/потребителя с модификацией. Тем не менее, есть состояние гонки, и я обсуждаю лучший способ этого. Могут быть более чистые способы, и мне интересно, сделал ли кто-нибудь что-то подобное и, если возможно, поделился лучшим решением.модифицированный алгоритм потребительского потребителя

он запускается как обычный производитель/потребитель, используя очередь. Один поток производителя считывает элементы с диска и помещает их в общую очередь. Затем несколько потребительских потоков пытаются удалить объекты для обработки. Однако каждый элемент имеет тег (например, идентификатор потока), который ДОЛЖЕН соответствовать тегу потребителя. Потребительский поток смотрит на фронт очереди и проверяет тег элемента. Если он не совпадает с тегом потребительского потока, потребитель должен заснуть и ждать, пока на передней части очереди не будет элемент, соответствующий его тегу. Немного сбивает с толку, но псевдокод ниже, надеюсь, уточняет алгоритм:

struct item 
{ 
    // This is unique tag that only a specific consumer can consumer 
    int consumerTag; 
    // data for the consumer to consume 
    void *pData; 
} 

/////////////////////////////// 
// PRODUCER THREAD -> only 1 
/////////////////////////////// 
// producer reads items 
// each item has a tag to a specific consumer 
while (item = read()) 
{ 
    lock(queue) 
    if (queueNotFull) 
    { 
     enqueue(item); 
    } 
    else 
    { 
     // check front of the queue, notify worker. 
     Sleep(); // Releases Queue Mutex upon entering 
     // requires the mutex after it has been awaken 
    } 
    unlock(queue); 
    wakeUpAllConsumers(); 
} 
------------------------------------------------------- 
/////////////////////////////// 
// CONSUMER THREAD -> many threads 
/////////////////////////////// 
// my tag is it like at thread id, 
// each consumer has a unique tag 
myTag = getThreadTAG() 
while (true) 
{ 
    lock (queue); 
    if (queueNotEmpty) 
    { 
     item = queueFront() 
     if (myTag == item->id) 
     { 
      // this item is for me, let's dequeue and process 
      item = dequeue(); 
      process(); 
     } 
     else 
     { 
      // This is not for me let's go to sleep 
      Sleep(); // Releases Queue Mutex 
      // re-acquire mutex 
     } 
    } 
    else 
    { 
     Sleep(); // Releases Queue Mutex 
     // re-acquire mutex 
    } 

    unlock (queue); 
    wakeUpProducer(); 
} 

Однако существуют проблемы, связанные с выше алгоритма. Рассмотрим следующие события и предположим следующее:

item.tag = 1 означает, что этот элемент должен потребляться только потребителем с тем же самым тегом. Я буду представлять это как consumer.tag = 1

  1. Производителя читает item.tag=1 и ставит в очередь
  2. Производитель будит все потребительские потоки (consumer.tag=1, consumer.tag=2 и т.д ... все спят сейчас и проверить фронт очереди)
  3. Производитель читает item.tag=2 и ставит в очередь
  4. Производитель будит все потребительские темы
  5. очередь теперь имеет [item.tag=1, item.tag=2]
  6. consumer.tag=2 wakes up and peek at the front of the queue, but item.tag = 1 который не соответствует consumer.tag=1; поэтому он ложится спать. consumer.tag=2 спал.
  7. consumer.tag=1 просыпается и смотрит в передней части очереди, и item.tag=1, что соответствует consumer.tag=1. Удаляет и уведомляет производителя, что он может потреблять больше.
  8. Производитель заканчивает считывание данных и выходов. Теперь очередь item.tag=2 и consumer.tag=2 спать и никогда не потребляет эти данные. Обратите внимание, что может быть много потребителей. Таким образом, в конце концов многие потребители могут закончить спать, а очередь

Я думал, что просто добавьте в конце нити производителя петлю, которая просыпает все спящие потоки, пока очередь не станет пустой.

// PRODUCER THREAD 
// Process the rest 
while (queueIsNotEmpty) 
{ 
    WakeUpAllConsumer(); 
    Sleep(); 
} 

Но я считаю, что должен быть более элегантный способ справиться с этой проблемой. Любые идеи, дайте мне знать

thx!

+0

Нет смысла ставить всю работу в одной очереди. Таким образом, каждый потребитель должен иметь собственную независимую очередь, защищенную переменной условия. Он будет масштабироваться намного лучше. – piotrekg2

+0

Заслуживает упоминания, что заказчик обрабатывает данные так же, как производитель читает с диска (это список команд). Если я использую несколько очередей, порядок, в котором будут обрабатываться элементы, будет не таким, как заказ, который был сохранен на диске. – gmmo

+0

Текущее решение не использует параллелизм, поскольку очередь блокируется во время обработки. Я думаю, что единственный производитель/единственный потребитель - это путь. – piotrekg2

ответ

0

Я столкнулся с чем-то похожим назад (в установке, где все потоки могут иметь дело со всеми предметами), и решение, которое я использовал там, хотя не все, что было изящно, должно было разбудить всех в последний раз, когда продюсер закончил считывая данные.
Здесь это не будет работать, так как если у вас есть третий элемент в вашей очереди, то этот элемент может остаться позади.Я бы предположил, что это один из двух способов:

  1. Пробуждение всей темы КАЖДЫЙ ВРЕМЯ потребитель делит предмет. Это единственный способ, которым я могу думать, чтобы убедиться, что ничего не осталось. (Этот режим может быть сделано только тогда, когда isProducerFinishedReading == true, чтобы сохранить один ресурсов/времени.

  2. Заново спроектировать систему, чтобы иметь 10 очередей, а затем, когда элемент добавляется в очередь n, потребительский поток n будит. Когда его законченный с элементом, он проверяет очередь снова для нового элемента, чтобы иметь дело с. в любом случае, производитель должен проверить длину всех очередей, когда чтение выполняется и просыпается соответствующие темы.

надежды, что помогли.

EDIT:

  1. Каждый раз, когда поток заканчивается, он должен проверять очередь снова, и если элемент имеет «его», то он выполняет работу. Если поток может разбудить другие потоки, тогда он должен разбудить соответствующий.
+0

Есть дополнительная проблема, о которой я не упоминал. Данные, которые считываются и находятся в очереди, должны обрабатываться в том же порядке. У меня не может быть несколько обработок ада. То, что находится в очереди, это список команд, эти команды должны обрабатываться в том же порядке. – gmmo

+0

Итак, почему разные рабочие потоки? –

+0

трудно объяснить. Предположим, что все, что я описал, необходимо, оно является частью сложной системы. Я только что подготовил основной вопрос. – gmmo