2016-10-31 3 views
1

Я прочитал исходный код Boost ASIO, и я хочу узнать, что это только одна тема для его вызова epoll_wait (конечно, если я использую реактор epoll).
Я хочу найти решение более чем одного потока для вызова epoll_wait, это может привести к тому, что разные потоки будут выполнять чтение для одного и того же сокета одновременно. Я прочитал некоторые ключевые коды следующим образом:Для повышения io_service только один поток заблокирован на epoll_wait?

// Prepare to execute first handler from queue. 
     operation* o = op_queue_.front(); 
     op_queue_.pop(); 
     bool more_handlers = (!op_queue_.empty()); 

     if (o == &task_operation_) 
     { 
     task_interrupted_ = more_handlers; 

     if (more_handlers && !one_thread_) 
      wakeup_event_.unlock_and_signal_one(lock); 
     else 
      lock.unlock(); 

     task_cleanup on_exit = { this, &lock, &this_thread }; 
     (void)on_exit; 

     // Run the task. May throw an exception. Only block if the operation 
     // queue is empty and we're not polling, otherwise we want to return 
     // as soon as possible. 
     task_->run(!more_handlers, this_thread.private_op_queue); 
     } 

task_ является Epoll реактор и он будет вызывать epoll_wait в перспективе, я предполагаю, что это может только один поток, чтобы назвать это, потому что только один «task_operation_» в op_queue_, я прав ?
Если я хочу использовать epoll в многопоточном режиме, или я могу использовать «EPOLLONESHOT», чтобы он мог гарантировать, что один поток обрабатывает один сокет за один раз.

+3

Вам ничего особенного не нужно делать. Если вы не считаете, что лучше знаете, как многопоточность на вашей платформе, чем разработчики ASIO, не связывайтесь с ней. Это уже лучший дизайн, который они смогли придумать. –

ответ

0
  • Первый случай, когда вы используете один экземпляр io_service и вызова io_service::run метод из нескольких потоков.

Позволяет видеть schduler::run функцию (упрощенный):

std::size_t scheduler::run(asio::error_code& ec) 
{ 
    mutex::scoped_lock lock(mutex_); 

    std::size_t n = 0; 
    for (; do_run_one(lock, this_thread, ec); lock.lock()) 
    if (n != (std::numeric_limits<std::size_t>::max)()) 
     ++n; 
    return n; 
} 

Так, с удержанной блокировкой, она вызывает метод do_run_one, который что-то вроде:

std::size_t scheduler::do_run_one(mutex::scoped_lock& lock, 
    scheduler::thread_info& this_thread, 
    const asio::error_code& ec) 
{ 
    while (!stopped_) 
    { 
    if (!op_queue_.empty()) 
    { 
     // Prepare to execute first handler from queue. 
     operation* o = op_queue_.front(); 
     op_queue_.pop(); 
     bool more_handlers = (!op_queue_.empty()); 

     if (o == &task_operation_) 
     { 
     task_interrupted_ = more_handlers; 

     if (more_handlers && !one_thread_) 
      wakeup_event_.unlock_and_signal_one(lock); 
     else 
      lock.unlock(); 

     task_cleanup on_exit = { this, &lock, &this_thread }; 
     (void)on_exit; 

     task_->run(!more_handlers, this_thread.private_op_queue); 
     } 
     else 
     { 
     //...... 
     } 
    } 
    else 
    { 
     wakeup_event_.clear(lock); 
     wakeup_event_.wait(lock); 
    } 
    } 

    return 0; 
} 

Интересная часть кода этих строк:

if (more_handlers && !one_thread_) 
    wakeup_event_.unlock_and_signal_one(lock); 
else 
    lock.unlock(); 

Дело, которое мы обсуждаем сейчас, это одно с несколькими потоками, поэтому первое условие будет удовлетворять (если у нас в op_queue_ есть довольно много ожидающих задач).

wakeup_event_.unlock_and_signal_one заканчивается, так это освобождение/разблокирование lock и уведомление одного из пользователей, которые ждут условного ожидания. Таким образом, по крайней мере один другой поток (тот, кто получает блокировку) теперь может позвонить do_run_one.

task_ в вашем случае epoll_reactor как вы сказали. И в этом методе run он вызывает epoll_wait (не удерживая lock_ от scheduler).

Интересная вещь здесь, что она делает, когда она перебирает все готовые дескрипторы, возвращаемые epoll_wait. Он отталкивает их обратно в операционную очередь, полученную в качестве аргумента в аргументе. Операции подтолкнули теперь имеют тип времени выполнения descriptor_state вместо task_operation_:

for (int i = 0; i < num_events; ++i) 
    { 
    void* ptr = events[i].data.ptr; 
    if (ptr == &interrupter_) 
    { 
     // don't call work_started() here. This still allows the scheduler to 
     // stop if the only remaining operations are descriptor operations. 
     descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr); 
     descriptor_data->set_ready_events(events[i].events); 
     ops.push(descriptor_data); 
    } 
    } 

Таким образом, в следующей итерации цикла, а внутри scheduler::do_run_one, за выполненные задания, он попал в else отделения (я опущен в моей пасте ранее):

 else 
     { 
     std::size_t task_result = o->task_result_; 

     if (more_handlers && !one_thread_) 
      wake_one_thread_and_unlock(lock); 
     else 
      lock.unlock(); 

     // Ensure the count of outstanding work is decremented on block exit. 
     work_cleanup on_exit = { this, &lock, &this_thread }; 
     (void)on_exit; 

     // Complete the operation. May throw an exception. Deletes the object. 
     o->complete(this, ec, task_result); 

     return 1; 
     } 

Что вызывается функция указателя complete, который INTURN вероятно, будет вызывать пользователя передается дескриптор async_read или async_write API.

  • Второй случай, где вы создаете пул io_service объектов и вызвать его метод run на 1 или более потоков т.е. отображение между io_service и thread может быть 1: 1 или 1: N, которые могут удовлетворить заявку. Таким образом, вы можете назначить объект io_service объекту soucket в круговом режиме.

Теперь, приходя на ваш вопрос:

Если я хочу использовать Epoll в многопоточности, или я могу использовать «EPOLLONESHOT» так, что он может гарантировать, что одна нить обрабатывать один сокет в одном время.

Если я правильно понял это, вы хотите обработать все события в сокет, используя 1 поток? Я думаю, что это возможно с помощью подхода номер 2, т. Е. Создать пул из io_service объектов и сопоставить его с 1 потоком. Таким образом, вы можете быть уверены, что вся деятельность на конкретном сокете будет решаться только одним потоком, то есть потоком, на котором находится этот io_service:run.

Вам не нужно беспокоиться о настройке EPOLLONESHOT в вышеуказанном случае.

Я не уверен в том, что получаю такое же поведение, используя первый подход, который представляет собой несколько потоков и 1 io_service.

Но если вы не используете нитки вообще, то ваш io_service работает на одной ветке, тогда вам не нужно беспокоиться обо всем этом, ведь цель asio - абстрагировать все эти вещи.

+0

Спасибо, что много писали. Это мне очень помогает. Огромное спасибо. – NeoLiu

+0

@NeoLiu Просто как общая информация о stackoverflow. Если вы найдете какой-либо ответ, действительно полезный для вашего заданного вопроса, вы должны «принять» его, щелкнув знак «Проверить» ниже счетчика (вверху справа от ответа). Я вижу, вы до сих пор не приняли никакого ответа, и, поскольку вы знаете об этом сейчас, вы можете начать принимать их :) – Arunmu

1

Только один поток будет ссылаться на epoll_wait. Как только поток получает уведомления о событиях для дескрипторов, он демультиплексирует дескрипторы ко всем потокам, работающим с io_service. Согласно Platform-Specific Implementation Notes:

Темы:

  • демультиплексирования с использованием epoll выполняются в одном из потоков, которые называют io_service::run(), io_service::run_one(), io_service::poll() или io_service::poll_one().

Один дескриптор будет обработан одним потоком, который будет выполнять операции ввода/вывода. Следовательно, при использовании асинхронных операций I/O не будет выполняться одновременно для данного сокета.