2015-12-27 2 views
5

Я пытаюсь создать полнодуплексную схему связи клиент-сервер на двух разных машинах (только), где каждая конечная точка (клиент или сервер) может отправлять информацию в любое время , асинхронно (неблокирующий канал), а другой конец подберет его и прочитает.C# Полные дуплексные асинхронные именованные каналы .NET

Я не хочу, чтобы ответы касались меня какой-либо другой технологией, кроме названных каналов, я знаю о других технологиях, но я хочу получить ответ на этот конкретный вопрос. (Я видел, как этот вопрос неоднократно обсуждался на разных форумах, и я продолжаю видеть ответы, советующие использовать некоторые другие технологии. Я думаю, что это граничит с грубой?)

Я читал, что Именованные каналы должны быть одним -определение или они запираются, но я предполагаю, что это, вероятно, неправильно. Я думаю, что каналы основаны на сокетах, и я не могу себе представить, что базовый сокет будет односторонним.

Любые ответы на этот вопрос нужно решать эти вопросы для того, чтобы быть действительно полезным:

  1. Ответы должны решать асинхронные трубы, я не могу использовать синхронное решение.
  2. ответы должны быть продемонстрированы или допущены к тому, что трубы остаются ОТКРЫТЫ. Я устал читать примеры, когда труба открыта, строка передается, затем труба немедленно закрывается. Я бы хотел получить ответ, предполагающий, что трубы остаются открытыми и передают много мусора в случайные моменты времени и продолжают повторяться. без зависаний.
  3. C# основанного решение

Я сожалею звучать требовательными и сопливыми, но после нескольких дней чистящего Интернета я до сих пор не нашел хороший пример, и я не хочу использовать ВЛК. Если вы знаете подробности этого ответа и ответьте на него хорошо, эта тема станет реальным победителем на долгие годы, я уверен. Я отправлю ответ сам, если я это выясню.

Если вы собираетесь писать и говорить «Вам нужно использовать две трубы», пожалуйста, объясните , почему, и как вы знаете, это правда, потому что ничего, что я читал об этом, объясняет, почему это так ,

спасибо!

+0

Почему вы не хотите использовать WCF с привязкой названных труб? Вы видели такие решения, как http://stackoverflow.com/questions/16432813/async-two-way-communication-with-windows-named-pipes-net – admax

+0

Эй, почему так зол, я просто спросил ** почему ** вы не хотите wcf. И это был комментарий, а не предлагаемое решение. Если вы знаете о некоторых недостатках wcf, пожалуйста, поделитесь ими, это может быть полезно сообществу. – admax

+1

Не сердитый, просто конкретный. Причина в том, что я стараюсь избегать технологий, которые считаются «устаревшими», даже если они все еще используются, особенно если они не добавляют много добавочного значения для доллара. WCF не добавляет много поверх Named Pipes + JSON, поэтому я бы предпочел не использовать его. Мне нравится оставаться «минимальным». Я только передаю несколько байтов назад и вперед, чтобы поддерживать распределенную систему, все, что мне нужно, это именованный канал! –

ответ

0

Я думаю, что когда вы используете асинхронную связь, вам нужно использовать две трубы.

Один RECV труба другой, посылают трубу

Потому что, Вы не знаете, когда вы ПРИЕМ данные.

Когда вы отправляете данные с одним каналом, данные recv не могут записываться на трубку.

Напротив, вы не можете писать данные отправки на трубе.

поэтому у вас есть две трубы для асинхронной связи.

+1

Трубы имеют сокет. любой нормальный сокет unix и отправлять и получать на обоих концах, если он открывается таким образом. Я думаю, что давно трубы Unix были односторонними, но это давно прошлое. Трюк с использованием полнодуплексных труб в окнах - это убедиться, что порядок действий ведется, а затем труба не путается. Если вы используете две трубы, я уверен, что это упрощает, но вы просто тратите ресурсы и порты. –

+0

oh..I смотри. Я этого не знал. спасибо за ваши знания .. Мне нужно учиться больше. – lucidmaj7

6

Вам не нужно использовать две трубы. Я нашел много ответов в сети, в которых говорится, что вам нужно использовать две трубы. Я рылся вокруг, не спал всю ночь, пытался и пытался снова, и понял, как это сделать, это очень просто, но вы должны все исправить (особенно получить вещи в правильном порядке), или это просто не сработает , Другой трюк заключается в том, чтобы всегда гарантировать, что у вас есть вызов для чтения, или он закроется. Не пишите, прежде чем вы знаете, что кто-то читает. Не начинайте чтение, если вы сначала не настроили событие. Такого рода вещи.

вот класс труб, который я использую. Вероятно, он недостаточно прочен, чтобы справляться с ошибками, замыканиями и переполнениями труб.

Хорошо, я понятия не имею, что здесь не так, но форматирование немного отключено! VVVV

namespace Squall 
{ 
    public interface PipeSender 
    { 
     Task SendCommandAsync(PipeCommandPlusString pCmd); 
    } 

    /****************************************************************************** 
    * 
    * 
    * 
    * 
    ******************************************************************************/ 
    public class ClientPipe : BasicPipe 
    { 
     NamedPipeClientStream m_pPipe; 

     public ClientPipe(string szServerName, string szPipeName) 
      : base("Client") 
     { 
      m_szPipeName = szPipeName; // debugging 
      m_pPipe = new NamedPipeClientStream(szServerName, szPipeName, PipeDirection.InOut, PipeOptions.Asynchronous); 
      base.SetPipeStream(m_pPipe); // inform base class what to read/write from 
     } 

     public void Connect() 
     { 
      Debug.WriteLine("Pipe " + FullPipeNameDebug() + " connecting to server"); 
      m_pPipe.Connect(); // doesn't seem to be an async method for this routine. just a timeout. 
      StartReadingAsync(); 
     } 

     // the client's pipe index is always 0 
     internal override int PipeId() { return 0; } 
    } 

    /****************************************************************************** 
    * 
    * 
    * 
    * 
    ******************************************************************************/ 
    public class ServerPipe : BasicPipe 
    { 
     public event EventHandler<EventArgs> GotConnectionEvent; 

     NamedPipeServerStream m_pPipe; 
     int m_nPipeId; 

     public ServerPipe(string szPipeName, int nPipeId) 
      : base("Server") 
     { 
      m_szPipeName = szPipeName; 
      m_nPipeId = nPipeId; 
      m_pPipe = new NamedPipeServerStream(
       szPipeName, 
       PipeDirection.InOut, 
       NamedPipeServerStream.MaxAllowedServerInstances, 
       PipeTransmissionMode.Message, 
       PipeOptions.Asynchronous); 
      base.SetPipeStream(m_pPipe); 
      m_pPipe.BeginWaitForConnection(new AsyncCallback(StaticGotPipeConnection), this); 
     } 

     static void StaticGotPipeConnection(IAsyncResult pAsyncResult) 
     { 
      ServerPipe pThis = pAsyncResult.AsyncState as ServerPipe; 
      pThis.GotPipeConnection(pAsyncResult); 
     } 

     void GotPipeConnection(IAsyncResult pAsyncResult) 
     { 
      m_pPipe.EndWaitForConnection(pAsyncResult); 

      Debug.WriteLine("Server Pipe " + m_szPipeName + " got a connection"); 

      if (GotConnectionEvent != null) 
      { 
       GotConnectionEvent(this, new EventArgs()); 
      } 

      // lodge the first read request to get us going 
      // 
      StartReadingAsync(); 
     } 

     internal override int PipeId() { return m_nPipeId; } 
    } 

    /****************************************************************************** 
    * 
    * 
    * 
    * 
    ******************************************************************************/ 

    public abstract class BasicPipe : PipeSender 
    { 
     public static int MaxLen = 1024 * 1024; // why not 
     protected string m_szPipeName; 
     protected string m_szDebugPipeName; 

     public event EventHandler<PipeEventArgs> ReadDataEvent; 
     public event EventHandler<EventArgs> PipeClosedEvent; 

     protected byte[] m_pPipeBuffer = new byte[BasicPipe.MaxLen]; 

     PipeStream m_pPipeStream; 

     public BasicPipe(string szDebugPipeName) 
     { 
      m_szDebugPipeName = szDebugPipeName; 
     } 

     protected void SetPipeStream(PipeStream p) 
     { 
      m_pPipeStream = p; 
     } 

     protected string FullPipeNameDebug() 
     { 
      return m_szDebugPipeName + "-" + m_szPipeName; 
     } 

     internal abstract int PipeId(); 

     public void Close() 
     { 
      m_pPipeStream.WaitForPipeDrain(); 
      m_pPipeStream.Close(); 
      m_pPipeStream.Dispose(); 
      m_pPipeStream = null; 
     } 

     // called when Server pipe gets a connection, or when Client pipe is created 
     public void StartReadingAsync() 
     { 
      Debug.WriteLine("Pipe " + FullPipeNameDebug() + " calling ReadAsync"); 

      // okay we're connected, now immediately listen for incoming buffers 
      // 
      byte[] pBuffer = new byte[MaxLen]; 
      m_pPipeStream.ReadAsync(pBuffer, 0, MaxLen).ContinueWith(t => 
      { 
       Debug.WriteLine("Pipe " + FullPipeNameDebug() + " finished a read request"); 

       int ReadLen = t.Result; 
       if (ReadLen == 0) 
       { 
        Debug.WriteLine("Got a null read length, remote pipe was closed"); 
        if (PipeClosedEvent != null) 
        { 
         PipeClosedEvent(this, new EventArgs()); 
        } 
        return; 
       } 

       if (ReadDataEvent != null) 
       { 
        ReadDataEvent(this, new PipeEventArgs(pBuffer, ReadLen)); 
       } 
       else 
       { 
        Debug.Assert(false, "something happened"); 
       } 

       // lodge ANOTHER read request 
       // 
       StartReadingAsync(); 

      }); 
     } 

     protected Task WriteByteArray(byte[] pBytes) 
     { 
      // this will start writing, but does it copy the memory before returning? 
      return m_pPipeStream.WriteAsync(pBytes, 0, pBytes.Length); 
     } 

     public Task SendCommandAsync(PipeCommandPlusString pCmd) 
     { 
      Debug.WriteLine("Pipe " + FullPipeNameDebug() + ", writing " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString()); 
      string szSerializedCmd = JsonConvert.SerializeObject(pCmd); 
      byte[] pSerializedCmd = Misc.StringToBytes(szSerializedCmd); 
      Task t = WriteByteArray(pSerializedCmd); 
      return t; 
     } 
    } 

    /****************************************************************************** 
    * 
    * 
    * 
    * 
    ******************************************************************************/ 

    public class PipeEventArgs 
    { 
     public byte[] m_pData; 
     public int m_nDataLen; 

     public PipeEventArgs(byte[] pData, int nDataLen) 
     { 
      // is this a copy, or an alias copy? I can't remember right now. 
      m_pData = pData; 
      m_nDataLen = nDataLen; 
     } 
    } 

    /****************************************************************************** 
    * if we're just going to send a string back and forth, then we can use this 
    * class. It it allows us to get the bytes as a string. sort of silly. 
    ******************************************************************************/ 

    [Serializable] 
    public class PipeCommandPlusString 
    { 
     public string m_szCommand; // must be public to be serialized 
     public string m_szString; // ditto 

     public PipeCommandPlusString(string sz, string szString) 
     { 
      m_szCommand = sz; 
      m_szString = szString; 
     } 

     public string GetCommand() 
     { 
      return m_szCommand; 
     } 

     public string GetTransmittedString() 
     { 
      return m_szString; 
     } 
    } 
} 

и вот мой тест трубы, работающие на одном процессе. Он работает на двух процессов тоже, я проверил

namespace NamedPipeTest 
{ 
    public partial class Form1 : Form 
    { 
     SynchronizationContext _context; 
     Thread m_pThread = null; 
     volatile bool m_bDieThreadDie; 
     ServerPipe m_pServerPipe; 
     ClientPipe m_pClientPipe; 

     public Form1() 
     { 
      InitializeComponent(); 
     } 

     private void Form1_Load(object sender, EventArgs e) 
     { 
      _context = SynchronizationContext.Current; 

      m_pServerPipe = new ServerPipe("SQUALL_PIPE", 0); 
      m_pServerPipe.ReadDataEvent += M_pServerPipe_ReadDataEvent; 
      m_pServerPipe.PipeClosedEvent += M_pServerPipe_PipeClosedEvent; 

      // m_pThread = new Thread(StaticThreadProc); 
      // m_pThread.Start(this); 
     } 

     private void M_pServerPipe_PipeClosedEvent(object sender, EventArgs e) 
     { 
      Debug.WriteLine("Server: Pipe was closed, shutting down"); 

      // have to post this on the main thread 
      _context.Post(delegate 
      { 
       Close(); 
      }, null); 
     } 

     private void M_pServerPipe_ReadDataEvent(object sender, PipeEventArgs e) 
     { 
      // this gets called on an anonymous thread 

      byte[] pBytes = e.m_pData; 
      string szBytes = Misc.BytesToString(pBytes, e.m_pData.Length); 
      PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes); 
      string szValue = pCmd.GetTransmittedString(); 

      if (szValue == "CONNECT") 
      { 
       Debug.WriteLine("Got command from client: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString() + ", writing command back to client"); 
       PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("SERVER", "CONNECTED"); 
       // fire off an async write 
       Task t = m_pServerPipe.SendCommandAsync(pCmdToSend); 
      } 
     } 

     static void StaticThreadProc(Object o) 
     { 
      Form1 pThis = o as Form1; 
      pThis.ThreadProc(); 
     } 

     void ThreadProc() 
     { 
      m_pClientPipe = new ClientPipe(".", "SQUALL_PIPE"); 
      m_pClientPipe.ReadDataEvent += PClientPipe_ReadDataEvent; 
      m_pClientPipe.PipeClosedEvent += M_pClientPipe_PipeClosedEvent; 
      m_pClientPipe.Connect(); 

      PipeCommandPlusString pCmd = new PipeCommandPlusString("CLIENT", "CONNECT"); 
      int Counter = 1; 
      while (Counter++ < 10) 
      { 
       Debug.WriteLine("Counter = " + Counter); 
       m_pClientPipe.SendCommandAsync(pCmd); 
       Thread.Sleep(3000); 
      } 

      while (!m_bDieThreadDie) 
      { 
       Thread.Sleep(1000); 
      } 

      m_pClientPipe.ReadDataEvent -= PClientPipe_ReadDataEvent; 
      m_pClientPipe.PipeClosedEvent -= M_pClientPipe_PipeClosedEvent; 
      m_pClientPipe.Close(); 
      m_pClientPipe = null; 
     } 

     private void M_pClientPipe_PipeClosedEvent(object sender, EventArgs e) 
     { 
      // wait around for server to shut us down 
     } 

     private void PClientPipe_ReadDataEvent(object sender, PipeEventArgs e) 
     { 
      byte[] pBytes = e.m_pData; 
      string szBytes = Misc.BytesToString(pBytes, e.m_nDataLen); 
      PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes); 
      string szValue = pCmd.GetTransmittedString(); 

      Debug.WriteLine("Got command from server: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString()); 

      if (szValue == "CONNECTED") 
      { 
       PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("CLIENT", "DATA"); 
       m_pClientPipe.SendCommandAsync(pCmdToSend); 
      } 
     } 

     private void Form1_FormClosing(object sender, FormClosingEventArgs e) 
     { 
      if (m_pThread != null) 
      { 
       m_bDieThreadDie = true; 
       m_pThread.Join(); 
       m_bDieThreadDie = false; 
      } 

      m_pServerPipe.ReadDataEvent -= M_pServerPipe_ReadDataEvent; 
      m_pServerPipe.PipeClosedEvent -= M_pServerPipe_PipeClosedEvent; 
      m_pServerPipe.Close(); 
      m_pServerPipe = null; 

     } 
    } 
} 
0

Просто создайте трубу, как перекрывается и ваш код может блокировать на чтение в одном потоке, при записи в трубе от другого.

void StartServer() 
    { 
     Task.Factory.StartNew(() => 
     { 
      var server = new NamedPipeServerStream("PipesOfPiece", PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); 
      server.WaitForConnection(); 
      reader = new StreamReader(server); 
      writer = new StreamWriter(server); 
     }); 
    } 

    private async void timer1_Tick(object sender, EventArgs e) 
    { 
     timer1.Stop(); 
     if (null != reader) 
     { 
      char[] buf = new char[50]; 

      int count = await reader.ReadAsync(buf, 0, 50); 

      if (0 < count) 
      { 
       m_textBox_from.Text = new string(buf, 0, count); 
      } 
     } 
     timer1.Start(); 
    } 
Смежные вопросы