2010-09-22 2 views
1

Я делаю некоторые эксперименты по многопоточности на C++, и я понятия не имею, как решить одну проблему. Допустим, у нас есть пул потоков, который обрабатывает запросы пользователей с использованием существующего потока и создает новый поток, когда нет свободного потока. Я создал класс thread_ safe класса, который имеет методы push и pop. поп ждет, пока очередь пуста и возвращается только тогда, когда команда доступна или время ожидания. Теперь пришло время реализовать пул потоков. Идея состоит в том, чтобы освободить потоки сна в течение некоторого времени и убить нить, если после этого нечего делать. Вот реализацияСинхронизация потоков с boost :: condition_variable

command_queue::handler_t handler; 
while (handler = tasks.pop(timeout)) 
{ 
    handler(); 
} 

Здесь мы выходим из процедуры потока, если произошел тайм-аут. Это хорошо, но есть проблемы с созданием новых потоков. Допустим, у нас уже есть 2 потока запросов пользователя, они работают в данный момент, но нам нужно выполнить некоторую другую операцию асинхронно. Мы называем

thread_pool::start(some_operation); 

, который должен начать новую тему, потому что нет свободных нитей доступны. Когда поток доступен, он вызывает timed_wait в переменной условия, поэтому идея состоит в проверке наличия ожидающих потоков.

if (thread_are_free_threads) // ??? 
    condition.notify_one(); 
else 
    create_thread(thread_proc); 

но как его проверить? Документация гласит, что если нет ожидающих потоков, то notify_one ничего не делает. Если бы я мог проверить, действительно ли не сделало это ничего, что было бы решением

if (!condition.notify_one()) // nobody was notified 
    create_thread(thread_proc); 

Насколько я вижу, нет никакого способа, чтобы проверить это.

Спасибо за ваши ответы.

+1

Вы знаете, что код написания с небольшими комментариями, как правило, намного яснее, чем английский, с окроплением кода. Выполните полный код в одном блоке. –

ответ

2

Вам нужно создать другую переменную (возможно, семафор), которая знает, сколько потоков выполняется, тогда вы можете проверить это и создать новый поток, если необходимо, перед вызовом уведомления.

Другой, лучший вариант состоит в том, чтобы просто не закрыть ваши потоки, когда они тайм-аут. Они должны остаться в живых, ожидая уведомления. Вместо того, чтобы выходить, когда время уведомления истекает, проверьте переменную, чтобы убедиться, что программа все еще запущена или если она «выключена», если она все еще запущена, а затем снова запустите ожидание.

+0

'boost :: thread_group' может упростить последнее, т. Е. Вы можете вызвать' interrupt_all() ', чтобы запросить выключение, когда они ждут. – ybungalobill

+1

Идея состоит в создании пула интеллектуальных потоков, который не будет содержать лишние потоки. Так что это не решит проблему, она просто избежит ее, изменив задачу :) Я попробую семафоры, они должны быть немного медленнее, но если нет другого выбора. – ledokol

+0

Сохранение потоков вживую и ожидание переменной состояния не является проблемой. Он не потребляет ресурсов времени выполнения, а пространство стека невелико. С другой стороны, создание потоков дорого. –

1

Более типичный пул потоков будет выглядеть следующим образом:

Pool::Pool() 
{ 
    runningThreads = 0; 
    actualThreads = 0; 
    finished  = false; 
    jobQue.Init(); 

    mutex.Init(); 
    conditionVariable.Init(); 

    for(int loop=0; loop < threadCount; ++loop) { startThread(threadroutine); } 
} 

Pool::threadroutine() 
{ 

    { 
     // Extra code to count threads sp we can add more if required. 
     RAIILocker doLock(mutex); 
     ++ actualThreads; 
     ++ runningThreads; 
    } 
    while(!finished) 
    { 
     Job job; 
     { 
      RAIILocker doLock(mutex); 

      while(jobQue.empty()) 
      { 
       // This is the key. 
       // Here the thread is suspended (using zero resources) 
       // until some other thread calls the notify_one on the 
       // conditionVariable. At this point exactly one thread is release 
       // and it will start executing as soon as it re-acquires the lock 
       // on the mutex. 
       // 
       -- runningThreads; 
       conditionVariable.wait(mutex); 
       ++ runningThreads; 
      } 
      job = jobQue.getJobAndRemoveFromQue(); 
     } 
     job.execute(); 
    } 
    { 
     // Extra code to count threads sp we can add more if required. 
     RAIILocker doLock(mutex); 
     -- actualThreads; 
     -- runningThreads; 
    } 
} 

Pool::AddJob(Job job) 
{ 
    RAIILocker doLock(mutex); 

    // This is where you would check to see if you need more threads. 
    if (runningThreads == actualThreads) // Plus some other conditions. 
    { 
     // increment both counts. When it waits we decrease the running count. 
     startThread(threadroutine); 
    } 
    jobQue.push_back(job); 
    conditionVariable.notify_one(); // This releases one worker thread 
            // from the call to wait() above. 
            // Note: The worker thread will not start 
            //  until this thread releases the mutex. 
} 
+0

Я думаю, что это у нас уже есть в boost :: asio? – ledokol

+0

@ledokol: Я уверен, что есть, но вы, кажется, пытаетесь создать свой собственный. Вот как это должно выглядеть. –

+0

Да, это только для экспериментальных целей, но все же boost :: asio не то, что я пытаюсь построить. – ledokol

0

Я думаю, что вы должны пересмотреть свой дизайн. В простой модели нитки дилера, выполняющей работу ниток игрока, дилер помещает задание в очередь сообщений и позволяет одному из игроков забирать задание, когда у него появляется шанс.

В вашем случае дилер активно управляет пулом потоков, поскольку он сохраняет знания о том, какие потоки игроков не заняты и которые заняты. Так как дилер знает, какой игрок простаивает, дилер может активно пропустить простоя задания и сигнализировать игроку с помощью простого семафора (или cond var) - там есть один семафор на игрока. В таком случае для дилера может иметь смысл активно уничтожать простаивающие нити, давая нить убить себя работу.

+0

Мое дело абсолютно такое же, как вы описали. Может быть, мне было недостаточно ясно. Я просто подталкиваю работу в очередь, а рабочие потоки ждут, когда новая работа будет опубликована и просыпается, поскольку они видят, что есть что-то делать. Но для удовлетворения моих потребностей дилер должен создавать поток, когда нет свободных потоков, и проблема в том, что дилер не знает, есть ли на данный момент простаивающая нить. Проблема в том, чтобы проверить это. Вы можете представить это как 2 типа потоков - BOSS и WORKERS, BOSS дает задания, а работники обрабатывают их, но когда нет свободного рабочего, BOSS должен нанять кого-то – ledokol

+0

Шаблон очереди сообщений - это задания BOSS на доске объявлений, а WORKERS когда они закончили предыдущий. Важно то, что BOSS не знает, насколько заняты рабочие. Моя модель BOSS назначает задания отдельным работникам (кто знает по имени) и нанимает другого WORKER, если он не может найти тот, который, как он знает, выполняет эту работу. – doron

+0

Возможно, я не понимаю вас ясно. Вы написали «Так как дилер знает, какой игрок простаивает», но откуда он это знает? – ledokol

0

Теперь я нашел одно решение, но это не так идеально. У меня есть изменчивая переменная-член, именуемая свободной, - которая хранит количество свободных потоков в пуле.

void thread_pool::thread_function() 
{ 
    free++; 
    command_queue::handler_t handler; 
    while (handler = tasks.pop(timeout)) 
    { 
     free--; 
     handler(); 
     free++; 
    } 
    free--; 
} 

, когда я задаю задачу нить я что-то вроде этого

if (free == 0) 
    threads.create_thread(boost::bind(&thread_pool::thread_function, this)); 

есть еще проблема с синхронизацией, потому что, если контекст будет включен после free-- в thread_function мы могли бы создать новый поток, который нам на самом деле не нужен, но поскольку очередь задач является потокобезопасной, нет проблем с этим, это просто нежелательные накладные расходы. Можете ли вы предложить решение этого и что вы думаете об этом? Может быть, лучше оставить его, потому что здесь есть еще одна синхронизация?

0

Другая идея. Вы можете запросить длину очереди сообщений. Если он слишком длинный, создайте нового работника.

+0

Возможно, но это не гарантирует немедленного запуска всей задачи. В этом случае вы можете потребовать выполнить операцию, которая будет выполнена после завершения некоторых других. Это не решает проблему. Дилер хочет, чтобы все задачи выполнялись немедленно, без каких-либо задержек, даже если это неэффективно с точки зрения использования большего количества потоков, а затем процессоров/ядер. – ledokol

+0

Затем взгляните на мой другой ответ – doron

Смежные вопросы