2016-02-26 2 views
0

У меня возникли проблемы с производительностью Boost ASIO UDP - или по крайней мере - я так думаю.Производительность Boost UDP Sockets - низкая пропускная способность?

Я поставил вместе простой клиентское приложение/сервер для иллюстрации:

  1. Клиент начинает посылать большие (65К) пакеты UDP к серверу - медленно
  2. сервер немедленно отправляет обратно пакет ACK когда пакет получен
  3. Клиент следит за скоростью, в которую входят эти пакеты ACK, и отправляет исходящие пакеты немного быстрее - для того, чтобы получить скорость.

(потеря пакетов/повторной передачи и т.д. не является проблемой в примере кода.)

Когда первоначально пишу это в C# Клиент быстро достиг предела NIC - около 125 мегабайт/сек. При переписывании того же кода на C++ с Boost - скорости останавливаются со скоростью менее 2 мегабайт/сек.

enter image description here

ЦП для клиента и сервера на 1-2%. Нет проблем с памятью. Запуск сервера и клиента на localhost. Запуск Windows 10, VS 2013, Boost 1.60.

Я попытался использовать методы async send/receive Boost, но это, похоже, не помогло. How to increase throughput of Boost ASIO, UDP client application

Есть ли проблема, связанная с отправкой/получением двух потоков из одного и того же буфера расширения? C++ Socket Server - Unable to saturate CPU

PS - Я начал программирования C++ 2 месяца назад - так что может быть действительно глупая ошибка где-то здесь .. но я не могу видеть, где именно. Любая помощь/мысли были бы значительно оценены.

КЛИЕНТ:

#pragma once 
#include <boost/smart_ptr/shared_ptr.hpp> 
#include <boost/asio.hpp> 
#include <boost/thread/v2/thread.hpp> 
#include <boost/thread/detail/thread.hpp> 
#include <chrono> 
#include <boost/lexical_cast.hpp> 

#define EXPOSE_IN_DLL __declspec(dllexport) 

namespace dummy{ 

    EXPOSE_IN_DLL class DummyClient 
    { 
     public: 

      // How fast the client is receiving acks - e.g. every 200 ms. 
      double MillisecondsPerAck = 200; 

      //How fast the client should be sending packages (meaning - how long it should sleep between packages) 
      long long MinimumOfMillisecondsToUsePerSentPackage = 200; 

      //How often the code should throttle (calculate new value for 'MinimumOfMillisecondsToUsePerSentPackage') 
      int intervalMilliseconds = 500; 

      //How much faster we should send than receive (how fast we should increase the speed) 
      const double ThrottleAgressiveness = 0.7; 

      //Counters 
      int AcksSinceLastTime = 0; 
      int AcksLastTime = 0; 
      int PackagesSent = 0; 
      int AcksReceived = 0; 
      long long BytesSentAndAcked = 0; 

      //Size of UDP Package to send. IP Layer (NIC) will break larger packages into smaller ones (MTU). 
      static const int PacketSize = 65000; 

      std::shared_ptr<boost::asio::io_service> io_service; 
      std::shared_ptr<boost::asio::ip::udp::socket> socket; 
      std::shared_ptr<boost::asio::ip::udp::endpoint> udpEndPoint; 

      EXPOSE_IN_DLL DummyClient() 
      { 
       boost::thread receiverThread(&DummyClient::SendPackages, this); 
       receiverThread.detach(); 

       using namespace std::chrono; 
       auto started = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); 

       while (true){ 

        boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); 

        auto elapsedMilliseconds = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() - started; 

        system("cls"); 
        std::cout << "     ############ UDP CLIENT ############ \n\n"; 
        std::cout << "        Packages sent: " << PackagesSent << "\n"; 
        std::cout << "        Acks received: " << AcksReceived << "\n"; 
        std::cout << "       Bytes delivered: " << BytesToSize(BytesSentAndAcked) << "\n"; 
        std::cout << "---------------------------------------------------------------------------\n"; 

        if (AcksReceived > 0 && PackagesSent > 0) 
         std::cout << "       Acked percentage: " << 100.0 * AcksReceived/PackagesSent << "\n"; 

        std::cout << "---------------------------------------------------------------------------\n"; 
        std::cout << "         Bandwidth: " << BytesToSize(1000 * BytesSentAndAcked/elapsedMilliseconds) << "/sec\n"; 
        std::cout << "---------------------------------------------------------------------------\n"; 
        std::cout << "      MilliSecondsPerAck: " << MillisecondsPerAck << "\n"; 
        std::cout << "      ThrottleAgressiveness: " << ThrottleAgressiveness << "\n"; 
        std::cout << " MinimumOfMillisecondsToUsePerSentPackage: " << MinimumOfMillisecondsToUsePerSentPackage << "\n"; 
        std::cout << "---------------------------------------------------------------------------\n"; 
       } 
      } 

      EXPOSE_IN_DLL ~DummyClient(){ 

      }; 

    private: 

     void SendPackages(){ 
      io_service = std::make_shared<boost::asio::io_service>(); 
      socket = std::make_shared<boost::asio::ip::udp::socket>(*io_service); 
      udpEndPoint = std::make_shared<boost::asio::ip::udp::endpoint>(boost::asio::ip::address_v4::from_string("127.0.0.1"), 56000); 
      socket->open(boost::asio::ip::udp::v4()); 

      //Start throtteling thread 
      boost::thread throttleThread(&DummyClient::ThrottleThread, this); 

      //Start Receiver thread - that listens for ACK packages from the Dummy server 
      boost::thread receiverThread(&DummyClient::ReceiverThread, this); 

      //Start sending packages - slowly 
      unsigned char dummyData[PacketSize]; 
      auto bufferToSend = boost::asio::buffer(dummyData, PacketSize); 
      for (int i = 0; i < 100000; i++) 
      { 
       //Send 
       socket->send_to(bufferToSend, *udpEndPoint, boost::asio::socket_base::message_do_not_route); 

       PackagesSent++; 

       //Check if we need to sleep a little (throtteling) before sending next package 
       if (MinimumOfMillisecondsToUsePerSentPackage > 0) 
        boost::this_thread::sleep_for(boost::chrono::milliseconds(MinimumOfMillisecondsToUsePerSentPackage)); 
      } 
     } 

     //"Acks" are received here - we are just counting them to get the rate they're coming in at. 
     void ReceiverThread(){ 

      //Need to wait until first package is sent - so that the local andpoint is bound 
      while (PackagesSent == 0){ 
       boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); 
      } 

      //Set up receiving buffer 
      unsigned char receiveBuffer[100]; 
      auto bufferToReceiveInto = boost::asio::buffer(receiveBuffer, 100); 
      boost::asio::ip::udp::endpoint senderEndPoint; 

      //Start Receiving! 
      while (true){ 
       socket->receive_from(bufferToReceiveInto, senderEndPoint); 
       AcksReceived++; 
       BytesSentAndAcked += PacketSize; 
      } 
     } 


     //Counts acks per interval - and adjusts outgoing speed accordingly. 
     void ThrottleThread() 
     { 
      while (true) 
      { 
       boost::this_thread::sleep_for(boost::chrono::milliseconds(intervalMilliseconds)); 

       //Find out how many packages we got since last time - and send a little bit faster than this number. 
       if (PackagesSent > 0) 
       { 
        AcksSinceLastTime = AcksReceived - AcksLastTime; 
        AcksLastTime = AcksReceived; 

        if (AcksSinceLastTime == 0) 
        { 
         //No new packages got acked - slow the down! 
         MinimumOfMillisecondsToUsePerSentPackage = 200; 
         continue; 
        } 

        //Increase sending speed to a little bit faster than we're receiving packages 
        MillisecondsPerAck = 1.0 * intervalMilliseconds/AcksSinceLastTime; 
        MinimumOfMillisecondsToUsePerSentPackage = MillisecondsPerAck * ThrottleAgressiveness; 
       } 
      } 
     } 


     //Util method 
     std::string BytesToSize(long long Bytes){ 
      float tb = 1099511627776; 
      float gb = 1073741824; 
      float mb = 1048576; 
      float kb = 1024; 

      std::string returnSize = ""; 

      if (Bytes >= tb){ 
       returnSize += boost::lexical_cast<std::string>((float)Bytes/tb); 
       returnSize += " TB"; 
      } 
      else if (Bytes >= gb && Bytes < tb) 
      { 
       returnSize += boost::lexical_cast<std::string>((float)Bytes/gb); 
       returnSize += " GB"; 
      } 
      else if (Bytes >= mb && Bytes < gb){ 
       returnSize += boost::lexical_cast<std::string>((float)Bytes/mb); 
       returnSize += " MB"; 
      } 
      else if (Bytes >= kb && Bytes < mb){ 
       returnSize += boost::lexical_cast<std::string>((float)Bytes/kb); 
       returnSize += " KB"; 
      } 
      else{ 
       returnSize += boost::lexical_cast<std::string>(Bytes); 
       returnSize += " Bytes"; 
      } 
      return returnSize; 
     } 
    }; 
} 

SERVER:

#pragma once 

#include <boost/asio.hpp> 
#include <boost/smart_ptr/shared_ptr.hpp> 
#include <boost/thread/v2/thread.hpp> 
#include <boost/thread/detail/thread.hpp> 

#define EXPOSE_IN_DLL __declspec(dllexport) 

using namespace boost::asio; 

namespace dummy{ 
class DummyServer 
{ 
    public: 

     std::shared_ptr<ip::udp::socket> listenSocket; 
     std::shared_ptr<io_service> listenIoService; 

     int PackagesReceived = 0; 

     EXPOSE_IN_DLL DummyServer() 
     { 
      boost::thread receiverThread(&DummyServer::ReceivePackages, this); 
      receiverThread.detach(); 

      //Print status 
      while (true){ 
       boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); 
       system("cls"); 
       std::cout << "  ############ UDP SERVER ############ \n\n"; 
       std::cout << "  Packages received: " << PackagesReceived << "\n"; 
      } 
     } 

     EXPOSE_IN_DLL ~DummyServer(){} 


     void ReceivePackages(){ 
      //Start Receiver thread 
      listenIoService = std::make_shared<io_service>(); 
      listenSocket = std::make_shared<ip::udp::socket>(*listenIoService, ip::udp::endpoint(ip::udp::v4(), 56000)); 

      //Set up receiving buffer 
      auto bytesReceived = 0; 
      unsigned char receiveBuffer[1024 * 70]; 
      auto bufferToReceiveInto = boost::asio::buffer(receiveBuffer, 1024 * 70); 

      unsigned char returnBufferBuffer[10]; 
      auto bufferToSendBackToClient = boost::asio::buffer(returnBufferBuffer, 10); 

      ip::udp::endpoint senderEndPoint; 
      //Receive packages 
      while (true){ 
       listenSocket->receive_from(bufferToReceiveInto, senderEndPoint); 
       PackagesReceived++; 

       //Send an "Ack" package back to client - letting him know that a package successfully made it. 
       //Doesn't matter what the package contains - since client is just counting. 
       listenSocket->send_to(bufferToSendBackToClient, senderEndPoint); 
      } 
    } 
}; 

}

+0

Я бы потерял тег C#. он в лучшем случае тангенциальный. – user4581301

+0

Согласен. C# тег удален. –

+1

У вас есть тонны ошибок в вашем коде. Например, ваш 'ThrottleThread' может получить доступ к' PackagesSent', в то время как поток 'SendPackages' изменяет его. Это может привести к потерям. Вам необходимо защитить совместное состояние с помощью мьютекса или использовать соответствующие атомные типы/операции. –

ответ

1

Найдено проблема - в прошлом. Это было sleep_for, что вызвало низкие скорости.

Этот код

boost::this_thread::sleep_for(boost::chrono::milliseconds(20)); 

часто занимает более 30мс - когда другие потоки делают работу.

Я предполагаю, что мне нужно использовать другую стратегию дросселирования (подсчет пакетов в полете и т. Д.) - или использовать высокоточный мультимедийный таймер.

Смежные вопросы