2016-06-03 2 views
0

Я изучаю программирование сокетов async и для немного более сложного проекта. Я думал о создании сервера для группового чата. Удалось это сделать, но я не уверен, что производительность достаточно хороша и думаю, что я делаю что-то неправильно.NetworkStream AsyncWrite speed

В принципе, я подключаю 400 пользователей к серверу, а затем отправляю 1000 сообщений (сообщение 1kB, с префиксом длины и остальное пусто) от одного из пользователей. Серверу необходимо транслировать каждое сообщение всем 400 пользователям. На сервере есть список сетевых потоков, и когда сервер получает сообщение, он выполняет итерацию по списку и вызывает метод stream.WriteAsync. Однако, кажется, сервер 40-50ms отправляет это сообщение всем 400 пользователям. Во время теста использование процессора сервера составляет ~ 4%, а использование процессора CentressClient составляет ~ 55%. Я ожидал, что это будет быстрее, чем 40-50 мс. Я что-то делаю неправильно или это максимальная скорость?

Вот код сервера (последние 2 методы являются наиболее актуальными, ReceiveMessageAsync и SendToAllAsync)

private List<NetworkStream> connectedUsers = new List<NetworkStream>(); 
private int processedRequestsAmount = 0; 
private Stopwatch sw = new Stopwatch(); 

public ServerEngine() 
{ 
} 

public void Start(IPAddress ipAddress, int port) 
{ 
    TcpListener listener = new TcpListener(ipAddress, port); 
    try 
    { 
     listener.Start(); 
     AcceptClientsAsync(listener); 

     while (true) 
     { 
      Console.ReadKey(true); 
      Console.WriteLine("Processed requests: " + processedRequestsAmount); 
     } 
    } 
    finally 
    { 
     listener.Stop(); 
     Console.WriteLine("Server stopped! Press ENTER to close application..."); 
     Console.ReadLine(); 
    } 
} 

private async Task AcceptClientsAsync(TcpListener listener) 
{ 
    while (true) 
    { 
     try 
     { 
      TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false); 
      StartClientListenerAsync(client); 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine(ex.Message); 
     } 
    } 
} 

private async Task StartClientListenerAsync(TcpClient client) 
{ 
    using (client) 
    { 
     var buf = new byte[1024]; 
     NetworkStream stream = client.GetStream(); 
     lock (connectedUsers) 
     { 
      connectedUsers.Add(stream); 
     } 
     Console.WriteLine(connectedUsers.Count + " users connected!"); 

     while (true) 
     { 
      try 
      { 
       await RecieveMessageAsync(stream, buf).ConfigureAwait(false); 
      } 
      catch (Exception ex) 
      { 
       break; 
      } 
     } 

     connectedUsers.Remove(stream); 
     Console.WriteLine("User disconnected."); 
    } 
} 

private async Task RecieveMessageAsync(NetworkStream stream, byte[] readBuffer) 
{ 
    int totalAmountRead = 0; 

    // read header (length, 2 bytes total) 
    while (totalAmountRead < 2) 
    { 
     totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, 2 - totalAmountRead).ConfigureAwait(false); 
    } 

    short totalLength = BitConverter.ToInt16(readBuffer, 0); 

    // read rest of the message 
    while (totalAmountRead < totalLength) 
    { 
     totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false); 
    } 

    await SendToAllAsync(readBuffer, totalLength); 
} 

private async Task SendToAllAsync(byte[] buffer, short totalLength) 
{ 
    List<Task> tasks = new List<Task>(connectedUsers.Count); 
    if (processedRequestsAmount == 0) 
    { 
     sw.Start(); 
    } 

    foreach (NetworkStream stream in connectedUsers) 
    { 
     tasks.Add(stream.WriteAsync(buffer, 0, buffer.Length)); 
    } 

    await Task.WhenAll(tasks).ConfigureAwait(false); 

    processedRequestsAmount++; 
    if (processedRequestsAmount == 1000) 
    { 
     sw.Stop(); 
     Console.WriteLine("Average time for sending 400 messages is {0} ms", sw.Elapsed.TotalMilliseconds/1000.0); 
    } 
} 
+1

Являются ли 400 клиентов на локальном компьютере или через сеть? => 400 * 1 Кбит в 40 мс - 10 Мбайт/с, что означает ~ линейную скорость 100 Мбит Ethernet. – tolanj

+0

Этот вопрос не имеет никакого отношения к async, насколько я могу судить. Кроме того, ваш код сокета выглядит правильно, что является редкой находкой в ​​Stack Overflow. – usr

+0

, если ваши ожидания относительно скорости намного выше, вы знаете об этом: https://msdn.microsoft.com/en-us/library/system.net.sockets.networkstream(v=vs.110).aspx " Примечания => Класс NetworkStream предоставляет методы для отправки и получения данных по сокетам Stream в режиме блокировки. Дополнительные сведения о блокировании и неблокирующих сокетах см. В разделе Использование асинхронного клиентского сокета. => Т.е. асинхронность этой модели не очень глубока. – tolanj

ответ

0

один пункт больше - в SendToAllAsync вы можете подавить захват ExecutionContext

foreach (NetworkStream stream in connectedUsers) 
    { 
     ExecutionContext.SuppressFlow(); 
     tasks.Add(stream.WriteAsync(buffer, 0, buffer.Length)); 
    } 

и прочитайте, пожалуйста, https://msdn.microsoft.com/en-us/magazine/hh456402.aspx

0

Оказывается, когда я запускаю приложения Server и ClientStressTest без отладки (ctrl + F5 в Visual St udio), сервер требует всего 5 мс (загрузка процессора на ~ 30%), чтобы отправить сообщение 400 пользователям, что намного лучше, чем я когда-либо надеялся. Может ли кто-нибудь объяснить мне, почему бы так сильно отложить отладчик?

Во всяком случае, вот остальная часть кода, если кто-то это нужно, чтобы понять это

Program.cs ClientStressTest в

class Program 
{ 
    static int NumOfClients = 400; 
    static int NumOfMessages = 1000; 

    static NetworkStream[] Streams = new NetworkStream[NumOfClients]; 
    static byte[] Message = new byte[1024]; 

    static void Main(string[] args) 
    { 
     Buffer.BlockCopy(BitConverter.GetBytes((short)1024), 0, Message, 0, sizeof(short)); 
     Console.WriteLine("Press ENTER to run setup"); 
     Console.ReadLine(); 

     Setup().Wait(); 


     Console.WriteLine("Press ENTER to start sending"); 
     Console.ReadLine(); 


     NetworkStream sender = Streams[0]; 
     for (int i = 0; i < NumOfMessages; i++) 
     { 
      sender.WriteAsync(Message, 0, 1024); 
     } 

     Console.ReadLine(); 
    } 

    static async Task Setup() 
    { 
     for (int i = 0; i < Streams.Length; i++) 
     { 
      TcpClient tcpClient = new TcpClient(); 
      tcpClient.Connect("localhost", 4000); 
      NetworkStream stream = tcpClient.GetStream(); 
      Streams[i] = stream; 
      Task.Run(() => CallbackListener(stream)); 
     } 
    } 

    static int counter = 0; 
    static object objLock = new object(); 
    static async Task CallbackListener(NetworkStream stream) 
    { 
     var readBuffer = new byte[1024]; 
     int totalAmountRead; 
     short totalLength; 

     while (true) 
     { 
      totalAmountRead = 0; 
      while (totalAmountRead < 2) 
      { 
       totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, 2 - totalAmountRead).ConfigureAwait(false); 
      } 

      totalLength = BitConverter.ToInt16(readBuffer, 0); 

      while (totalAmountRead < totalLength) 
      { 
       totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false); 
      } 

      lock(objLock) 
      { 
       counter++; 
       if (counter % 1000 == 0) 
       { 
        // to see progress 
        Console.WriteLine(counter); 
       } 
      } 
      // do nothing 
     } 
    } 
} 

Program.cs Сервера

class Program 
{ 
    static void Main(string[] args) 
    { 
     var server = new ServerEngine(); 
     server.Start(IPAddress.Any, 4000); 
    } 
} 

ServerEngine.cs сервера

public class ServerEngine 
{ 
    private List<NetworkStream> connectedUsers = new List<NetworkStream>(); 
    private int processedRequestsAmount = 0; 
    private Stopwatch sw = new Stopwatch(); 

    public ServerEngine() 
    { 
    } 

    public void Start(IPAddress ipAddress, int port) 
    { 
     TcpListener listener = new TcpListener(ipAddress, port); 
     try 
     { 
      listener.Start(); 
      AcceptClientsAsync(listener); 

      while (true) 
      { 
       Console.ReadKey(true); 
       Console.WriteLine("Processed requests: " + processedRequestsAmount); 
      } 
     } 
     finally 
     { 
      listener.Stop(); 
      Console.WriteLine("Server stopped! Press ENTER to close application..."); 
      Console.ReadLine(); 
     } 
    } 

    private async Task AcceptClientsAsync(TcpListener listener) 
    { 
     while (true) 
     { 
      try 
      { 
       TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false); 
       StartClientListenerAsync(client); 
      } 
      catch (Exception ex) 
      { 
       Console.WriteLine(ex.Message); 
      } 
     } 
    } 

    private async Task StartClientListenerAsync(TcpClient client) 
    { 
     using (client) 
     { 
      var buf = new byte[1024]; 
      NetworkStream stream = client.GetStream(); 
      lock (connectedUsers) 
      { 
       connectedUsers.Add(stream); 
      } 
      Console.WriteLine(connectedUsers.Count + " users connected!"); 

      while (true) 
      { 
       try 
       { 
        await RecieveMessageAsync(stream, buf).ConfigureAwait(false); 
       } 
       catch (Exception ex) 
       { 
        break; 
       } 
      } 

      connectedUsers.Remove(stream); 
      Console.WriteLine("User disconnected."); 
     } 
    } 

    private async Task RecieveMessageAsync(NetworkStream stream, byte[] readBuffer) 
    { 
     int totalAmountRead = 0; 

     // read header (length, 2 bytes total) 
     while (totalAmountRead < 2) 
     { 
      totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, 2 - totalAmountRead).ConfigureAwait(false); 
     } 

     short totalLength = BitConverter.ToInt16(readBuffer, 0); 

     // read rest of the message 
     while (totalAmountRead < totalLength) 
     { 
      totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false); 
     } 

     await SendToAll(readBuffer, totalLength).ConfigureAwait(false); 
    } 

    private async Task SendToAll(byte[] buffer, short totalLength) 
    { 
     List<Task> tasks = new List<Task>(connectedUsers.Count); 
     if (processedRequestsAmount == 0) 
     { 
      sw.Start(); 
     } 

     foreach (NetworkStream stream in connectedUsers) 
     { 
      tasks.Add(stream.WriteAsync(buffer, 0, buffer.Length)); 
     } 

     await Task.WhenAll(tasks).ConfigureAwait(false); 

     processedRequestsAmount++; 
     if (processedRequestsAmount == 1000) 
     { 
      sw.Stop(); 
      Console.WriteLine("Average time for sending 400 messages is {0} ms", sw.Elapsed.TotalMilliseconds/1000.0); 
     } 
    } 
} 
Смежные вопросы