У меня есть рабочее приложение-чат-сервер в C++ на малине pi, которое прослушивает для клиентов и отправляет сообщения от одного клиента другим, и наоборот, используя pthreads для циклов. Я использую C# в качестве клиентов.серверное/клиентское приложение реального времени в C++ и C#
Клиенты C# непрерывно отправляют и принимают данные (один байт) на сервер RPi и регистрируют время отправки и приема данных.
Оглядываясь на занесенные в журнал времена, я вижу, что между тем, когда один клиент отправляет, а второй получает данные, происходит задержка в 100 мс или около того. Такая задержка неприемлема для моего приложения. Мне нужно получить его менее 15 мс последовательно.
В моей программе на C++ временная задержка между получением и отправкой байта обратно клиенту составляет 1-2 мс.
Я не уверен, есть ли проблема в том, как я кодировал C# -сервер или сервер C++. Я обновил свое ядро с помощью патча RT PREEMPT, но это не повлияло на время задержки.
Если я поставил случайную задержку порядка секунд перед отправкой байта на сервер в программе C#, время задержки значительно улучшилось - до 1-2 мс.
Есть ли способ оптимизировать его, даже если он отправляется непрерывно, время задержки очень мало? Я могу отправлять коды, если это необходимо.
** EDIT: Вот код сервера на RPi.
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <cstdlib>
#include <iostream>
#include <cstring>
#include <string.h> // memset
#include <pthread.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <netdb.h>
#include <vector>
#include <string>
#include <fstream>
#include <iomanip>
#include <algorithm>
#include <sys/time.h>
#include <sched.h>
#include <time.h>
#include <sys/mman.h>
using namespace std;
int BACKLOG;
#define IP_ADDR "192.168.137.99"
#define PORT "8888"
#define MAXLEN 1
#define MY_PRIORITY (49) /* we use 49 as the PRREMPT_RT use 50
as the priority of kernel tasklets
and interrupt handler by default */
#define MAX_SAFE_STACK (8*1024) /* The maximum stack size which is
guaranteed safe to access without
faulting */
#define NSEC_PER_SEC (1000000000) /* The number of nsecs per sec. */
static unsigned int cli_count = 0;
vector<int> cliarray;
vector<vector<unsigned long long> > data;
pthread_attr_t custom_sched_attr;
int fifo_max_prio, fifo_min_prio;
struct sched_param fifo_param;
void stack_prefault(void) {
unsigned char dummy[MAX_SAFE_STACK];
memset(dummy, 0, MAX_SAFE_STACK);
return;
}
void send_message(char *s, int sock){
int i;
for(i=0;i<BACKLOG;i++){
if(cliarray[i]){
if(cliarray[i] != sock){
send(cliarray[i], s, 1,0);
}
}
}
}
/* Send message to all clients */
void send_message_all(char *s){
int k;
for(k=0;k<BACKLOG;k++){
if(cliarray[k]){
send(cliarray[k], s, 1,0);
}
}
}
void *handle_conn(void *pnewsock)
{
int sock = *(int*)pnewsock;
char client_msg[MAXLEN];
int read_size;
struct timeval tv;
bool looprun = true;
int clientint;
vector<unsigned long long> row(4);
while(looprun){
read_size = recv(sock, client_msg, 1, 0);
gettimeofday(&tv, NULL);
unsigned long long milliseconds_recv =(unsigned long long)(tv.tv_sec) * 1000 +(unsigned long long)(tv.tv_usec)/1000;
clientint = int(*client_msg);
client_msg[read_size] = '\0';
/* cout << "length of client message: " << strlen(client_msg) << endl;
cout << "# bytes is : " << read_size << endl;
cout << clientint << " received" << endl;*/
send_message(client_msg,sock);
gettimeofday(&tv, NULL);
unsigned long long milliseconds_sent =(unsigned long long)(tv.tv_sec) * 1000 +(unsigned long long)(tv.tv_usec)/1000;
row = {clientint, milliseconds_recv, milliseconds_sent, strlen(client_msg)};
data.push_back(row);
if (clientint == 100)
{
looprun = false;
break;
}
}
cout << "exit handle -conn " << endl;
pthread_exit(NULL);
}
int main (int argc, char **argv)
{
struct timespec t;
struct sched_param param;
int interval = 50000; /* 50us*/
/* Declare ourself as a real time task */
param.sched_priority = MY_PRIORITY;
if(sched_setscheduler(0, SCHED_FIFO, ¶m) == -1) {
perror("sched_setscheduler failed");
exit(-1);
}
/* Lock memory */
if(mlockall(MCL_CURRENT|MCL_FUTURE) == -1) {
perror("mlockall failed");
exit(-2);
}
/* Pre-fault our stack */
stack_prefault();
int connfd =0, n = 0;
int *new_sock, sock;
struct addrinfo hints, *res;
int reuseaddr = 1; // True
// Get the address info
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
if (getaddrinfo(IP_ADDR, PORT, &hints, &res) != 0) {
perror("getaddrinfo");
exit (EXIT_FAILURE);
//return 1;
}
// Create the socket
sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (sock == -1) {
perror("socket");
exit (EXIT_FAILURE);
// return 1;
}
// Enable the socket to reuse the address
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1) {
perror("setsockopt");
::close(sock);
exit (EXIT_FAILURE);
//shutdown(sock,2);
// return 1;
}
// Bind to the address
if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
perror("bind");
::close(sock);
exit (EXIT_FAILURE);
//shutdown(sock,2);
//return 0;
}
freeaddrinfo(res);
// Listen
if (listen(sock, BACKLOG) == -1) {
perror("listen");
exit (EXIT_FAILURE);
// return 0;
}
cout << "Enter # clients: " ;
cin >> BACKLOG;
cout << "Enter name of text file (num clients - trial #).txt:" << endl;
string filename;
cin >> filename;
cout << "listening for connections" << endl;
// Main loop
// Main loop
bool running = true;
// Initialize clients
while (running)
{
size_t size = sizeof(struct sockaddr_in);
struct sockaddr_in their_addr;
int clilen = sizeof(their_addr);
int newsock = accept(sock, (struct sockaddr*)&their_addr, &size);
if (newsock == -1)
{
perror("accept");
exit (EXIT_FAILURE);
// return -1;
}
cli_count++;
printf("Got a connection from %s on port %d\n", inet_ntoa(their_addr.sin_addr), htons(their_addr.sin_port));
cliarray.push_back(newsock);
if (cli_count == BACKLOG)
{
cout << "Max clients reached" << endl;
running = false;
break;
}
}
ofstream frout("/home/pi/cplus/"+filename,ios::app);
frout << "recv \t" << "time recv (ms) \t" << "time sent (ms) \t" << "length of msg" << endl;
/* Send message to all clients that server is ready to accept data */
char r = char(cli_count);
char *mesg = &r;
send_message_all(mesg);
cout << "length of mesg: " << strlen(mesg) << endl;
//pthread_t from_ard_t, *ptr;
pthread_attr_init(&custom_sched_attr);
pthread_attr_setinheritsched(&custom_sched_attr, PTHREAD_EXPLICIT_SCHED);
pthread_attr_setschedpolicy(&custom_sched_attr, SCHED_FIFO);
fifo_max_prio = sched_get_priority_max(SCHED_FIFO);
fifo_param.sched_priority = fifo_max_prio;
pthread_attr_setschedparam(&custom_sched_attr, &fifo_param);
pthread_t *ptr;
ptr =static_cast<pthread_t*>(malloc(sizeof(pthread_t)*cli_count));
int i;
for (i=0;i<BACKLOG;i++)
{
if (pthread_create(&ptr[i], &custom_sched_attr, handle_conn, (void *)&cliarray[i]) != 0)//was newsock
{
fprintf(stderr, "Failed to create thread\n");
exit (EXIT_FAILURE);
}
}
/*if (pthread_create(&from_ard_t, NULL, from_ard, NULL)!=0)
{
fprintf(stderr, "Failed to create thread\n");
}*/
//pthread_join(from_ard_t, NULL);
cout << "Created threads" << endl;
for(i = 0; i < BACKLOG; i++)
{
pthread_join(ptr[i], NULL);
}
cout << "joined send/recv threads" << endl;
close(sock);
/* array for timestamp data */
int numrows = data.size();
for (int k = 0; k < numrows; k++)
{
for (int j = 0; j < 4; j++)
{
frout << data[k][j] << "\t";
}
frout << endl;
}
}
C# код клиента:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Net.Sockets;
using System.IO;
//make command line possible to save time info in file
namespace sockclient_cs
{
public class Program
{
private System.Object lockThis = new System.Object();
const int MAXLEN = 1;
public bool recvrun = true;
StringBuilder sb = new StringBuilder();
public NetworkStream stream;
string fnrecv;
string fnsend;
public int clicount;
DateTime centuryBegin = new DateTime(1970, 1, 1);
Random rndseed;
public Program(NetworkStream streamer, int clinum, string pathsend, string pathrecv, Random rand)
{
stream = streamer;
clicount = clinum;
fnrecv = pathrecv;
fnsend = pathsend;
rndseed = rand;
}
public void SendData()
{
int[] numarray = new int[] { 70, 80, 90, 100, 60, 50, 40, 30}; // coressponds to %, A, P, _, d
bool looprun = true;
while (looprun)
{
int rnd1 = rndseed.Next(0, numarray.Length);
byte[] writebyte = new byte[] { BitConverter.GetBytes(numarray[rnd1])[0] };
int delay = rndseed.Next(2000,6000);
Thread.Sleep(delay);
Array.Reverse(writebyte);
stream.Write(writebyte, 0, writebyte.Length);
DateTime currentDate = DateTime.Now;
long elapsedTicks = currentDate.Ticks - centuryBegin.Ticks;
Decimal milliseconds = elapsedTicks/(Decimal)TimeSpan.TicksPerMillisecond;
using (StreamWriter sw = File.AppendText(fnsend))
{
sw.WriteLine(numarray[rnd1] + "\t" + milliseconds + "\n");
}
Console.Write("sent: " + numarray[rnd1] + "\n");
if (numarray[rnd1] == 100)
{
looprun = false;
break;
}
}
}
public void ReceiveData()
{
bool recvrun = true;
int numenders = 0;
while (recvrun)
{
String responseData = String.Empty;
byte[] bb = new byte[1]; //1 byte of data coming in
ASCIIEncoding ascii = new ASCIIEncoding();
int bytes;
bytes = stream.Read(bb, 0, bb.Length);
DateTime currentDate = DateTime.Now;
long elapsedTicks = currentDate.Ticks - centuryBegin.Ticks;
Decimal milliseconds = elapsedTicks/(Decimal)TimeSpan.TicksPerMillisecond;
int numback = BitConverter.ToInt16(new byte[] { bb[0], 0x00 }, 0);
using (StreamWriter sw = File.AppendText(fnrecv))
{
sw.WriteLine(numback + "\t" + milliseconds + "\n");
}
//responseData = ascii.GetString(bb, 0, bytes);
Console.WriteLine("Received: " + bb[0] + "\n");
if (numback == 100)
{
numenders++;
if (numenders == clicount-1)
{
recvrun = false;
break;
}
}
}
Console.Write("Exiting receive");
}
}
public class Simple
{
public static void Main()
{
Console.Write("Enter name of recv data file (ex. cli1recv_1.txt):\n");
string recvfile = Console.ReadLine();
string pathrecv = @"C:\Users\Neha\Documents\Ayaz Research\" + recvfile;
Console.Write("Enter name of send data file (ex. cli4send_1.txt):\n");
string sendfile = Console.ReadLine();
string pathsend = @"C:\Users\Neha\Documents\Ayaz Research\" + sendfile;
using (StreamWriter sw = File.CreateText(pathrecv))
{
sw.WriteLine("Received \t Recv time (ms) \n");
}
using (StreamWriter sw = File.CreateText(pathsend))
{
sw.WriteLine("Sent \t Sent time (ms) \n");
}
//SerialPort Serial1 = new SerialPort("COM1", 9600, Parity.None, 8, StopBits.One);
Random seed = new Random((int)DateTime.Now.Ticks & 0x0000FFFF);
try
{
TcpClient tcpclnt = new TcpClient();
Console.WriteLine("Connecting...");
tcpclnt.Connect("192.168.137.99", 8888); //address of RPi on arbitrary non privileged port
Console.WriteLine("Connected");
NetworkStream stream = tcpclnt.GetStream();
/*Receive the welcome from server */
String responseData = String.Empty;
Byte[] bb = new byte[2]; //1 byte of data coming in
ASCIIEncoding ascii = new ASCIIEncoding();
int bytes = stream.Read(bb, 0, bb.Length);
int numback = BitConverter.ToInt16(new byte[] { bb[0], 0x00 }, 0);
Console.Write("Received initial message from server: " + bb[0] + "\n");
/*byte[] writebyte = new byte[] { BitConverter.GetBytes(82)[0] };
Console.Write("writebyte length is " + writebyte.Length + "\n");
Array.Reverse(writebyte);
stream.Write(writebyte, 0, writebyte.Length);
bytes = stream.Read(bb, 0,bb.Length);
// convert to string info
Console.Write("reading byte length is " + bb.Length + "\n");
responseData = ascii.GetString(bb, 0, bytes);
Console.WriteLine("bb[0] is: " + bb[0] + "and bb[1] is: " + bb[1] + "\n");
int numback = BitConverter.ToInt16(new byte[] { bb[0], 0x00 }, 0);
Console.WriteLine("Received: " + responseData + "\n");
Console.WriteLine("Received: " + numback + "\n");*/
Program clientObject = new Program(stream,numback,pathsend, pathrecv, seed);
//non loop format - for cppserv
ThreadStart sending = new ThreadStart(clientObject.SendData);
Thread sendThread = new Thread(sending);
sendThread.Start();
ThreadStart receiving = new ThreadStart(clientObject.ReceiveData);
Thread recvThread = new Thread(receiving);
recvThread.Start();
sendThread.Join();
recvThread.Join();
tcpclnt.Close();
}
catch (Exception e)
{
Console.WriteLine("Error...." + e.StackTrace);
}
}
}
}
Вот что Клиент 2 отправляет Клиенту 1 и метку времени.
Sent Sent time (ms)
70 1467720189893.1576
80 1467720189912.1587
60 1467720189926.1595
60 1467720189937.1602
50 1467720189949.1608
60 1467720189959.1614
40 1467720189969.162
100 1467720190006.1641
Вот что клиент 1 получает от клиента 2 и отметки времени.
Received Recv time (ms)
70 1467720190016.1647
80 1467720190063.1674
60 1467720190079.1683
60 1467720190109.17
50 1467720190126.171
60 1467720190137.1716
40 1467720190149.1723
100 1467720190161.173
Как эти устройства подключены? И как вы отправляете данные прямо сейчас? Через TCP/IP? Или UART? –
Чтобы быть ясным, клиент A отправляет сообщение серверу, который передает его клиенту B? И требуется 100 мс для приема B после отправки A? Вы используете TCP/IP? Датаграммы? Трудно сказать, что может происходить без просмотра кода для отправки и получения на клиенте и сервере. –
Задержка может произойти откуда угодно. Что такое сетевая задержка? Вы уверены, что журналы не лгут вам? Вы уверены, что правильно интерпретируете журналы? Можете ли вы разместить их здесь, чтобы мы могли сделать такую же решимость? Как выглядит ваш код? Проводили ли вы сетевые тесты для проверки сети между вашими устройствами? Вы пытались заменить RPi? Я написал приложения на C#, которые говорят о нескольких коммутаторах и достигают субмиллисекундной задержки. Проблема не в вычислительной задержке. – antiduh