2016-07-05 2 views
0

У меня есть рабочее приложение-чат-сервер в C++ на малине pi, которое прослушивает для клиентов и отправляет сообщения от одного клиента другим, и наоборот, используя pthreads для циклов. Я использую C# в качестве клиентов.серверное/клиентское приложение реального времени в C++ и C#

Клиенты C# непрерывно отправляют и принимают данные (один байт) на сервер RPi и регистрируют время отправки и приема данных.

Оглядываясь на занесенные в журнал времена, я вижу, что между тем, когда один клиент отправляет, а второй получает данные, происходит задержка в 100 мс или около того. Такая задержка неприемлема для моего приложения. Мне нужно получить его менее 15 мс последовательно.

В моей программе на C++ временная задержка между получением и отправкой байта обратно клиенту составляет 1-2 мс.

Я не уверен, есть ли проблема в том, как я кодировал C# -сервер или сервер C++. Я обновил свое ядро ​​с помощью патча RT PREEMPT, но это не повлияло на время задержки.

Если я поставил случайную задержку порядка секунд перед отправкой байта на сервер в программе C#, время задержки значительно улучшилось - до 1-2 мс.

Есть ли способ оптимизировать его, даже если он отправляется непрерывно, время задержки очень мало? Я могу отправлять коды, если это необходимо.

** EDIT: Вот код сервера на RPi.

#include <iostream> 
#include <stdio.h> 
#include <stdlib.h> 
#include <cstdlib> 
#include <iostream> 
#include <cstring> 
#include <string.h> // memset 
#include <pthread.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <netinet/in.h> 
#include <unistd.h> 
#include <netdb.h> 
#include <vector> 
#include <string> 
#include <fstream> 
#include <iomanip> 
#include <algorithm> 
#include <sys/time.h> 
#include <sched.h> 
#include <time.h> 
#include <sys/mman.h> 
using namespace std;  

int BACKLOG; 
#define IP_ADDR "192.168.137.99" 
#define PORT "8888" 
#define MAXLEN 1 
#define MY_PRIORITY (49) /* we use 49 as the PRREMPT_RT use 50 
          as the priority of kernel tasklets 
          and interrupt handler by default */ 

#define MAX_SAFE_STACK (8*1024) /* The maximum stack size which is 
            guaranteed safe to access without 
            faulting */ 

#define NSEC_PER_SEC (1000000000) /* The number of nsecs per sec. */ 
static unsigned int cli_count = 0; 

vector<int> cliarray; 
vector<vector<unsigned long long> > data; 
pthread_attr_t custom_sched_attr; 
int fifo_max_prio, fifo_min_prio; 
struct sched_param fifo_param; 

void stack_prefault(void) { 

     unsigned char dummy[MAX_SAFE_STACK]; 

     memset(dummy, 0, MAX_SAFE_STACK); 
     return; 
} 


void send_message(char *s, int sock){ 
    int i; 
    for(i=0;i<BACKLOG;i++){ 
     if(cliarray[i]){ 
      if(cliarray[i] != sock){ 
       send(cliarray[i], s, 1,0); 
      } 
     } 
    } 
} 

/* Send message to all clients */ 
void send_message_all(char *s){ 
    int k; 
    for(k=0;k<BACKLOG;k++){ 
     if(cliarray[k]){ 
      send(cliarray[k], s, 1,0); 
     } 
    } 
} 



void *handle_conn(void *pnewsock) 
{ 
    int sock = *(int*)pnewsock; 


    char client_msg[MAXLEN]; 

    int read_size; 
    struct timeval tv; 
    bool looprun = true; 
    int clientint; 
    vector<unsigned long long> row(4); 
    while(looprun){ 


    read_size = recv(sock, client_msg, 1, 0); 
    gettimeofday(&tv, NULL); 

     unsigned long long milliseconds_recv =(unsigned long long)(tv.tv_sec) * 1000 +(unsigned long long)(tv.tv_usec)/1000; 

    clientint = int(*client_msg); 



    client_msg[read_size] = '\0'; 
    /* cout << "length of client message: " << strlen(client_msg) << endl; 
     cout << "# bytes is : " << read_size << endl;  
     cout << clientint << " received" << endl;*/ 


    send_message(client_msg,sock); 
    gettimeofday(&tv, NULL); 
    unsigned long long milliseconds_sent =(unsigned long long)(tv.tv_sec) * 1000 +(unsigned long long)(tv.tv_usec)/1000; 



    row = {clientint, milliseconds_recv, milliseconds_sent, strlen(client_msg)}; 
    data.push_back(row); 


    if (clientint == 100) 
     { 
     looprun = false; 


     break; 
     } 

    } 
    cout << "exit handle -conn " << endl; 

    pthread_exit(NULL); 

} 
int main (int argc, char **argv) 
{ 

     struct timespec t; 
     struct sched_param param; 
     int interval = 50000; /* 50us*/ 

     /* Declare ourself as a real time task */ 

     param.sched_priority = MY_PRIORITY; 
     if(sched_setscheduler(0, SCHED_FIFO, &param) == -1) { 
       perror("sched_setscheduler failed"); 
       exit(-1); 
     } 

     /* Lock memory */ 

     if(mlockall(MCL_CURRENT|MCL_FUTURE) == -1) { 
       perror("mlockall failed"); 
       exit(-2); 
     } 

     /* Pre-fault our stack */ 

     stack_prefault(); 
    int connfd =0, n = 0; 
    int *new_sock, sock; 



    struct addrinfo hints, *res; 
    int reuseaddr = 1; // True 

    // Get the address info 
    memset(&hints, 0, sizeof hints); 
    hints.ai_family = AF_INET; 
    hints.ai_socktype = SOCK_STREAM; 
    if (getaddrinfo(IP_ADDR, PORT, &hints, &res) != 0) { 
     perror("getaddrinfo"); 
     exit (EXIT_FAILURE); 
     //return 1; 
    } 

    // Create the socket 
    sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 
    if (sock == -1) { 
     perror("socket"); 
     exit (EXIT_FAILURE); 
     // return 1; 
    } 

    // Enable the socket to reuse the address 
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1) { 
     perror("setsockopt"); 
     ::close(sock); 
     exit (EXIT_FAILURE); 
     //shutdown(sock,2); 
     // return 1; 
    } 

    // Bind to the address 
    if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) { 
     perror("bind"); 
     ::close(sock); 
     exit (EXIT_FAILURE); 
     //shutdown(sock,2); 
     //return 0; 
    } 

    freeaddrinfo(res); 

    // Listen 
    if (listen(sock, BACKLOG) == -1) { 
     perror("listen"); 
     exit (EXIT_FAILURE); 
     // return 0; 
    } 
    cout << "Enter # clients: " ; 
    cin >> BACKLOG; 
    cout << "Enter name of text file (num clients - trial #).txt:" << endl; 
    string filename; 
    cin >> filename; 
    cout << "listening for connections" << endl; 
    // Main loop 

    // Main loop 
    bool running = true; 
    // Initialize clients 
    while (running) 
    { 
     size_t size = sizeof(struct sockaddr_in); 
     struct sockaddr_in their_addr; 
     int clilen = sizeof(their_addr); 
     int newsock = accept(sock, (struct sockaddr*)&their_addr, &size); 
     if (newsock == -1) 
     { 
     perror("accept"); 
     exit (EXIT_FAILURE); 
     // return -1; 
     } 
     cli_count++; 
     printf("Got a connection from %s on port %d\n", inet_ntoa(their_addr.sin_addr), htons(their_addr.sin_port)); 
     cliarray.push_back(newsock); 
     if (cli_count == BACKLOG) 
     { 
     cout << "Max clients reached" << endl; 
     running = false; 
     break; 
     } 
    } 

    ofstream frout("/home/pi/cplus/"+filename,ios::app); 
    frout << "recv \t" << "time recv (ms) \t" << "time sent (ms) \t" << "length of msg" << endl; 

    /* Send message to all clients that server is ready to accept data */ 

    char r = char(cli_count); 

    char *mesg = &r; 

    send_message_all(mesg); 

    cout << "length of mesg: " << strlen(mesg) << endl; 
    //pthread_t from_ard_t, *ptr; 
    pthread_attr_init(&custom_sched_attr); 
    pthread_attr_setinheritsched(&custom_sched_attr, PTHREAD_EXPLICIT_SCHED); 
    pthread_attr_setschedpolicy(&custom_sched_attr, SCHED_FIFO); 
    fifo_max_prio = sched_get_priority_max(SCHED_FIFO); 
    fifo_param.sched_priority = fifo_max_prio; 
    pthread_attr_setschedparam(&custom_sched_attr, &fifo_param); 
    pthread_t *ptr; 
    ptr =static_cast<pthread_t*>(malloc(sizeof(pthread_t)*cli_count)); 


    int i; 
    for (i=0;i<BACKLOG;i++) 
    { 
     if (pthread_create(&ptr[i], &custom_sched_attr, handle_conn, (void *)&cliarray[i]) != 0)//was newsock 
      { 
     fprintf(stderr, "Failed to create thread\n"); 
     exit (EXIT_FAILURE); 
     } 

    } 

    /*if (pthread_create(&from_ard_t, NULL, from_ard, NULL)!=0) 
    { 
     fprintf(stderr, "Failed to create thread\n"); 
    }*/ 

    //pthread_join(from_ard_t, NULL); 
    cout << "Created threads" << endl; 
    for(i = 0; i < BACKLOG; i++) 
    { 
     pthread_join(ptr[i], NULL); 
    } 
    cout << "joined send/recv threads" << endl; 


    close(sock); 

    /* array for timestamp data */ 
    int numrows = data.size(); 

    for (int k = 0; k < numrows; k++) 
    { 
     for (int j = 0; j < 4; j++) 
     { 
      frout << data[k][j] << "\t"; 
     } 
     frout << endl; 
    } 



} 

C# код клиента:

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Net.Sockets; 
using System.IO; 
//make command line possible to save time info in file 

namespace sockclient_cs 
{ 

    public class Program 
    { 
     private System.Object lockThis = new System.Object(); 
     const int MAXLEN = 1; 
     public bool recvrun = true; 
     StringBuilder sb = new StringBuilder(); 
     public NetworkStream stream; 
     string fnrecv; 
     string fnsend; 
     public int clicount; 
     DateTime centuryBegin = new DateTime(1970, 1, 1); 
     Random rndseed; 
     public Program(NetworkStream streamer, int clinum, string pathsend, string pathrecv, Random rand) 
     { 
      stream = streamer; 
      clicount = clinum; 
      fnrecv = pathrecv; 
      fnsend = pathsend; 
      rndseed = rand; 

     } 


     public void SendData() 
     { 
      int[] numarray = new int[] { 70, 80, 90, 100, 60, 50, 40, 30}; // coressponds to %, A, P, _, d 
      bool looprun = true; 

      while (looprun) 
      {    

       int rnd1 = rndseed.Next(0, numarray.Length); 
       byte[] writebyte = new byte[] { BitConverter.GetBytes(numarray[rnd1])[0] }; 


       int delay = rndseed.Next(2000,6000); 

       Thread.Sleep(delay); 

       Array.Reverse(writebyte); 



       stream.Write(writebyte, 0, writebyte.Length); 
       DateTime currentDate = DateTime.Now; 
       long elapsedTicks = currentDate.Ticks - centuryBegin.Ticks; 
       Decimal milliseconds = elapsedTicks/(Decimal)TimeSpan.TicksPerMillisecond; 

       using (StreamWriter sw = File.AppendText(fnsend)) 
       { 
        sw.WriteLine(numarray[rnd1] + "\t" + milliseconds + "\n"); 

       } 


       Console.Write("sent: " + numarray[rnd1] + "\n"); 
       if (numarray[rnd1] == 100) 
       { 
        looprun = false; 
        break; 
       } 

      } 

     } 
     public void ReceiveData() 
     { 
      bool recvrun = true; 
      int numenders = 0; 
      while (recvrun) 
      { 

       String responseData = String.Empty; 
       byte[] bb = new byte[1]; //1 byte of data coming in 
       ASCIIEncoding ascii = new ASCIIEncoding(); 
       int bytes; 

        bytes = stream.Read(bb, 0, bb.Length); 

       DateTime currentDate = DateTime.Now; 
       long elapsedTicks = currentDate.Ticks - centuryBegin.Ticks; 
       Decimal milliseconds = elapsedTicks/(Decimal)TimeSpan.TicksPerMillisecond; 
       int numback = BitConverter.ToInt16(new byte[] { bb[0], 0x00 }, 0); 
       using (StreamWriter sw = File.AppendText(fnrecv)) 
       { 
        sw.WriteLine(numback + "\t" + milliseconds + "\n"); 

       } 

       //responseData = ascii.GetString(bb, 0, bytes); 






        Console.WriteLine("Received: " + bb[0] + "\n"); 
        if (numback == 100) 
        { 
         numenders++; 
         if (numenders == clicount-1) 
         { 
          recvrun = false; 
          break; 
         } 

        } 







      } 
      Console.Write("Exiting receive"); 
     } 
    } 
    public class Simple 
    { 


     public static void Main() 
     { 

      Console.Write("Enter name of recv data file (ex. cli1recv_1.txt):\n"); 
      string recvfile = Console.ReadLine(); 
      string pathrecv = @"C:\Users\Neha\Documents\Ayaz Research\" + recvfile; 
      Console.Write("Enter name of send data file (ex. cli4send_1.txt):\n"); 
      string sendfile = Console.ReadLine(); 
      string pathsend = @"C:\Users\Neha\Documents\Ayaz Research\" + sendfile; 

      using (StreamWriter sw = File.CreateText(pathrecv)) 
      { 
       sw.WriteLine("Received \t Recv time (ms) \n"); 

      } 

      using (StreamWriter sw = File.CreateText(pathsend)) 
      { 
       sw.WriteLine("Sent \t Sent time (ms) \n"); 

      } 
      //SerialPort Serial1 = new SerialPort("COM1", 9600, Parity.None, 8, StopBits.One); 

      Random seed = new Random((int)DateTime.Now.Ticks & 0x0000FFFF); 
      try 
      { 


       TcpClient tcpclnt = new TcpClient(); 
       Console.WriteLine("Connecting..."); 
       tcpclnt.Connect("192.168.137.99", 8888); //address of RPi on arbitrary non privileged port 
       Console.WriteLine("Connected"); 
       NetworkStream stream = tcpclnt.GetStream(); 

       /*Receive the welcome from server */ 
       String responseData = String.Empty; 
       Byte[] bb = new byte[2]; //1 byte of data coming in 
       ASCIIEncoding ascii = new ASCIIEncoding(); 
       int bytes = stream.Read(bb, 0, bb.Length); 

       int numback = BitConverter.ToInt16(new byte[] { bb[0], 0x00 }, 0); 
       Console.Write("Received initial message from server: " + bb[0] + "\n"); 


       /*byte[] writebyte = new byte[] { BitConverter.GetBytes(82)[0] }; 

       Console.Write("writebyte length is " + writebyte.Length + "\n"); 
       Array.Reverse(writebyte); 
       stream.Write(writebyte, 0, writebyte.Length); 



       bytes = stream.Read(bb, 0,bb.Length); 
       // convert to string info 
       Console.Write("reading byte length is " + bb.Length + "\n"); 

       responseData = ascii.GetString(bb, 0, bytes); 
       Console.WriteLine("bb[0] is: " + bb[0] + "and bb[1] is: " + bb[1] + "\n"); 
       int numback = BitConverter.ToInt16(new byte[] { bb[0], 0x00 }, 0); 

       Console.WriteLine("Received: " + responseData + "\n"); 
       Console.WriteLine("Received: " + numback + "\n");*/ 

       Program clientObject = new Program(stream,numback,pathsend, pathrecv, seed); 
       //non loop format - for cppserv 

       ThreadStart sending = new ThreadStart(clientObject.SendData); 
       Thread sendThread = new Thread(sending); 
       sendThread.Start(); 

       ThreadStart receiving = new ThreadStart(clientObject.ReceiveData); 
       Thread recvThread = new Thread(receiving); 
       recvThread.Start(); 

       sendThread.Join(); 
       recvThread.Join(); 

       tcpclnt.Close(); 

      } 
      catch (Exception e) 
      { 
       Console.WriteLine("Error...." + e.StackTrace); 
      } 
     } 
    } 

} 

Вот что Клиент 2 отправляет Клиенту 1 и метку времени.

Sent  Sent time (ms) 

70 1467720189893.1576 

80 1467720189912.1587 

60 1467720189926.1595 

60 1467720189937.1602 

50 1467720189949.1608 

60 1467720189959.1614 

40 1467720189969.162 

100 1467720190006.1641 

Вот что клиент 1 получает от клиента 2 и отметки времени.

Received  Recv time (ms) 

70 1467720190016.1647 

80 1467720190063.1674 

60 1467720190079.1683 

60 1467720190109.17 

50 1467720190126.171 

60 1467720190137.1716 

40 1467720190149.1723 

100 1467720190161.173 
+0

Как эти устройства подключены? И как вы отправляете данные прямо сейчас? Через TCP/IP? Или UART? –

+1

Чтобы быть ясным, клиент A отправляет сообщение серверу, который передает его клиенту B? И требуется 100 мс для приема B после отправки A? Вы используете TCP/IP? Датаграммы? Трудно сказать, что может происходить без просмотра кода для отправки и получения на клиенте и сервере. –

+0

Задержка может произойти откуда угодно. Что такое сетевая задержка? Вы уверены, что журналы не лгут вам? Вы уверены, что правильно интерпретируете журналы? Можете ли вы разместить их здесь, чтобы мы могли сделать такую ​​же решимость? Как выглядит ваш код? Проводили ли вы сетевые тесты для проверки сети между вашими устройствами? Вы пытались заменить RPi? Я написал приложения на C#, которые говорят о нескольких коммутаторах и достигают субмиллисекундной задержки. Проблема не в вычислительной задержке. – antiduh

ответ

0

Выключите алгоритм Nagle у отправителя

Socket.NoDelay = true 
+0

Я переключился на UDP, и я вижу ту же странную задержку в синхронизации на стороне клиента C# (я переключился на использование класса секундомера). Когда я одновременно запускаю Wireshark для захвата сетевых пакетов, он не показывает никаких больших задержек. Все меньше 2 мс. Это заставляет меня подозревать, что по-прежнему есть некоторая ошибка в механизме синхронизации, который я реализовал, но я понятия не имею, как исправить эту проблему. – nt387

Смежные вопросы