2016-05-15 2 views
1

Балансировщик нагрузки принимает входящие запросы, повторно отправляет их на несколько серверов и возвращает ответы с серверов на ожидающих клиентов.C# Server Load balancer работает только тогда, когда точка останова установлена ​​в VS-отладчике VS?

// Dispatcher.cs 
using System; 
using System.Collections.Generic; 
using System.Net; 
using System.Net.Sockets; 
using System.Threading; 

namespace LoadBallancer { 
    public class Dispatcher 
    { 
     // set the TcpListener on port 8890 
     int port = 8890; 
     TcpListener server; 
     List<CoreComm> processors = new List<CoreComm>(); 

     static void Main() 
     { 
      var dispatcher = new Dispatcher(); 
      dispatcher.ListenForRequests(); 
     } 

     public Dispatcher() 
     { 
      server = new TcpListener(IPAddress.Any, port); 
     } 

     public void ListenForRequests() 
     { 
      server.Start(); 
      while (true) 
      { 
       try 
       { 
        // Start listening for client requests 
        // Enter the listening loop 

        Console.Write("Waiting for a connection... "); 

        lock(server) 
        { 
         // Perform a blocking call to accept requests. 
         TcpClient client = server.AcceptTcpClient(); 

         Console.WriteLine("Connected."); 

         ThreadPool.QueueUserWorkItem(ThreadProc, client); 
        } 
       } 
       catch (Exception e) 
       { 
        Console.WriteLine("Exception: {0}", e); 
       } 
      } 
     } 
     private static void ThreadProc(object obj) 
     { 
      var processor = new CoreComm((TcpClient)obj); 
      processor.ReSendRequest(null); 
     } 
    } 
} 

// CoreComm.cs

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Net.Sockets; 

using System.Configuration; 
using System.Threading; 

namespace LoadBallancer 
{ 
    public class IamServer 
    { 
     public string Url { get; set; } 
     public int  Port { get; set; } 
     public string Type { get; set; } 
    } 

    public class CoreComm 
    { 
     // Buffer for reading data 
     int bufSize = 1024; 
     static List<IamServer> servers = new List<IamServer>(); 

     protected TcpClient acceptorSocket; 
     NetworkStream acceptorStream; 

     protected TcpClient clientSocket; 

     protected List<KeyValuePair<int, byte[]>> requestPackets = new List<KeyValuePair<int, byte[]>>(); 

     static CoreComm() 
     { 
      // reading config for servers' parameters 
     } 

     public CoreComm(TcpClient socket) 
     { 
      acceptorSocket = socket; 
      // Get a stream object for reading and writing 
      acceptorStream = acceptorSocket.GetStream(); 
     } 

     private void ReadFromAcceptorStream() 
     { 
      // Loop to receive all the data sent by the client. 
      while (acceptorStream.DataAvailable) 
      { 
       byte[] requestBuffer = new byte[bufSize]; 
       int i = acceptorStream.Read(requestBuffer, 0, requestBuffer.Length); 
       requestPackets.Add(new KeyValuePair<int, byte[]>(i, requestBuffer)); 
      } 
     } 

     public void ReSendRequest(Object threadContext) 
     { 
      ReadFromAcceptorStream(); 

      var servers = GetDestinationServers(null); 

      if (servers.Count == 0) 
       acceptorStream.Write(ErrMessage, 0, ErrMessage.Length); 
      else 
       // for debug only send the first in the list 
       SendRequestToServer(servers[0]); 

      // Shutdown and end connection 
      acceptorSocket.Close(); 
     } 

     public void SendRequestToServer(IamServer server) 
     { 
      clientSocket = new TcpClient(); 
      clientSocket.Connect(server.Url, server.Port); 
      NetworkStream clientStream = clientSocket.GetStream(); 

      foreach (var packet in requestPackets) 
       clientStream.Write(packet.Value, 0, packet.Key); 

      var requestBuffer = new byte[bufSize]; 

      while (clientStream.DataAvailable) 
      { 
       int i = clientStream.Read(requestBuffer, 0, requestBuffer.Length); 
       acceptorStream.Write(requestBuffer, 0, i); 
      } 

      clientSocket.Close(); 
     } 

     // Mock up of the real load balancing algorithm 
     static int lastServerAnswered = 0; 

     public List<IamServer> GetDestinationServers(string requestData) 
     { 
      // processing to determine the query destinations 
      lock(servers) 
      { 
       // patch 
       int currentServerNum = lastServerAnswered; 
       lastServerAnswered ++ ; 
       if (lastServerAnswered > servers.Count - 1) 
        lastServerAnswered = 0; 

       return new List<IamServer> { servers[currentServerNum] }; 
      } 
     } 

    } 
} 

Так работает правильно, когда я установил сломаться точку в коде и не работает в противном случае. Любые идеи?

ответ

0

Проблема оказалась в коде:

while (clientStream.DataAvailable) 
{ 
     int i = clientStream.Read(requestBuffer, 0, requestBuffer.Length); 
     acceptorStream.Write(requestBuffer, 0, i); 
} 

На самом деле это случилось, что для некоторых пакетов clientStream.DataAvailable было ложным, даже если еще оставшиеся данные, которые будут получены. Решение основано на протоколе прикладного уровня, для которого была разработана Балансировщику нагрузки, который посылает в первые 4 байта потока число байтов общего, которые sent.The код становится следующим:

var responseBuffer = new byte[bufSize]; 

int numTotalBytesStreamed = clientStream.Read(responseBuffer, 0, responseBuffer.Length); 
int numBytesToStream = GetNumBytesInTheStream(responseBuffer); 

acceptorStream.Write(responseBuffer, 0, numTotalBytesStreamed); 

while (numBytesToStream > numTotalBytesStreamed) 
{ 
    while (!clientStream.DataAvailable) 
     Thread.Sleep(1); 

     int numMoreBytesStreamed = clientStream.Read(responseBuffer, 0, responseBuffer.Length); 
     acceptorStream.Write(responseBuffer, 0, numMoreBytesStreamed); 
     numTotalBytesStreamed += numMoreBytesStreamed; 
} 
acceptorStream.Flush(); 
clientSocket.Close(); 

Решение работает и чрезвычайно стабильно для непрерывных нагрузок сотен запросов в секунду.

Смежные вопросы