2014-04-03 4 views
2

У меня есть серверное приложение, и у меня есть проблема с тем, что потоки не удаляются до их завершения. Кодекс ниже в значительной степени представляет собой мой сервер; очистка требуется для предотвращения накопления мертвых потоков в списке.Лучший способ обработки многопоточной очистки

using namespace std; 

class A { 
public: 
    void doSomethingThreaded(function<void()> cleanupFunction, function<bool()> getStopFlag) { 
     somethingThread = thread([cleanupFunction, getStopFlag, this]() { 
      doSomething(getStopFlag); 
      cleanupFunction(); 
     }); 

    } 
private: 
    void doSomething(function<bool()> getStopFlag); 
    thread somethingThread; 
    ... 
} 

class B { 
public: 
    void runServer(); 

    void stop() { 
     stopFlag = true; 
     waitForListToBeEmpty(); 
    } 
private: 
    void waitForListToBeEmpty() { ... }; 
    void handleAccept(...) { 
     shared_ptr<A> newClient(new A()); 
     { 
      unique_lock<mutex> lock(listMutex); 
      clientData.push_back(newClient); 
     } 
     newClient.doSomethingThreaded(bind(&B::cleanup, this, newClient), [this]() { 
      return stopFlag; 
     }); 
    } 

    void cleanup(shared_ptr<A> data) { 
     unique_lock<mutex> lock(listMutex); 
     clientData.remove(data); 
    } 

    list<shared_ptr<A>> clientData; 
    mutex listMutex; 
    atomc<bool> stopFlag; 
} 

Проблема, кажется, что деструкторы работают в неправильном порядке - т.е. shared_ptr разрушается в том, когда функция потока завершается, то есть «A» объект удаляется до завершения потока, в результате чего Havok, когда нити вызывается деструктор.

т.е. Вызов функции очистки Все ссылки на это (то есть Л объект) удаляется, так называют деструктор (включая деструктор этого потока) вызова деструктора эту нить снова - ОН голосующие!

Я рассмотрел альтернативы, такие как сохранение списка «для удаления», который периодически используется для очистки основного списка другим потоком или с помощью функции удаления по времени для общих указателей, но обе они кажутся непримиримыми и могут иметь условия гонки.

Кто-нибудь знает, как это сделать? Я не вижу простой способ рефакторинга, чтобы он работал нормально.

ответ

3

Являются ли нити соединяемыми или отсоединенными? Я не вижу никаких detach, , что означает, что разрушение объекта нити, не связанное с , является фатальной ошибкой. Вы можете попробовать просто отсоединить его, , хотя это может сделать чистое завершение несколько сложным. (Of Конечно, для большого количества серверов в любом случае никогда не должно быть выключение .) В противном случае: то, что я делал в прошлом, - это создать резьбу для жгута; нить, которая ничего не делает, кроме как присоединить любые выдающиеся потоки, чтобы очистить их после них.

Я мог бы добавить, что это хороший пример случая, когда shared_ptr is не подходит. Вы хотите полный контроль над , когда происходит удаление; если вы отсоединились, вы можете сделать это в функции очистки (но, откровенно говоря, просто используя delete this; в конце лямбда в A::doSomethingThreaded выглядит более ); в противном случае вы сделаете это после того, как вы присоединились, в резьбе жгута .

EDIT:

Для жатки нити, что-то вроде должно работать:

class ReaperQueue 
{ 
    std::deque<A*> myQueue; 
    std::mutex myMutex; 
    std::conditional_variable myCond; 
    A* getOne() 
    { 
     std::lock<std::mutex> lock(myMutex); 
     myCond.wait(lock, [&](!myQueue.empty())); 
     A* results = myQueue.front(); 
     myQueue.pop_front(); 
     return results; 
    } 
public: 
    void readyToReap(A* finished_thread) 
    { 
     std::unique_lock<std::mutex> lock(myMutex); 
     myQueue.push_back(finished_thread); 
     myCond.notify_all(); 
    } 

    void reaperThread() 
    { 
     for (; ;) 
     { 
      A* mine = getOne(); 
      mine->somethingThread.join(); 
      delete mine; 
     } 
    } 
}; 

(Предупреждение: Я не проверял это, и я пытался использовать C++ 11 функциональность. Я только на самом деле реализовать это, в прошлом, с помощью Pthreads, так что могут быть некоторые ошибки. основные принципы следует проводить, однако.)

Для использования, создания экземпляра, то ул арт-поток, вызывающий reaperThread на нем. При очистке каждой нити вызовите readyToReap.

Для поддержки чистого отключения, вы можете использовать две очереди: вы вставить каждую нить в первый, так как он создается, а затем переместить его от первой до второй (который будет соответствовать myQueue , выше) в readyToReap. Чтобы выключить, вы ждете до тех пор, пока обе очереди не будут пустыми (не начинать никаких новых потоков в этот промежуток, конечно).

+0

Вы можете указать пример? – 4pie0

+0

@lizusek Пример для чего? Я предложил несколько разных решений. –

+0

пример того, как создать резьбу для жгута; поток, который ничего не делает, кроме как присоединиться к любым выдающимся потокам, чтобы очистить их после них. – 4pie0

1

Проблема заключается в том, что, поскольку вы управляете A с помощью общих указателей, указатель this, захваченный потоком лямбда, действительно должен быть общим указателем, а не необработанным указателем, чтобы он не зависал. Проблема заключается в том, что нет простого способа создать shared_ptr из необработанного указателя, когда у вас также нет общего shared_ptr.

Один из способов обойти это использовать shared_from_this:

class A : public enable_shared_from_this<A> { 
public: 
    void doSomethingThreaded(function<void()> cleanupFunction, function<bool()> getStopFlag) { 
     somethingThread = thread([cleanupFunction, getStopFlag, this]() { 
      shared_ptr<A> temp = shared_from_this(); 
      doSomething(getStopFlag); 
      cleanupFunction(); 
     }); 

это создает дополнительный shared_ptr к A объекта, который не держит его в живых, пока нить заканчивается.

Обратите внимание, что вы по-прежнему есть проблемы с join/detach что Джеймс Kanze идентифицированного - Каждый нить обязательно имеют либо join или detach называется на ней ровно один раз, прежде чем он будет уничтожен. Вы можете выполнить это требование, добавив вызов detach в поток lambda, если вы никогда не заботитесь о значении выхода потока.

У вас также есть потенциал для проблем, если doSomethingThreaded вызывается несколько раз на одном A объекта ...

0

Для тех, кто заинтересован, я взял ABIT обоих ответов (то есть Джеймс открепление предложение, и Крис 'предложение о shared_ptr's).

Моего полученный код выглядит, как это и кажется аккуратнее и не вызывает сбой при выключении или отсоединении клиента:

с использованием патезраса;

class A { 
public: 
    void doSomething(function<bool()> getStopFlag) { 
     ... 
    } 
private: 
    ... 
} 

class B { 
public: 
    void runServer(); 

    void stop() { 
     stopFlag = true; 
     waitForListToBeEmpty(); 
    } 
private: 
    void waitForListToBeEmpty() { ... }; 
    void handleAccept(...) { 
     shared_ptr<A> newClient(new A()); 
     { 
      unique_lock<mutex> lock(listMutex); 
      clientData.push_back(newClient); 
     } 
     thread clientThread([this, newClient]() { 
      // Capture the shared_ptr until thread over and done with. 

      newClient->doSomething([this]() { 
       return stopFlag; 
      }); 
      cleanup(newClient); 
     }); 
     // Detach to remove the need to store these threads until their completion. 
     clientThread.detach(); 
    } 

    void cleanup(shared_ptr<A> data) { 
     unique_lock<mutex> lock(listMutex); 
     clientData.remove(data); 
    } 

    list<shared_ptr<A>> clientData; // Can remove this if you don't 
            // need to connect with your clients. 
            // However, you'd need to make sure this 
            // didn't get deallocated before all clients 
            // finished as they reference the boolean stopFlag 
            // OR make it a shared_ptr to an atomic boolean 
    mutex listMutex; 
    atomc<bool> stopFlag; 
} 
Смежные вопросы