2015-02-12 2 views
5

Ищете повышение: asio (и с его помощью boost) решил написать асинхронный сервер. Для хранения входящих данных я использую boost :: asio :: streambuf. У меня проблема. Когда я получаю второе сообщение от клиента, а затем вижу, что в буфере содержатся данные из предыдущих сообщений. Хотя я вызываю метод Consume во входном буфере. Что не так?Работа с boost :: asio :: streambuf

class tcp_connection 
// Using shared_ptr and enable_shared_from_this 
// because we want to keep the tcp_connection object alive 
// as long as there is an operation that refers to it. 
: public boost::enable_shared_from_this<tcp_connection> 
{ 
... 

boost::asio::streambuf receive_buffer; 

boost::asio::io_service::strand strand; 
} 

... 

void tcp_connection::receive() 
{ 
// Read the response status line. The response_ streambuf will 
// automatically grow to accommodate the entire line. The growth may be 
// limited by passing a maximum size to the streambuf constructor. 
boost::asio::async_read_until(m_socket, receive_buffer, "\r\n", 
    strand.wrap(boost::bind(&tcp_connection::handle_receive, shared_from_this()/*this*/, 
    boost::asio::placeholders::error, 
    boost::asio::placeholders::bytes_transferred))); 

} 


void tcp_connection::handle_receive(const boost::system::error_code& error, 
std::size_t bytes_transferred) 
{ 

if (!error) 
{ 
    // process the data 

    /* boost::asio::async_read_until remarks 

    After a successful async_read_until operation, 
    the streambuf may contain additional data beyond the delimiter. 
    An application will typically leave that data in the streambuf for a 
    subsequent async_read_until operation to examine. 
    */ 

    /* didn't work  
    std::istream is(&receive_buffer); 
    std::string line; 
    std::getline(is, line); 
    */ 


    // clean up incomming buffer but it didn't work 
    receive_buffer.consume(bytes_transferred); 

    receive(); 

} 
else if (error != boost::asio::error::operation_aborted) 
{ 
    std::cout << "Client Disconnected\n"; 

    m_connection_manager.remove(shared_from_this()); 
} 
} 

ответ

17

Либо используя std::istream и чтение из него, например, путем std::getline(), или явно вызывая boost::asio::streambuf::consume(n), будут удалены данные из входной последовательности.
Если приложение выполняет любой из этих и последующих операций read_until(), результаты дублируются в вводной последовательности receive_buffer, тогда дублированные данные, скорее всего, происходят из удаленного однорангового узла. Если удаленный одноранговый узел записывает в сокет и напрямую использует входную последовательность streambuf, удаленный одноранговый узел должен явно вызывать consume() после каждой успешной операции записи.


Как отмечается в документации, успешные read_until() операций могут содержать дополнительные данные, помимо разделителя, включая дополнительные разделители. Например, если "[email protected]@" записывается в сокет, операция read_until() с использованием '@' в качестве разделителя может считывать и фиксировать "[email protected]@" входной последовательности streambuf. Однако операция будет указывать на то, что количество переданных байтов составляет до и включая первый разделитель. Таким образом, bytes_transferred будет 2 и streambuf.size() будет 4. После того, как потребляется 2 байтов, входная последовательность streambuf будет содержать "[email protected]", и последующий вызов read_until() будет немедленно возвращен, так как streambuf уже содержит разделитель.

Вот полный пример demonstratingstreambuf использование для чтения и записи, и как входная последовательность потребляется:

#include <iostream> 
#include <boost/asio.hpp> 
#include <boost/bind.hpp> 

// This example is not interested in the handlers, so provide a noop function 
// that will be passed to bind to meet the handler concept requirements. 
void noop() {} 

std::string make_string(boost::asio::streambuf& streambuf) 
{ 
return {buffers_begin(streambuf.data()), 
     buffers_end(streambuf.data())}; 
} 

int main() 
{ 
    using boost::asio::ip::tcp; 
    boost::asio::io_service io_service; 

    // Create all I/O objects. 
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0)); 
    tcp::socket server_socket(io_service); 
    tcp::socket client_socket(io_service); 

    // Connect client and server sockets. 
    acceptor.async_accept(server_socket, boost::bind(&noop)); 
    client_socket.async_connect(acceptor.local_endpoint(), boost::bind(&noop)); 
    io_service.run(); 

    // Write to server. 
    boost::asio::streambuf write_buffer; 
    std::ostream output(&write_buffer); 
    output << "[email protected]" 
      "[email protected]"; 
    write(server_socket, write_buffer.data()); 
    std::cout << "Wrote: " << make_string(write_buffer) << std::endl; 
    assert(write_buffer.size() == 4); // Data not consumed. 

    // Read from the client. 
    boost::asio::streambuf read_buffer; 

    // Demonstrate consuming via istream. 
    { 
    std::cout << "Read" << std::endl; 
    auto bytes_transferred = read_until(client_socket, read_buffer, '@'); 
    // Verify that the entire write_buffer (data pass the first delimiter) was 
    // read into read_buffer. 
    auto initial_size = read_buffer.size(); 
    assert(initial_size == write_buffer.size()); 

    // Read from the streambuf. 
    std::cout << "Read buffer contains: " << make_string(read_buffer) 
       << std::endl; 
    std::istream input(&read_buffer); 
    std::string line; 
    getline(input, line, '@'); // Consumes from the streambuf. 
    assert("a" == line); // Note getline discards delimiter. 
    std::cout << "Read consumed: " << line << "@" << std::endl; 
    assert(read_buffer.size() == initial_size - bytes_transferred); 
    } 

    // Write an additional message to the server, but only consume '[email protected]' 
    // from write buffer. The buffer will contain '[email protected]@'. 
    write_buffer.consume(2); 
    std::cout << "Consumed write buffer, it now contains: " << 
       make_string(write_buffer) << std::endl; 
    assert(write_buffer.size() == 2); 
    output << "[email protected]"; 
    assert(write_buffer.size() == 4); 
    write(server_socket, write_buffer.data()); 
    std::cout << "Wrote: " << make_string(write_buffer) << std::endl; 

    // Demonstrate explicitly consuming via the streambuf. 
    { 
    std::cout << "Read" << std::endl; 
    auto initial_size = read_buffer.size(); 
    auto bytes_transferred = read_until(client_socket, read_buffer, '@'); 
    // Verify that the read operation did not attempt to read data from 
    // the socket, as the streambuf already contained the delimiter. 
    assert(initial_size == read_buffer.size()); 

    // Read from the streambuf. 
    std::cout << "Read buffer contains: " << make_string(read_buffer) 
       << std::endl; 
    std::string line(
     boost::asio::buffers_begin(read_buffer.data()), 
     boost::asio::buffers_begin(read_buffer.data()) + bytes_transferred); 
    assert("[email protected]" == line); 
    assert(read_buffer.size() == initial_size); // Nothing consumed. 
    read_buffer.consume(bytes_transferred); // Explicitly consume. 
    std::cout << "Read consumed: " << line << std::endl; 
    assert(read_buffer.size() == 0); 
    } 

    // Read again. 
    { 
    std::cout << "Read" << std::endl; 
    read_until(client_socket, read_buffer, '@'); 

    // Read from the streambuf. 
    std::cout << "Read buffer contains: " << make_string(read_buffer) 
       << std::endl; 
    std::istream input(&read_buffer); 
    std::string line; 
    getline(input, line, '@'); // Consumes from the streambuf. 
    assert("b" == line); // Note "b" is expected and not "c". 
    std::cout << "Read consumed: " << line << "@" << std::endl; 
    std::cout << "Read buffer contains: " << make_string(read_buffer) 
       << std::endl; 
    } 
} 

Выход:

Wrote: [email protected]@ 
Read 
Read buffer contains: [email protected]@ 
Read consumed: [email protected] 
Consumed write buffer, it now contains: [email protected] 
Wrote: [email protected]@ 
Read 
Read buffer contains: [email protected] 
Read consumed: [email protected] 
Read 
Read buffer contains: [email protected]@ 
Read consumed: [email protected] 
Read buffer contains: [email protected] 
+0

Спасибо, это очень хороший пример работа с streambuf. Я нашел, почему я вижу данные из предыдущих сообщений после отправки. На клиенте функция Send() использует глобальный буфер streambuf (send_buffer в моем примере). Как мне сделать, что каждый вызов Send() использует собственный объект streambuf? Что-то вроде boost :: make_shared? Это правильный подход? У вас есть хороший пример сервера и клиента с помощью boost :: asio? Это то, что в документации Boost d не подходит мне, там используются буферы постоянной длины, и я хотел бы посмотреть пример использования streambuf. – vint

+2

Удивительный, очень полезный ответ. Я хочу, чтобы люди Бостона могли написать вам всю свою документацию! ;-) – Dronz

+0

Как вы получили доступ к входной последовательности, не комментируя ее ('commit()')? [data()] (http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/basic_streambuf/data.html) говорит, что 'data()' возвращает список входных буферов и согласно документации [commit()] (http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/basic_streambuf/commit.html), она фактически копирует символы из выходной последовательности на вход последовательность. –

Смежные вопросы