2009-05-19 3 views
19

Хотя асинхронный IO (неблокирующие дескрипторы с select/poll/epoll/kqueue и т. Д.) Не является самой документированной вещью в Интернете, есть несколько хороших примеров.tidy code for asynchronous IO

Однако все эти примеры, определяя дескрипторы, возвращаемые вызовом, имеют только «do_some_io(fd)». Они действительно не объясняют, как наилучшим образом подойти к фактическому асинхронному IO в таком методе.

Блокировка ввода-вывода очень аккуратная и простая в считывании кода. С другой стороны, неблокирующий, async IO, волосатый и грязный.

Какие подходы существуют? Что такое надежное и читаемое?

void do_some_io(int fd) { 
    switch(state) { 
    case STEP1: 
     ... async calls 
     if(io_would_block) 
      return; 
     state = STEP2; 
    case STEP2: 
     ... more async calls 
     if(io_would_block) 
      return; 
     state = STEP3; 
    case STEP3: 
     ... 
    } 
} 

или, возможно, (аб) с использованием вычисленных последовательно открывает ССЗ:

#define concatentate(x,y) x##y 
#define async_read_xx(var,bytes,line)  \ 
    concatentate(jmp,line):     \ 
    if(!do_async_read(bytes,&var)) {   \ 
     schedule(EPOLLIN);     \ 
     jmp_read = &&concatentate(jmp,line); \ 
     return;        \ 
} 

// macros for making async code read like sync code 
#define async_read(var,bytes) \ 
    async_read_xx(var,bytes,__LINE__) 

#define async_resume()   \ 
    if(jmp_read) {    \ 
     void* target = jmp_read; \ 
     jmp_read = NULL;   \ 
     goto *target;   \ 
    } 

void do_some_io() { 
    async_resume(); 
    async_read(something,sizeof(something)); 
    async_read(something_else,sizeof(something_else)); 
} 

или, возможно, исключения C++ и государственную машину, поэтому функции работник может вызвать преждевременное прекращение/резюме немного, или, возможно, с приводом стола Государственный аппарат?

Его не так, как заставить его работать, как его поддерживать, чтобы я гонялся!

ответ

16

Предлагаю взглянуть на: http://www.kegel.com/c10k.html, вторые взглянуть на существующие библиотеки, такие как libevent, Boost.Asio, которые уже выполняют работу и видят, как они работают.

Дела в том, что подход может быть различным для каждого типа системного вызова:

  • выбора прост реактор
  • Epoll оба краев или уровня сигнала интерфейса, который требует иного подхода
  • IOCP является proactor требуют другого подхода

предложения: использовать хорошую существующую библиотеку как Boost.Asio для C++ или Libevent для C.

EDIT: Это как ASIO обрабатывает этот

class connection { 
    boost::asio:ip::tcp::socket socket_; 
public: 
    void run() 
    { 
     // for variable length chunks 
     async_read_until(socket_,resizable_buffer,'\n', 
       boost::bind(&run::on_line_recieved,this,errorplacehplder); 
     // or constant length chunks 
     async_read(socket_,buffer(some_buf,buf_size), 
       boost::bind(&run::on_line_recieved,this,errorplacehplder); 
    } 
    void on_line_recieved(error e) 
    { 
     // handle it 
     run(); 
    } 

}; 

Поскольку ASIO работает как proactor уведомляет вас, когда операция завершена и ручки EWOULDBLOCK внутри.

Если слово как реактор можно смоделировать такое поведение:

class conn { 
    // Application logic 

    void run() { 
     read_chunk(&conn::on_chunk_read,size); 
    } 
    void on_chunk_read() { 
     /* do something;*/ 
    } 

    // Proactor wrappers 

    void read_chunk(void (conn::*callback),int size, int start_point=0) { 
     read(socket,buffer+start,size) 
     if(complete) 
      (this->*callback() 
     else { 
      this -> tmp_size-=size-read; 
      this -> tmp_start=start+read; 
      this -> tmp_callback=callback 
      your_event_library_register_op_on_readable(callback,socket,this); 
     } 
    } 
    void callback() 
    { 
     read_chunk(tmp_callback,tmp_size,tmp_start); 
    } 
} 

Что-то вроде этого.

+0

Libevent буферизации событий оберток, с высокими и низкими водяными знаками даже , являются удобным способом избежать сложной обработки ввода-вывода; но как вы представляете это состояние в коде, который вызывает его, который должен быть возобновлен? – Will

+0

EDITED: добавлен пример – Artyom

+0

спасибо Артем, я надеюсь, что люди googling epoll и такие найдут это! – Will

5

Государственные машины - один прекрасный подход. Это немного сложная задача, которая спасет вас от головных болей в будущем, когда будущее начнется действительно, очень скоро. ;-)

Другой метод - использовать потоки и блокировать ввод-вывод на одном fd в каждом потоке. Компромисс заключается в том, что вы делаете ввод-вывод простым, но может ввести сложность в синхронизации.

+11

Быстрый пример государственной машины для асинхронного ИО был бы полезно – Will

+0

Я хотел бы видеть, что пример слишком – Viet

0

Вы хотите отделить «io» от обработки, после чего прочитанный вами код станет очень читаемым.В основном у вас есть:


    int read_io_event(...) { /* triggers when we get a read event from epoll/poll/whatever */ 

    /* read data from "fd" into a vstr/buffer/whatever */ 

    if (/* read failed */) /* return failure code to event callback */ ; 

    if (/* "message" received */) return process_io_event(); 

    if (/* we've read "too much" */) /* return failure code to event callback */ ; 

    return /* keep going code for event callback */ ; 
    } 


    int process_io_event(...) { 
     /* this is where you process the HTTP request/whatever */ 
    } 

... то реальный код в случае процесса, и даже если у вас есть несколько запросов ответов это довольно читаемым, вы просто делаете «вернуть read_io_event()» после того, как установка состояния или любой другой ,

+2

создание буфера достаточно хорошо работает на уровне строки или сообщения; но когда вы разбираете что-то более сложное, как вы представляете это состояние в обработчике process_io_event()? – Will

3

Для решения этой проблемы существует большой шаблон дизайна «сопрограмма».

Это лучшее из обоих миров: аккуратный код, точно такой же, как синхронный поток io и отличная производительность без переключения контекста, например async io дает. Coroutine выглядит как одиночный синхронный поток, с одним указателем инструкции. Но многие сопрограммы могут работать в пределах одного потока ОС (так называемая «совместная многозадачность»).

Пример сопрограммный Код:

void do_some_io() { 
    blocking_read(something,sizeof(something)); 
    blocking_read(something_else,sizeof(something_else)); 
    blocking_write(something,sizeof(something)); 
} 

Похож синхронный код, но в потоке управления фактически использовать другой способ, как это:

void do_some_io() { 
    // return control to network io scheduler, to handle another coroutine 
    blocking_read(something,sizeof(something)); 
    // when "something" is read, scheduler fill given buffer and resume this coroutine 

    // return control to network io scheduler, to handle another coroutine 
    CoroSleep(1000); 
    // scheduler create async timer and when it fires, scheduler pass control to this coroutine 
    ... 
    // and so on 

Так однотридовое управление планировщиком много сопрограммы с определенным пользователем код и порядок синхронных вызовов в io.

пример ++ реализация сопрограммы С «boost.coroutine» (на самом деле не является часть подъема :) http://www.crystalclearsoftware.com/soc/coroutine/ Этой библиотека полностью реализует сопрограммы механики и может использовать Boost.Asio в качестве планировщика и асинхронного Io слоя.

+0

[Boost.Coroutine] (http://www.boost.org/doc/libs/release/libs/coroutine/doc/html/index.html) теперь является частью повышения. – Mankarse

1

У вас должен быть основной цикл, который предоставляет async_schedule(), async_foreach(), async_tick() и т. Д. Эти функции, в свою очередь, помещают записи в глобальный список методов, которые будут выполняться при следующем вызове async_tick(). Затем вы можете написать код, который будет намного более аккуратным и не включает никаких операторов switch.

Вы можете просто написать:

async_schedule(callback, arg, timeout); 

Или:

async_wait(condition, callback, arg, timeout); 

Тогда ваше состояние даже может быть установлен в другом потоке (при условии, что вы заботитесь о безопасности потоков при обращении к этой переменной).

Я реализовал асинхронную структуру в C для моего внедренного проекта, потому что я хотел иметь неперехватную многозадачность, а асинхронность идеально подходит для выполнения многих задач, выполняя небольшую работу на каждой итерации основного цикла.

Кода здесь: https://github.com/mkschreder/fortmax-blocks/blob/master/common/kernel/async.c

+1

Ссылка больше не работает. –