2013-06-13 3 views
3

Моя цель - читать сообщения из сокета, где каждое сообщение разделяется символом ETX. Это высокочастотный рынок данных, поэтому я не думаю, что байтовый байтовый подход имеет смысл, а размер полного сообщения неизвестен.Чтение непрерывного сообщения от Socket

Есть ли способ, которым я могу прочитать это сообщение, используя класс NetworkStream? Я также попытался использовать класс Socket для этой цели, но вместо чтения сообщений один за другим из сокета он считывает все сообщения из сокета, и это становится проблемой по мере замедления работы системы.

+0

я добавил пример реализации в новом ответе –

+0

Спасибо за помощь Марк. Ваши ответы были весьма полезными. –

ответ

5

Здесь мы идем; вот базовый процесс для чтения списка сообщений с разделителями-оповещением от источника, такого как Socket или Stream.Сложный бит отслеживает то, что вы использовали в входящем буфере, и любом накопителе неиспользуемых данных из более ранних буферов. Обратите внимание, что изменение этого кода между Socket и Stream существенно меняется Receive до Read - кроме этого подход будет идентичным.

Следующие должны в основном делать то, что вам нужно. Вы можете использовать API ReadNext(), пока не получите null (что означает конец потока), или вы можете использовать ReadAll(), который дает вам последовательность IEnumerable<string>. Кодировка и размер буфера доступны для вас с помощью конструктора, но по умолчанию используются знаковые значения.

foreach (var s in reader.ReadAll()) 
    Console.WriteLine(s); 

код:

class EtxReader : IDisposable 
{ 
    public IEnumerable<string> ReadAll() 
    { 
     string s; 
     while ((s = ReadNext()) != null) yield return s; 
    } 
    public void Dispose() 
    { 
     if (socket != null) socket.Dispose(); 
     socket = null; 
     if (backlog != null) backlog.Dispose(); 
     backlog = null; 
     buffer = null; 
     encoding = null; 
    } 
    public EtxReader(Socket socket, Encoding encoding = null, int bufferSize = 4096) 
    { 
     this.socket = socket; 
     this.encoding = encoding ?? Encoding.UTF8; 
     this.buffer = new byte[bufferSize]; 
    } 
    private Encoding encoding; 
    private Socket socket; 
    int index, count; 
    byte[] buffer; 
    private bool ReadMore() 
    { 
     index = count = 0; 
     int bytes = socket.Receive(buffer); 
     if (bytes > 0) 
     { 
      count = bytes; 
      return true; 
     } 
     return false; 
    } 
    public const byte ETX = 3; 
    private MemoryStream backlog = new MemoryStream(); 
    public string ReadNext() 
    { 
     string s; 
     if (count == 0) 
     { 
      if (!ReadMore()) return null; 
     } 
     // at this point, we expect there to be *some* data; 
     // this may or may not include the ETX terminator 
     var etxIndex = Array.IndexOf(buffer, ETX, index); 
     if (etxIndex >= 0) 
     { 
      // found another message in the existing buffer 
      int len = etxIndex - index; 
      s = encoding.GetString(buffer, index, len); 
      index = etxIndex + 1; 
      count -= (len + 1); 
      return s; 
     } 
     // no ETX in the buffer, so we'll need to fetch more data; 
     // buffer the unconsumed data that we have 
     backlog.SetLength(0); 
     backlog.Write(buffer, index, count); 

     bool haveEtx; 
     do 
     { 
      if (!ReadMore()) 
      { 
       // we had unused data; this must signal an error 
       throw new EndOfStreamException(); 
      } 
      etxIndex = Array.IndexOf(buffer, ETX, index); 
      haveEtx = etxIndex >= 0; 
      if (!haveEtx) 
      { 
       // keep buffering 
       backlog.Write(buffer, index, count); 
      } 

     } while (!haveEtx); 

     // now we have some data in the backlog, and the ETX in the buffer; 
     // for convenience, copy the rest of the next message into 
     // the backlog 
     backlog.Write(buffer, 0, etxIndex); 
     s = encoding.GetString(backlog.GetBuffer(), 0, (int)backlog.Length); 
     index = etxIndex + 1; 
     count -= (etxIndex + 1); 
     return s; 
    } 
} 
+0

Ошибка чего-то) переменная 'count' должна быть передана каждому из' Array.IndexOf 'как последний параметр, в противном случае можно случайно обнаружить записи ETX в данных' buffer', которые были получены поворотами назад и ... crash – Mikant

2

Это, предположительно, текстовый API. Здесь нет никакой практической разницы между использованием NetworkStream против Socket; ни Stream ни Socket не собирается «читать все сообщения» - это только ваш код, который делает это.

В обоих случаях вам потребуется практически идентичную цикл, который извлекает следующий блок данных (который не является синонимом «сообщение»), и начинает искать для значения дозорного (вы имеете в виду ETX?) - обработки или буферизации по мере необходимости. Если вы не знаете, что входящий канал находится в однобайтовой кодировке, вам, вероятно, будет лучше всего рассматривать его как просто байты, пока вы не разделите его на логические сообщения, а только тогда запустите текстовый декодер, чтобы получить текст этого сообщения, прежде чем перейти к следующему.

1

Вы должны изучить асинхронную связь и класс TcpListener. Мой подход был бы:

  1. Создание слушателя
  2. иметь его слушать непрерывно соединений (BeginAccept/EndAccecpt).
  3. Для каждого соединения, прочитайте асинхронно с NetworkStream, пока клиент не отсоединится (BeginRead/EndRead). Вы можете читать фрагменты данных, например, вы можете попробовать прочитать 512 байт одновременно - если в буфере меньше 512 байт, вы получите меньше 512 байт.
  4. иметь все, что приходит в прилагаются к StringBuilder (по одному для каждого соединения, ума правильной кодировки при преобразовании byte[] в string)
  5. Если StringBuilder содержит разделитель, разделить это сообщение от и записать его в очередь (Дон» t забыть заблокировать очередь до enqueueing!)
  6. Имейте отдельный поток, который постоянно отслеживает эту очередь для новых сообщений и обрабатывает их. Вы также можете сигнализировать поток, если вы помещаете что-то новое в очередь, используя, например, ManualResetEvent.

Это всего лишь грубая схема, но я уверен, что вы получили эту идею.

Нет такой вещи, как чтение «сообщений» - все, что приходит через TCP/IP, - это просто поток байтов - вот почему вы получаете сеть stream. Сообщение представляет собой концепцию, которую вы изобретаете для интерпретации поступающих данных.

+2

Однако ни одна из этих явлений не зависит от какой-либо конкретной реализации; то же самое будет применяться с использованием 'Socket',' Stream' или 'TcpListener'. –

+1

Вы также должны быть очень осторожны с тем, как вы переключаетесь между байтами и символами; если вы не используете однобайтную кодировку, может быть катастрофическим, чтобы выполнить декодирование, прежде чем вы узнаете, что у вас есть допустимая строка - у вас может быть один символ, разделенный между несколькими вызовами на «чтение» –

+1

Да, это правда. Вопрос в том, можно ли это сделать с помощью «NetworkStream». И поскольку я поклонник использования API более высокого уровня, я предложил «TcpListener» над классом «Socket». О кодировке: он хочет использовать 'ETX' в качестве разделителя - как это будет кодироваться в кодировке mutlibyte? (вопрос личного интереса) –

Смежные вопросы