2013-02-28 2 views
2

Я пытаюсь смоделировать систему, в которой есть несколько потоков, производящих данные, и один поток, потребляющий данные. Фокус в том, что я не хочу, чтобы выделенный поток потреблял данные, потому что все потоки живут в пуле. Вместо этого я хочу, чтобы один из продюсеров опорожнил очередь, когда есть работа, и уступите, если другой производитель уже освобождает очередь.Многопроцессорный однопользовательский Lazy Task Execution

Основная идея заключается в том, что есть очередь работы и блокировка обработки. Каждый производитель подталкивает свою полезную нагрузку в очередь, а затем предпринимает попытки ввести замок. Попытка неблокируется и возвращает либо true (блокировка была получена), либо false (блокировка удерживается кем-то другим).

Если блокировка получена, тогда этот поток обрабатывает все данные в очереди до тех пор, пока не станет пустым (включая любые новые полезные нагрузки, введенные другими производителями во время обработки). После того как вся работа была обработана, поток освобождает блокировку и завершает работу.

Ниже C++ код для алгоритма:

void Process(ITask *task) { 
    // queue is a thread safe implementation of a regular queue 
    queue.push(task); 

    // crit_sec is some handle to a critical section like object 
    // try_scoped_lock uses RAII to attempt to acquire the lock in the constructor 
    //     if the lock was acquired, it will release the lock in the 
    //     destructor 
    try_scoped_lock lock(crit_sec); 

    // See if this thread won the lottery. Prize is doing all of the dishes 
    if (!lock.Acquired()) 
     return; 

    // This thread got the lock, so it needs to do the work 
    ITask *currTask; 
    while (queue.try_pop(currTask)) { 
      ... execute task ... 
    } 
} 

В целом этот код работает отлично, и я фактически никогда не видел поведение я собираюсь описать ниже, но реализация заставляет меня чувствовать себя неловко. Разумеется, условие гонки возникает между тем, когда поток выходит из цикла while и когда он освобождает критический раздел.

Весь алгоритм основан на предположении, что если блокировка удерживается, то поток обслуживает очередь.

Я в основном ищу просветления на 2 вопроса:

  1. Я правильно, что есть состояние гонки, как описано (бонус для других рас)
  2. Есть стандартный шаблон для реализации этого механизма, является исполнителем и не предусматривает условия гонки?

ответ

2

Да, есть состояние гонки.

Тема А добавляет задачу, получает lock, обрабатывает себя, а затем запрашивает задачу из queue. Он отклоняется.

Резьба B в этой точке добавляет задачу к queue. Затем он пытается получить блокировку и терпит неудачу, потому что поток A имеет блокировку. Поток B завершается.

Тема А затем выходит, а queue не пуст, и никто не обрабатывает эту задачу.

Это будет сложно найти, потому что это окно относительно узкое. Чтобы сделать его более вероятным, после цикла while ввести «сон в течение 10 секунд». В вызывающем коде вставьте задачу, подождите 5 секунд, затем вставьте вторую задачу. Через 10 секунд проверьте, что оба задания вставки завершены, и еще есть задача, которую нужно обработать на queue.

Один из способов исправить это - изменить try_pop на try_pop_or_unlock и передать в ваш lock ему. try_pop_or_unlock затем атомарно проверяет пустой queue, и если это так разблокирует lock и возвращает false.

Другой подход заключается в улучшении пула потоков. Добавьте счетчик семафора на основе «потреблять» на него.

semaphore_bool bTaskActive; 
counting_semaphore counter; 

when (counter || !bTaskActive) 
    if (bTaskActive) 
    return 
    bTaskActive = true 
    --counter 
    launch_task(process_one_off_queue, when_done([&]{ bTaskActive=false)); 

Когда подсчет семафор активен, или когда ткнул законченный потреблять задачу, он запускает потреблять задачу, если не потреблять задачу активно.

Но это совсем недалеко от моей головы.

+0

Метод try_pop_or_unlock на самом деле является действительно неотразимой идеей. Как я могу сделать эту функцию атомной? – Mranz

+0

Вы реализовали «очередь»? Простая поточно-безопасная «очередь» просто имеет в ней «mutex» - в этом случае просто разблокируйте переданную «блокировку», сохраняя при этом «mutex», когда «очередь» пуста. Для более удобной «очереди в очереди» все становится сложнее. – Yakk

+0

К сожалению, эта реализация является последней. – Mranz

Смежные вопросы