2014-03-06 4 views
1

Я создаю собственный веб-сервер, и я хочу, чтобы он был многопоточным. Я не знаю, как начать новый поток, сохранить его в каком-то списке, обнаружить, когда поток сделан, и добавить некоторые из очереди ожидания. Может ли кто-нибудь дать мне простой пример, как это сделать? Теперь я бегу темы таким образом:C++ boost многопоточность, как очередь в очереди

boost::thread t(app, client_fd); // but it's not good way because I can't control it 

Ниже pseucode что Проиллюстрируем то, что я имею в виду:

class worker 
{ 
    public: 
    void run(int cfd) 
    { 
    // do something 
    } 
} 

std::vector<int> waitQueue; 
std::vector<worker> runningQueue; 

onAcceptClient(int client_fd) 
{ 
    waitQueue.insert(client_fd); 
} 

while(1) // this must run in single thread 
{ 
    client_fd = accept(...); 
    onAcceptClient(client_fd); 
} 

while(1) // this must run in single thread 
{ 
    if (runningQueue.size() < 128) 
    { 
     int diff = 128 - runningQueue.size() ; 
     for (int a = 0; a < diff; a++) 
     { 
     int cfc = waitQueue.pop(); 
     worker w; 
     w.run(cfc); 
     runningQueue.insert(w); 
     } 
    } 
} 
+0

Не прямой ответ на Ваш вопрос, но вы должны определенно взглянуть на повышение :: Asio (http://www.boost.org/doc/libs/ 1_55_0/doc/html/boost_asio.html), особенно если вы работаете на веб-сервере. Он позволяет разделить работу на разные потоки (см., Например, http://stackoverflow.com/q/14265676/991425) и ist специально для сетевого ввода-вывода, который вы планируете делать много, при программировании веб-сервера , Я полагаю. – Haatschii

+0

Вам понадобится мьютекс для очереди. При использовании окон самым простым является CriticalSection. В Unix, мьютекс. Чтобы поток сигнализировал, когда он закончен, я использую семафор и ждут его. –

+0

Спасибо за ваши комментарии! – mitch

ответ

0

Это, как я это реализовать.

Во-первых, у меня есть базовый класс ThreadWorker, из которого могут произойти мои фактические задачи обработки. Обратите внимание, что это Windows, специфична, но вы можете заменить повысить семафор для CriticalSection, а реализация POSIX семафоров здесь: http://linux.die.net/man/7/sem_overview

class ThreadWorker 
{ 
public: 
ThreadWorker(void) 
{ 
    signalfinished = NULL; 
    forcesignal = false; 
} 
virtual ~ThreadWorker(void) 
{ 
    if (signalfinished!=NULL) ReleaseSemaphore(signalfinished,1,NULL); 
} 

DWORD ThreadId; 
HANDLE threadhandle; 
HANDLE signalfinished; 
bool forcesignal; 

static DWORD WINAPI workerthread(void *param) 
{ 
    ThreadWorker *worker = (ThreadWorker*)param; 
    worker->signalfinished = CreateSemaphore(NULL,0,1,NULL); 
    worker->RunTask(); 
    ReleaseSemaphore(worker->signalfinished,1,NULL); 
} 

void StartWorker() 
{ 
    CreateThread(NULL,NULL,ThreadWorker::workerthread,this,0,&ThreadId); 
} 

void WaitUntilWorkerFinished() 
{ 
    DWORD waitresult; 
    do 
    { 
     waitresult = WaitForSingleObject(signalfinished,1000); 
    } while (waitresult!=WAIT_OBJECT_0 && !forcesignal); 

} 
virtual void RunTask()=0; 
}; 

Тогда у меня есть ThreadManager, который управляет очередью объектов ThreadWorker. Опять же, вы можете заменить мьютекс для CriticalSection. Я предпочитаю std :: list для std :: vector, потому что вектор смежный.

class ThreadManager 
{ 
    CRITICAL_SECTION critsec; 
    std::list<ThreadWorker*> taskqueue; 
public: 
    ThreadManager(void) 
    { 
     InitializeCriticalSection(&critsec); 
    } 
    void AddTaskToQueue(ThreadWorker *task) 
    { 
     EnterCriticalSection(&critsec); 
     taskqueue.push_back(task); 
     LeaveCriticalSection(&critsec); 
    } 
    void ProcessTaskQueue() 
    { 
     while (true) 
     { 
      EnterCriticalSection(&critsec); 
      ThreadWorker *thistask = taskqueue.front(); 
      taskqueue.pop_front(); 
      LeaveCriticalSection(&critsec); 
      thistask->StartWorker(); 
     } 
    } 
    ~ThreadManager(void) 
    { 
     DeleteCriticalSection(&critsec); 
    } 
}; 

Чтобы добавить задачу в очередь, нам нужен подкласс, который реализует ThreadWorker.

class SomeWorkerTask : public ThreadWorker 
{ 
public: 
SomeWorkerTask(void); 
virtual ~SomeWorkerTask(void); 

void RunTask() 
{ 
    std::cout << "Hello, I am a worker task runing on thread id " << ThreadId << std::endl; 
} 
}; 

Создать новый экземпляр SomeWorkerTask и добавить его в очередь, а затем обрабатывать очереди. В вашем случае у вас будут разные потоки, добавляющие задачи в очередь и обработка очереди, но я предполагаю, что вы получите эту идею.

SomeWorkerTask *atask = new SomeWorkerTask(); 
ThreadManager manager; 
manager.AddTaskToQueue(atask); 
manager.ProcessTaskQueue(); 

Если вы хотите знать, когда задача завершения обработки, вы можете вызвать ThreadWorker :: WaitUntilWorkerFinished из другого потока или добавить вызов ProcessTaskQueue. Вы можете изменить ThreadManager, чтобы у вас была очередь ожидающих задач, одна очередь запущенных задач и третья очередь готовых задач. После того, как вы вытащите задание из очереди ожидания, вы добавите ее в рабочую очередь и с помощью семафора задачи определите, когда она была завершена, затем добавьте ее в готовые задачи/удалите ее из запущенных задач. Обратите внимание, что стандартные контейнеры, такие как вектор, карта и список, не являются потокобезопасными, поэтому вы всегда должны окружать операции, которые вставляют/удаляют из контейнера с блокировкой взаимного исключения, например критический раздел или мьютекс.

Надеюсь, что это поможет.

0

вам необходимо использовать мьютексу для очереди, к которой обращаются одновременно от нескольких потоков, а так как вы тестируете runningQueue.size() < 128, вам также нужна переменная условия. Если ваш компилятор поддерживает C++ 11, std::mutex и std::condition_variable сделают это задание, или boost::mutex и boost::condition_variable в порядке.
Так что вроде как это:

onAcceptClient(int client_fd) 
{ 
    boost::mutex::scoped_lock waitLock(mWaitMutex); 
    waitQueue.insert(client_fd); 
} 

while(1) // this must run in single thread 
{ 
    boost::mutex::scoped_lock lock(mRunningMutex); 
    // wait if the runningQueue.size >= 128 
    while(runningQueue.size() >= 128) 
     mRunningCond.wait(lock); 

    if (runningQueue.size() < 128) 
    { 
     int diff = 128 - runningQueue.size() ; 
     for (int a = 0; a < diff; a++) 
     { 
      boost::mutex::scoped_lock waitLock(mWaitMutex); 
      int cfc = waitQueue.pop(); 
      worker w; 
      w.run(cfc); 
      runningQueue.insert(w); 
     } 
    } 
} 
Смежные вопросы