Я пишу UDP-сервер, который в настоящее время получает данные из UDP, обертывает его в объект и помещает их в параллельную очередь. Параллельная очередь представляет собой реализацию, представленную здесь: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.htmlПобочные эффекты глобальных статических переменных
Пул рабочих потоков извлекает данные из очереди для обработки.
Очередь определена глобально, как:
static concurrent_queue<boost::shared_ptr<Msg> > g_work_queue_;
Теперь проблема у меня в том, что если я просто написать функцию для получения данных и вставить его в очередь и создать некоторые потребительские нити, чтобы вытащить их это прекрасно работает. Но в тот момент, когда я добавляю своего производителя на основе UDP, рабочие потоки перестают получать уведомления о поступлении данных в очередь.
Я отслеживал проблему до конца функции push в concurrent_queue. В частности, строка: the_condition_variable.notify_one(); Не возвращается при использовании моего сетевого кода.
Таким образом, проблема связана с тем, как я написал сетевой код.
Вот как это выглядит.
enum
{
MAX_LENGTH = 1500
};
class Msg
{
public:
Msg()
{
static int i = 0;
i_ = i++;
printf("Construct ObbsMsg: %d\n", i_);
}
~Msg()
{
printf("Destruct ObbsMsg: %d\n", i_);
}
const char* toString() { return data_; }
private:
friend class server;
udp::endpoint sender_endpoint_;
char data_[MAX_LENGTH];
int i_;
};
class server
{
public:
server::server(boost::asio::io_service& io_service)
: io_service_(io_service),
socket_(io_service, udp::endpoint(udp::v4(), PORT))
{
waitForNextMessage();
}
void server::waitForNextMessage()
{
printf("Waiting for next msg\n");
next_msg_.reset(new Msg());
socket_.async_receive_from(
boost::asio::buffer(next_msg_->data_, MAX_LENGTH), sender_endpoint_,
boost::bind(&server::handleReceiveFrom, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void server::handleReceiveFrom(const boost::system::error_code& error, size_t bytes_recvd)
{
if (!error && bytes_recvd > 0) {
printf("got data: %s. Adding to work queue\n", next_msg_->toString());
g_work_queue.push(next_msg_); // Add received msg to work queue
waitForNextMessage();
} else {
waitForNextMessage();
}
}
private:
boost::asio::io_service& io_service_;
udp::socket socket_;
udp::endpoint sender_endpoint_;
boost::shared_ptr<Msg> next_msg_;
}
int main(int argc, char* argv[])
{
try{
boost::asio::io_service io_service;
server s(io_service);
io_service.run();
catch(std::exception& e){
std::err << "Exception: " << e.what() << std::endl;
}
return 0;
}
Теперь я обнаружил, что если handle_receive_from может вернуться затем notify_one() в concurrent_queue возвращается. Поэтому я думаю, что это потому, что у меня рекурсивный цикл. Итак, каков правильный способ начать прослушивание новых данных? и пример асинхронного udp-сервера испорчен, поскольку я основывал его на том, что они уже делали.
РЕДАКТИРОВАТЬ: О'кей, вопрос просто еще более странный.
Что я не упоминал здесь, так это то, что у меня есть класс, называемый процессором. процессора выглядит следующим образом:
class processor
{
public:
processor::processor(int thread_pool_size) :
thread_pool_size_(thread_pool_size) { }
void start()
{
boost::thread_group threads;
for (std::size_t i = 0; i < thread_pool_size_; ++i){
threads.create_thread(boost::bind(&ObbsServer::worker, this));
}
}
void worker()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
private:
int thread_pool_size_;
};
Теперь, кажется, что если я извлекаю функцию работника на своем собственном и начать резьбу от основных. оно работает! Может ли кто-нибудь объяснить, почему поток работает так, как я ожидал бы вне класса, но внутри он получил побочные эффекты?
EDIT2: Теперь это становится еще страннее еще
Я вытащил две функции (точно такие же).
Один называется потребителем, другой рабочий.
т.е.
void worker()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
printf("waiting for msg\n");
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
void consumer()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
printf("waiting for msg\n");
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
Теперь потребитель живет в верхней части файла server.cpp. То есть где также работает наш код сервера.
С другой стороны, рабочий живет в файле processor.cpp.
Теперь я не использую процессор вообще на данный момент.Основная функция теперь выглядит следующим образом:
void consumer();
void worker();
int main(int argc, char* argv[])
{
try {
boost::asio::io_service io_service;
server net(io_service);
//processor s(7);
boost::thread_group threads;
for (std::size_t i = 0; i < 7; ++i){
threads.create_thread(worker); // this doesn't work
// threads.create_thread(consumer); // THIS WORKS!?!?!?
}
// s.start();
printf("Server Started...\n");
boost::asio::io_service::work work(io_service);
io_service.run();
printf("exiting...\n");
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
Почему это, что потребитель может получить в очереди элементы, но работник не является. Они идентичны реализациям с разными именами.
Это не имеет никакого смысла. Есть идеи?
Вот пример вывода при получении .txt "Hello World":
Выход 1: не работает. При вызове функции employee или использовании класса процессора.
Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1
Выход 2: работает при вызове функции потребителя, которая идентична функции рабочего.
Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1
Got msg: hello world <----- this is what I've been wanting to see!
Destruct ObbsMsg: 0
waiting for msg
Лучшее имя поможет другим найти это в будущем. –
Спасибо, я надеюсь, что имя теперь имеет больше смысла. Выучил большой урок здесь. – Matt