2013-03-12 2 views
2

Недавно мне пришлось создать небольшое клиентское приложение TCP, которое подключается к прослушивателю TCP-приложения внешнего приложения и предназначено для работы с большим объемом томов данных и на высоких частотах ,C#/TCP Sockets/Blocking Read/Как закрыть такие потоки

Я создал класс оболочки вокруг класса TCPClient, чтобы просто перехватывать исключения и ссылаться на некоторые интересующие свойства (сетевой поток и т. Д.). Вот обертка:

public class MyTCPClient 
    { 

     private string serverIP; 
     private int serverPort; 

     public TcpClient tcpClient = new TcpClient(); 
     private IPEndPoint serverEndPoint; 
     private NetworkStream stream = null; 

     public string name; 

     public MyTCPClient(string serverIp, int serverPort, string parentName) 
     { 
      this.serverIP = serverIp; 
      this.serverPort = serverPort; 
      this.name = parentName + "_TCPClient"; 

      serverEndPoint = new IPEndPoint(IPAddress.Parse(serverIP), serverPort); 

      tcpClient.ReceiveBufferSize = 1048576; 

      this.TryConnect(); 
     } 

     private bool TryConnect() 
     { 
      try 
      { 
       tcpClient.Connect(serverEndPoint); 
      } 
      catch (SocketException e1) 
      { 
       throw new ErrorOnConnectingException(e1, "SocketException while connecting. (see msdn Remarks section for more details.) Error code: " + e1.ErrorCode); 
      } 
      catch (ArgumentNullException e2) 
      { 
       throw new ErrorOnConnectingException(e2, "ArgumentNullException while connecting. (The hostname parameter is null.) Message: " + e2.Message); 
      } 
      catch (ArgumentOutOfRangeException e3) 
      { 
       throw new ErrorOnConnectingException(e3, "ArgumentOutOfRangeException while connecting (The port parameter is not between MinPort and MaxPort.). Message: " + e3.Message); 
      } 
      catch (ObjectDisposedException e4) 
      { 
       throw new ErrorOnConnectingException(e4, "ObjectDisposedException while connecting. (TcpClient is closed.) Message: " + e4.Message); 
      } 


      try 
      { 
       stream = this.tcpClient.GetStream(); 
      } 
      catch (ObjectDisposedException e1) 
      { 
       throw new ErrorOnGettingStreamException(e1, "ObjectDisposedException while acquiring Network stream. (The TcpClient has been closed.) Message: " + e1.Message); 
      } 
      catch (InvalidOperationException e2) 
      { 
       throw new ErrorOnGettingStreamException(e2, "ArgumentOutOfRangeException while acquiring Network stream (The TcpClient is not connected to a remote host. ). Message: " + e2.Message); 
      } 

      return true; 
     } 

     public string ReadData() 
     { 
      try 
      { 
       ASCIIEncoding encoder = new ASCIIEncoding(); 

       byte[] dataHeader = new byte[12]; 
       if (this.tcpClient.Connected) 
       { 
        stream.Read(dataHeader, 0, 12); 
       } 
       else 
       { 
        throw new ErrorOnReadingException(null, "The underlying TCP tcpClient is not connected any more"); 
       } 

       var strHeaderMessage = System.Text.Encoding.Default.GetString(dataHeader); 

       Utils.logToTimeStampedFile(strHeaderMessage, name); 

       int bodyAndTailCount = Convert.ToInt32(strHeaderMessage.Replace("#", "")); 
       byte[] dataBodyAndTail = new byte[bodyAndTailCount]; 

       if (this.tcpClient.Connected) 
       { 
        stream.Read(dataBodyAndTail, 0, bodyAndTailCount); 
       } 
       else 
       { 
        throw new ErrorOnReadingException(null, "The underlying TCP tcpClient is not connected any more"); 
       } 

       var strBodyAndTailMessage = System.Text.Encoding.Default.GetString(dataBodyAndTail); 

       Utils.logToTimeStampedFile(strBodyAndTailMessage, name); 

       return strBodyAndTailMessage; 

      } 
      catch (FormatException e0) 
      { 
       CloseAllLeft(); 
       throw new ErrorOnReadingException(e0, "FormatException while reading data. (Bytes red are null or does not correspond to specification, happens on closing Server) Message: " + e0.Message); 
      } 
      catch (ArgumentNullException e1) 
      { 
       CloseAllLeft(); 
       throw new ErrorOnReadingException(e1, "ArgumentNullException while reading data. (The buffer parameter is null.) Message: " + e1.Message); 
      } 
      catch (ArgumentOutOfRangeException e2) 
      { 
       CloseAllLeft(); 
       throw new ErrorOnReadingException(e2, "ArgumentOutOfRangeException while reading data. (see msdn description) Message: " + e2.Message); 
      } 
      catch (IOException e3) 
      { 
       CloseAllLeft(); 
       throw new ErrorOnReadingException(e3, "IOException while reading data. (The underlying Socket is closed.) Message: " + e3.Message); 
      } 
      catch (ObjectDisposedException e4) 
      { 
       CloseAllLeft(); 
       throw new ErrorOnReadingException(e4, "ArgumentOutOfRangeException while reading data. (see msdn description) Message: " + e4.Message); 
      } 
     } 

     public void CloseAllLeft() 
     { 
      try 
      { 
       stream.Close(); 
      } 
      catch (Exception e) 
      { 
       Console.WriteLine("Exception closing tcp network stream: " + e.Message); 
      } 
      try 
      { 
       tcpClient.Close(); 
      } 
      catch (Exception e) 
      { 
       Console.WriteLine("Exception closing tcpClient: " + e.Message); 
      } 
     } 
    } 

По-прежнему ничего не говорится о потоках с использованием этого MyTCPClient. Приложение должно иметь два таких клиента TCP, подключение на разных портах и ​​выполнение разных заданий. Я был новичком в программировании TCP, и после некоторого блуждания по свойствам я решил использовать подход с блокировкой чтения, т. Е. По умолчанию метод TCPClient.Read() будет блокировать поток до тех пор, пока не появятся новые данные. Мне нужен такой подход, потому что у меня нет контроля над слушателем внешнего приложения, и единственным способом распознать закрытие сервера были «нулевые байты», отправленные в соответствии со спецификациями TCP Sockets.

Итак, я создаю абстрактный класс, который будет поддерживать и контролировать потоки, которые впоследствии будут использовать вышеприведенный класс MyTCPClient (который по дизайну в конечном итоге может блокировать родительские теги). Вот код для моего абстрактного TCPManager:

/// <summary> 
    /// Serves as a dispatcher for the high frequency readings from the TCP pipe. 
    /// Each time the thread is started it initializes new TCPClients which will attempt to connect to server. 
    /// Once established a TCP socket connection is alive until the thread is not requested to stop. 
    /// 
    /// Error hanling level here: 
    /// 
    /// Resources lke NetworkStream and TCPClients are ensured to be closed already within the myTCPClient class, and the error handling here 
    /// is steps on top of that - sending proper emails, notifications and logging. 
    /// 
    /// </summary> 
    public abstract class AbstractmyTCPClientManager 
    { 

     public string name; 
     public string serverIP; 
     public int serverPort; 

     public Boolean requestStop = false; 
     public Boolean MyTCPClientThreadRunning = false; 
     public Boolean requestStart = false; 

     public myTCPClient myTCPClient; 

     public int sleepInterval; 

     public Thread MyTCPClientThread; 

     public AbstractmyTCPClientManager(string name, string serverIP, int serverPort) 
     { 
      this.name = name; 
      this.serverIP = serverIP; 
      this.serverPort = serverPort; 
     } 

     public void ThreadRun() 
     { 
      MyTCPClientThreadRunning = false; 
      bool TCPSocketConnected = false; 
      bool AdditionalInitializationOK = false; 

      // keep trying to init requested tcp clients 
      while (!MyTCPClientThreadRunning && !requestStop) // and we are not suggested to stop 
      { 
       while (!TCPSocketConnected && !requestStop) // and we are not suggested to stop) 
       { 
        try 
        { 
         myTCPClient = new myTCPClient(serverIP, serverPort, name); 

         TCPSocketConnected = true; 
        } 
        catch (ErrorOnConnectingException e0) 
        { 

         // nah, too long message 
         string detail = e0.originalException != null ? e0.originalException.Message : "No inner exception"; 
         //Utils.logToTimeStampedFile("Creating connection attempt failed.(1." + e0.customMessage + " 2." + detail + "). Will retry in 10 seconds...", name); 
         //Utils.logToTimeStampedFile(e0.customMessage + " (" + detail + "). Will retry in 10 seconds...", name); 
         Utils.logToTimeStampedFile(detail + ". Will retry in 10 seconds...", name); 

         Thread.Sleep(10000); 
        } 
        catch (ErrorOnGettingStreamException e1) 
        { 
         // nah, too long message 
         string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception"; 
         //Utils.logToTimeStampedFile("Getting network stream attempt failed. (1." + e1.customMessage + " 2." + detail + "). Will retry in 10 seconds...", name); 
         //Utils.logToTimeStampedFile(e1.customMessage + " (" + detail + "). Will retry in 10 seconds...", name); 

         Utils.logToTimeStampedFile(detail + ". Will retry in 10 seconds...", name); 
         Thread.Sleep(10000); 
        } 
       } 
       Utils.logToTimeStampedFile("TCP Communication established", name); 

       while (!AdditionalInitializationOK && !requestStop) // or we are not suggested to stop 
       { 
        try 
        { 
         AdditionalInitialization(); 

         AdditionalInitializationOK = true; 

        } 
        catch (AdditionalInitializationException e1) 
        { 
         string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception"; 

         //Utils.logToTimeStampedFile("Additional initialization failed (1." + e1.customMessage + " 2." + detail + "). Will retry in 10 seconds", name); 

         Utils.logToTimeStampedFile(e1.customMessage + ". Will retry in 10 seconds", name); 
         Thread.Sleep(10000); 
        } 
       } 

       MyTCPClientThreadRunning = TCPSocketConnected && AdditionalInitializationOK; 
       ViewModelLocator.ControlTabStatic.updateUIButtons(); 
      } 
      Utils.logToTimeStampedFile("Additional Initialization successfully completed, thread started", name); 

      // while all normal (i.e nobody request a stop) continiously sync with server (read data) 
      while (!requestStop) 
      { 
       try 
       { 
        syncWithInterface(); 
       } 
       catch (ErrorOnReadingException e1) 
       { 
        string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception"; 

        //Utils.logToTimeStampedFile("Error ocured while reading data. (1." + e1.customMessage + " 2." + detail + ")", name); 
        Utils.logToTimeStampedFile(e1.customMessage, name); 

        if (!requestStop) // i.e if this indeed is an exception, during a normal flow, and nobody requested a thread stop (which migh cause read exceptions as a consequence) 
        { 
         Utils.logToTimeStampedFile("There was no external stop request, when the error occured, doing tcp client restart.", name); 
         requestStop = true; 
         requestStart = true; 
        } 
       } 

       Thread.Sleep(sleepInterval); 
      } 

      // we need to close all after execution, but the execution may be closed before/while resources were still initializing 
      if (TCPSocketConnected) 
      { 
       myTCPClient.CloseAllLeft(); 
      } 
      if (AdditionalInitializationOK) 
      { 
       ReleaseAdditionalResources(); 
      } 

      // remember that thread is stoped 
      MyTCPClientThreadRunning = false; 
      Utils.logToTimeStampedFile("Thread stoped", name); 
      ViewModelLocator.ControlTabStatic.updateUIButtons(); 

      // this serves as a restart 
      if (requestStart) 
      { 
       Utils.logToTimeStampedFile("Restarting thread...", name); 
       this.requestStop = false; 
       this.requestStart = false; // we are already processing a request start event, so reset this flag 

       this.MyTCPClientThread = new Thread(new ThreadStart(this.ThreadRun)); 
       this.MyTCPClientThread.Name = this.name; 
       this.MyTCPClientThread.IsBackground = true; 
       this.MyTCPClientThread.Start(); 
      } 
     } 

     /// <summary> 
     /// this method empties the entire TCP buffer, cycling through it 
     /// </summary> 
     private void syncWithInterface() 
     { 
      int counter = 0; 
      // read at most 100 messages at once (we assume that for 3 sec interval there might not be more, 
      //even if they are, it is still OK, they just will be processed next time) 
      while (counter < 100) 
      { 
       counter++; 
       string data = myTCPClient.ReadData(); 
       ForwardData(data); 
      } 

      // below is left for testing: 
      /* 
      * "Sleep(0) or Yield is occasionally useful in production code for 
      * advanced performance tweaks. It’s also an excellent diagnostic tool 
      * for helping to uncover thread safety issues: if inserting Thread.Yield() 
      * anywhere in your code makes or breaks the program, you almost certainly have a bug."*/ 
      Thread.Yield(); 
     } 

     /// <summary> 
     /// Left for implementing in the caller that initialized the object. Meaning: one and the same way for receiving market/order data. Different ways of processing this data 
     /// </summary> 
     /// <param name="data"></param> 
     public abstract void ForwardData(string data); 

     /// <summary> 
     /// left for implementing in child classes. Its purpose is to initialize any additional resources needed for the thread to operate. 
     /// If something goes wrong while getting this additional resources, 
     /// an AdditionalInitialization exception should be thrown, which is than handled from the initialization phase in the caller. 
     /// </summary> 
     public abstract void AdditionalInitialization(); 

     // countrapart of AdditionalInitialization method - what is initialized should be then closed 
     public abstract void ReleaseAdditionalResources(); 
    } 

Позже, каждый нужно канал TCP связи будет иметь специальную реализацию для выше абстрактного класса, обеспечивающей реализацию методов ForwardData (т.е. что делать с этими данными) и Дополнительная инициализация (то есть, что еще нужно инициализировать до того, как будет запущена конкретная обработка связи TCP. Например, по моим потокам требуется дополнительное хранилище Thread, который будет инициализирован перед получением данных).

Все было хорошо, за исключением закрытия TCP-обработки. У меня были переменные requestStop для управления, когда поток должен выйти или продолжить, но дело в том, что метод Read() может падать в непрерывной блокировке, предотвращая чтение переменной requestStop (я должен сказать, что для двух каналов tcp мне нужно процесс очень отличается тем, что один из них очень часто получает данные, а другой - спорадически). Я бы хотел, чтобы они реализовали один и тот же дизайн. Поэтому из того, что я читаю до сих пор, мне нужно реализовать другой поток «parent» или «control» или «wrapper», который фактически выполнит работу по наблюдению параметра requestStop.

Ищу сторону решения, как этот post или таймеры, как этот post

Любые предложения будут высоко оценены. Благодаря!

ответ

2

Я бы порекомендовал звонить по номеру ReadAsync method of NetworkStream и передал уведомление от CancerationToken. Таким образом, операция чтения может быть легко отменено (из другого потока), когда наблюдается остановка по требованию событие:

public class MyTCPClient : IDisposable 
{ 
    ... 
    private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); 
    ... 

    public string ReadData() 
    { 
    ... 
    byte[] dataHeader = new byte[12]; 
    if (this.tcpClient.Connected) 
    { 
     stream.ReadAsync(dataHeader, 0, 12, cancellationTokenSource.Token).Wait(); 
    } ... 
+0

Большое спасибо, я видел эти методы, доступные только в .Net 4.5 (Извините, забыл упомянуть, что я с выигрышем 7, .Net 4, а в целом - я немного новичок в программировании на C#). Но эти настройки можно контролировать в среде prod, поэтому определенно лучше работать с .Net 4.5. Я сейчас устанавливаю/читаю и т. Д. Я думаю, что у меня появятся дополнительные вопросы по этому объекту CancellationTokenSource, но сначала попробую прочитать/код, а затем вернется обратно. – akrsmv

+0

Хорошо, другой вопрос возник. Просто хочу упомянуть - я все еще оглядываюсь и читаю, потому что оказалось, что использовать .Net 4.5 я бы тоже получил лицензию для Visual studio 2012. – akrsmv

+0

Как (кому) гарантировано, что я получу все сообщения в заказе они пришли. Вероятно, это совсем не так, но я представляю ситуацию, когда начинается один асинхронный вызов, читает одно сообщение и по какой-то причине следующий асинхронный вызов, который начался, заканчивается до первого. (сообщения должны быть проанализированы, сопоставлены с некоторыми номенклатурами от БД и т. д.). В конце у меня есть очередь, содержащая объекты, построенные на основе байтов в сообщении tcp, и ее решающее значение для моего приложения они должны быть в одном порядке. Метод Wait() вызван в объект задачи, возвращаемый формой ReadAsync? – akrsmv

2

Я лично использую асинхронные сокеты для этого: http://msdn.microsoft.com/en-us/library/bbx2eya8.aspx

Если, однако, по-прежнему хотите использовать блокирование чтения, вы один может быть просто закрыть() сокет из другого потока.

Я надеюсь, что эта помощь.

0

Установите логическое значение «requestStop» и закройте сокет клиента из другого потока. Это приводит к тому, что вызов read() возвращает «раньше» с ошибкой/исключением.Клиентский поток может проверять «requestStop» после каждого возврата read() и очистки/выхода по запросу.

TBH, я редко бываю в явном отключении таких клиентов. Я просто оставлю их, пока приложение не выйдет.

+0

Ох - если этот ответ полезен, пожалуйста, воздержитесь от @brain - он предложил то же самое раньше, но я пропустил его. –

Смежные вопросы