- Первый случай, когда вы используете один экземпляр
io_service
и вызова io_service::run
метод из нескольких потоков.
Позволяет видеть schduler::run
функцию (упрощенный):
std::size_t scheduler::run(asio::error_code& ec)
{
mutex::scoped_lock lock(mutex_);
std::size_t n = 0;
for (; do_run_one(lock, this_thread, ec); lock.lock())
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
return n;
}
Так, с удержанной блокировкой, она вызывает метод do_run_one
, который что-то вроде:
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
scheduler::thread_info& this_thread,
const asio::error_code& ec)
{
while (!stopped_)
{
if (!op_queue_.empty())
{
// Prepare to execute first handler from queue.
operation* o = op_queue_.front();
op_queue_.pop();
bool more_handlers = (!op_queue_.empty());
if (o == &task_operation_)
{
task_interrupted_ = more_handlers;
if (more_handlers && !one_thread_)
wakeup_event_.unlock_and_signal_one(lock);
else
lock.unlock();
task_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
task_->run(!more_handlers, this_thread.private_op_queue);
}
else
{
//......
}
}
else
{
wakeup_event_.clear(lock);
wakeup_event_.wait(lock);
}
}
return 0;
}
Интересная часть кода этих строк:
if (more_handlers && !one_thread_)
wakeup_event_.unlock_and_signal_one(lock);
else
lock.unlock();
Дело, которое мы обсуждаем сейчас, это одно с несколькими потоками, поэтому первое условие будет удовлетворять (если у нас в op_queue_ есть довольно много ожидающих задач).
wakeup_event_.unlock_and_signal_one
заканчивается, так это освобождение/разблокирование lock
и уведомление одного из пользователей, которые ждут условного ожидания. Таким образом, по крайней мере один другой поток (тот, кто получает блокировку) теперь может позвонить do_run_one
.
task_
в вашем случае epoll_reactor
как вы сказали. И в этом методе run
он вызывает epoll_wait
(не удерживая lock_
от scheduler
).
Интересная вещь здесь, что она делает, когда она перебирает все готовые дескрипторы, возвращаемые epoll_wait
. Он отталкивает их обратно в операционную очередь, полученную в качестве аргумента в аргументе. Операции подтолкнули теперь имеют тип времени выполнения descriptor_state
вместо task_operation_
:
for (int i = 0; i < num_events; ++i)
{
void* ptr = events[i].data.ptr;
if (ptr == &interrupter_)
{
// don't call work_started() here. This still allows the scheduler to
// stop if the only remaining operations are descriptor operations.
descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
descriptor_data->set_ready_events(events[i].events);
ops.push(descriptor_data);
}
}
Таким образом, в следующей итерации цикла, а внутри scheduler::do_run_one
, за выполненные задания, он попал в else
отделения (я опущен в моей пасте ранее):
else
{
std::size_t task_result = o->task_result_;
if (more_handlers && !one_thread_)
wake_one_thread_and_unlock(lock);
else
lock.unlock();
// Ensure the count of outstanding work is decremented on block exit.
work_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Complete the operation. May throw an exception. Deletes the object.
o->complete(this, ec, task_result);
return 1;
}
Что вызывается функция указателя complete
, который INTURN вероятно, будет вызывать пользователя передается дескриптор async_read
или async_write
API.
- Второй случай, где вы создаете пул
io_service
объектов и вызвать его метод run
на 1 или более потоков т.е. отображение между io_service
и thread
может быть 1: 1 или 1: N, которые могут удовлетворить заявку. Таким образом, вы можете назначить объект io_service
объекту soucket
в круговом режиме.
Теперь, приходя на ваш вопрос:
Если я хочу использовать Epoll в многопоточности, или я могу использовать «EPOLLONESHOT» так, что он может гарантировать, что одна нить обрабатывать один сокет в одном время.
Если я правильно понял это, вы хотите обработать все события в сокет, используя 1 поток? Я думаю, что это возможно с помощью подхода номер 2, т. Е. Создать пул из io_service
объектов и сопоставить его с 1 потоком. Таким образом, вы можете быть уверены, что вся деятельность на конкретном сокете будет решаться только одним потоком, то есть потоком, на котором находится этот io_service:run
.
Вам не нужно беспокоиться о настройке EPOLLONESHOT
в вышеуказанном случае.
Я не уверен в том, что получаю такое же поведение, используя первый подход, который представляет собой несколько потоков и 1 io_service
.
Но если вы не используете нитки вообще, то ваш io_service
работает на одной ветке, тогда вам не нужно беспокоиться обо всем этом, ведь цель asio - абстрагировать все эти вещи.
Вам ничего особенного не нужно делать. Если вы не считаете, что лучше знаете, как многопоточность на вашей платформе, чем разработчики ASIO, не связывайтесь с ней. Это уже лучший дизайн, который они смогли придумать. –