2010-08-05 6 views
0

Я пишу UDP-сервер, который в настоящее время получает данные из UDP, обертывает его в объект и помещает их в параллельную очередь. Параллельная очередь представляет собой реализацию, представленную здесь: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.htmlПобочные эффекты глобальных статических переменных

Пул рабочих потоков извлекает данные из очереди для обработки.

Очередь определена глобально, как:

static concurrent_queue<boost::shared_ptr<Msg> > g_work_queue_; 

Теперь проблема у меня в том, что если я просто написать функцию для получения данных и вставить его в очередь и создать некоторые потребительские нити, чтобы вытащить их это прекрасно работает. Но в тот момент, когда я добавляю своего производителя на основе UDP, рабочие потоки перестают получать уведомления о поступлении данных в очередь.

Я отслеживал проблему до конца функции push в concurrent_queue. В частности, строка: the_condition_variable.notify_one(); Не возвращается при использовании моего сетевого кода.

Таким образом, проблема связана с тем, как я написал сетевой код.

Вот как это выглядит.

enum 
{ 
    MAX_LENGTH = 1500 
}; 


class Msg 
{ 
    public: 
    Msg() 
    { 
     static int i = 0; 
     i_ = i++; 
     printf("Construct ObbsMsg: %d\n", i_); 
    } 

    ~Msg() 
    { 
     printf("Destruct ObbsMsg: %d\n", i_); 
    } 

    const char* toString() { return data_; } 

    private: 
    friend class server; 

    udp::endpoint sender_endpoint_; 
    char data_[MAX_LENGTH]; 
    int i_; 
}; 

class server 
{ 
public: 
    server::server(boost::asio::io_service& io_service) 
    : io_service_(io_service), 
     socket_(io_service, udp::endpoint(udp::v4(), PORT)) 
    { 
    waitForNextMessage(); 
    } 

    void server::waitForNextMessage() 
    { 
    printf("Waiting for next msg\n"); 

    next_msg_.reset(new Msg()); 

    socket_.async_receive_from(
     boost::asio::buffer(next_msg_->data_, MAX_LENGTH), sender_endpoint_, 
     boost::bind(&server::handleReceiveFrom, this, 
        boost::asio::placeholders::error, 
        boost::asio::placeholders::bytes_transferred)); 
    } 

    void server::handleReceiveFrom(const boost::system::error_code& error, size_t bytes_recvd) 
    { 
    if (!error && bytes_recvd > 0) { 
     printf("got data: %s. Adding to work queue\n", next_msg_->toString()); 
     g_work_queue.push(next_msg_); // Add received msg to work queue 
     waitForNextMessage(); 
    } else { 
     waitForNextMessage(); 
    } 
    } 

private: 
    boost::asio::io_service& io_service_; 
    udp::socket socket_; 

    udp::endpoint sender_endpoint_; 
    boost::shared_ptr<Msg> next_msg_; 
} 

int main(int argc, char* argv[]) 
{ 
    try{ 
     boost::asio::io_service io_service; 
     server s(io_service); 
     io_service.run(); 
    catch(std::exception& e){ 
     std::err << "Exception: " << e.what() << std::endl; 
    } 
    return 0; 
} 

Теперь я обнаружил, что если handle_receive_from может вернуться затем notify_one() в concurrent_queue возвращается. Поэтому я думаю, что это потому, что у меня рекурсивный цикл. Итак, каков правильный способ начать прослушивание новых данных? и пример асинхронного udp-сервера испорчен, поскольку я основывал его на том, что они уже делали.

РЕДАКТИРОВАТЬ: О'кей, вопрос просто еще более странный.

Что я не упоминал здесь, так это то, что у меня есть класс, называемый процессором. процессора выглядит следующим образом:

class processor 
{ 
public: 
    processor::processor(int thread_pool_size) : 
     thread_pool_size_(thread_pool_size) { } 

    void start() 
    { 
    boost::thread_group threads; 
    for (std::size_t i = 0; i < thread_pool_size_; ++i){ 
     threads.create_thread(boost::bind(&ObbsServer::worker, this)); 
    } 
    } 

    void worker() 
    { 
    while (true){ 
     boost::shared_ptr<ObbsMsg> msg; 
     g_work_queue.wait_and_pop(msg); 
     printf("Got msg: %s\n", msg->toString()); 
    } 
    } 

private: 
    int thread_pool_size_; 
}; 

Теперь, кажется, что если я извлекаю функцию работника на своем собственном и начать резьбу от основных. оно работает! Может ли кто-нибудь объяснить, почему поток работает так, как я ожидал бы вне класса, но внутри он получил побочные эффекты?

EDIT2: Теперь это становится еще страннее еще

Я вытащил две функции (точно такие же).

Один называется потребителем, другой рабочий.

т.е.

void worker() 
{ 
    while (true){ 
     boost::shared_ptr<ObbsMsg> msg; 
     printf("waiting for msg\n"); 
     g_work_queue.wait_and_pop(msg); 
     printf("Got msg: %s\n", msg->toString()); 
    } 
} 

void consumer() 
{ 
    while (true){ 
     boost::shared_ptr<ObbsMsg> msg; 
     printf("waiting for msg\n"); 
     g_work_queue.wait_and_pop(msg); 
     printf("Got msg: %s\n", msg->toString()); 
    } 
} 

Теперь потребитель живет в верхней части файла server.cpp. То есть где также работает наш код сервера.

С другой стороны, рабочий живет в файле processor.cpp.

Теперь я не использую процессор вообще на данный момент.Основная функция теперь выглядит следующим образом:

void consumer(); 
void worker(); 

int main(int argc, char* argv[]) 
{ 
    try { 
     boost::asio::io_service io_service; 
     server net(io_service); 
     //processor s(7); 

     boost::thread_group threads; 
     for (std::size_t i = 0; i < 7; ++i){ 
      threads.create_thread(worker); // this doesn't work 
      // threads.create_thread(consumer); // THIS WORKS!?!?!? 
     } 

//  s.start(); 

     printf("Server Started...\n"); 
     boost::asio::io_service::work work(io_service); 
     io_service.run(); 

     printf("exiting...\n"); 
    } catch (std::exception& e) { 
     std::cerr << "Exception: " << e.what() << "\n"; 
    } 

    return 0; 
} 

Почему это, что потребитель может получить в очереди элементы, но работник не является. Они идентичны реализациям с разными именами.

Это не имеет никакого смысла. Есть идеи?

Вот пример вывода при получении .txt "Hello World":

Выход 1: не работает. При вызове функции employee или использовании класса процессора.

Construct ObbsMsg: 0 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
Server Started... 
waiting for msg 
got data: hello world. Adding to work queue 
Construct ObbsMsg: 1 

Выход 2: работает при вызове функции потребителя, которая идентична функции рабочего.

Construct ObbsMsg: 0 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
Server Started... 
waiting for msg 
got data: hello world. Adding to work queue 
Construct ObbsMsg: 1 
Got msg: hello world <----- this is what I've been wanting to see! 
Destruct ObbsMsg: 0 
waiting for msg 
+0

Лучшее имя поможет другим найти это в будущем. –

+0

Спасибо, я надеюсь, что имя теперь имеет больше смысла. Выучил большой урок здесь. – Matt

ответ

1

Чтобы ответить на мой вопрос.

Похоже, что проблема связана с объявлением g_work_queue;

Объявлено в файле заголовка как: static concurrent_queue < boost :: shared_ptr> g_work_queue;

Кажется, что объявлять его статичным не то, что я хочу делать. Видимо, что создает отдельный объект очереди для каждого скомпилированного файла .o и очевидно отдельные замки и т.д.

Это объясняет, почему, когда очередь манипулируют внутри того же исходного файла с потребителем и производителем в том же файле, что работал. Но когда в разных файлах это произошло не потому, что потоки действительно ожидали на разных объектах.

Итак, я обновил рабочую очередь так.

-- workqueue.h -- 
extern concurrent_queue< boost::shared_ptr<Msg> > g_work_queue; 

-- workqueue.cpp -- 
#include "workqueue.h" 
concurrent_queue< boost::shared_ptr<Msg> > g_work_queue; 

Выполнение этой задачи.

+0

Рекомендовать вас принять ваше решение затем :) –

Смежные вопросы