2017-01-23 2 views
1

Чтобы начать, стоит упомянуть, что в рамках одного решения F # сериализация и десериализация сообщений Bond работает нормально. Однако у меня возникают проблемы с правильной обработкой отправки и/или получения сообщения через ZeroMQ.Ошибка выполнения «EndOfStreamException» в сериализации MS Bond над ZeroMQ

На стороне абонента следующей программы есть ошибка времени выполнения. Файл .bond определяется и компилируется с помощью компилятора связей. Затем dll создается из C# для вызова из F #. Затем у меня есть две программы F #. Один, который публикует сериализованные данные через сокет tcp и другой, который является подписчиком. Когда сообщение получено на суб, линия, которая пытается Unmarshal необработанных данных, является той, которая вызывает ошибку времени выполнения. Может ли кто-нибудь увидеть причину этого?

[EDIT] Комментарий от пользователя Fyodor, я внес изменения на стороне издателя, который изменяет ошибку на стороне абонента. Таким образом, ошибка, вероятно, имеет какое-то отношение к тому, как я упаковываю и распаковываю информацию.

Это .bond файл

namespace Examples 

struct Record 
{ 
    0: map<string, double> payload; 
} 

Вот издатель:

// publisher 

open System 
open Bond 
open Bond.Protocols 
open Bond.IO.Safe 
open ZeroMQ 

let ctx = new ZContext() 
let publisher = new ZSocket(ctx, ZSocketType.PUB) 
publisher.Bind("tcp://*:5556") 

let src = new Examples.Record() 
src.payload.Add("a", 1.) 
src.payload.Add("b", 2.) 

let output = new OutputBuffer() 
let writer = new CompactBinaryWriter<OutputBuffer>(output) 

while true do 
    Marshal.To(writer, src) 
    //let input = new InputBuffer(output.Data) 
    //let byteArr = input.ReadBytes(int(input.Length - 1L)) 
    let updateFrame = new ZFrame(System.Text.Encoding.ASCII.GetString output.Data.Array) 
    publisher.Send(updateFrame) 

Здесь абонент:

// subscriber 

open Bond 
open Bond.Protocols 
open Bond.IO.Safe 
open System 
open System.Text 
open ZeroMQ 

let ctx = new ZContext() 
let subscriber = new ZSocket(ctx, ZSocketType.SUB) 
subscriber.Connect("tcp://127.0.0.1:5556") 
subscriber.SubscribeAll() 

let output = new OutputBuffer()  
while true do  
    let received = subscriber.ReceiveFrame() 
    let byteArr = Encoding.ASCII.GetBytes (received.ReadString()) 
    let arrSeg = ArraySegment<byte>(byteArr) 
    let input = new InputBuffer(arrSeg) 
    let dst = Unmarshal<Examples.Record>.From(input) 
    for KeyValue(k, v) in dst.payload do 
     printfn "%A %A" k v 
+3

Я вижу, что вы создаете 'ZFrame' из' byteArr.ToString() '. Это не делает то, что вы думаете. Попробуйте распечатать результат 'byteArr.ToString()', чтобы узнать, что вы действительно отправляете. –

ответ

4

На приемной стороне, когда вы пытаетесь декодировать маршалированную Bond Compact Binary как строку ASCII, вы теряете часть полезной нагрузки. При маршалировании структуры типа Record на компактную двоичную систему первые четыре байта полезной нагрузки составляют 0x43 0x42 0x10 0x00. При чтении строки из ZFrame, the first embedded NUL (0x00), который встречается, сигнализирует конец строки, независимо от размера кадра. Таким образом, сторона чтения видит только 0x43 0x42 0x10 вместо всей полезной нагрузки (29 байт, когда я тестировал).

Поскольку Compact Binary является двоичный протокол, вы хотите использовать ZFrame конструктор, который принимает буфер на стороне издателя:

let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count) 

На стороне абонента, вы просто хотите читать буфер:

let byteArr = received.Read() 

Кроме того, на стороне издателя, вы постоянно накапливающиеся данные в том же OUTPUTBUFFER. Вы хотите сбросить output.Position до 0 перед вами Маршаллом вашей следующей записи повторно использовать буфер вместо выращивания его:

while true do 
    Marshal.To(writer, src) 
    let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count)output.Data.Array) 
    publisher.Send(updateFrame) 
    output.Position <- 0 

Другой вещь, чтобы отметить: буфер по умолчанию выделена для OutputBuffer является 65KiB. Подумайте о том, чтобы сделать это меньше, как только вы узнаете о том, насколько большой будет ваша полезная нагрузка.

NB: Я отлаживал это в приложении C#, которое имело аналогичную семантику. Вот что я использовал:

namespace so_q_zmq 
{ 
    using System; 
    using System.Collections.Generic; 
    using System.Text; 
    using System.Threading.Tasks; 
    using Bond; 
    using Bond.IO.Safe; 
    using Bond.Protocols; 
    using ZeroMQ; 

    [Schema] 
    class Record 
    { 
     [Id(0)] 
     public Dictionary<string, double> payload = new Dictionary<string, double>(); 
    } 

    class Program 
    { 
     static void Main(string[] args) 
     { 
      var pTask = Task.Run(() => 
      { 
       try 
       { 
        Publisher(); 
       } 
       catch (Exception ex) 
       { 
        Console.WriteLine("Publisher failed: {0}", ex); 
       } 
      }); 

      var sTask = Task.Run(() => 
      { 
       try 
       { 
        Subscriber(); 
       } 
       catch (Exception ex) 
       { 
        Console.WriteLine("Subscriber failed: {0}", ex); 
       } 
      }); 

      Task.WaitAll(pTask, sTask); 
      Console.WriteLine("Done"); 
      Console.ReadLine(); 
     } 

     static void Publisher() 
     { 
      var ctx = new ZContext(); 
      var publisher = new ZSocket(ctx, ZSocketType.PUB); 
      publisher.Bind("tcp://127.0.0.1:12345"); 

      var src = new Record(); 
      src.payload.Add("a", 1.0); 
      src.payload.Add("b", 2.0); 

      var output = new OutputBuffer(); 
      var writer = new CompactBinaryWriter<OutputBuffer>(output); 

      for (;;) 
      { 
       Marshal.To(writer, src); 
       // INCORRECT: 
       // var str = Encoding.ASCII.GetString(output.Data.Array); 
       // var updateFrame = new ZFrame(str); 
       var updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count); 
       publisher.Send(updateFrame); 
       output.Position = 0; 
      } 
     } 

     static void Subscriber() 
     { 
      var ctx = new ZContext(); 
      var subscriber = new ZSocket(ctx, ZSocketType.SUB); 
      subscriber.Connect("tcp://127.0.0.1:12345"); 
      subscriber.SubscribeAll(); 

      for (;;) 
      { 
       var received = subscriber.ReceiveFrame(); 
       // INCORRECT 
       // var str = received.ReadString(); 
       // var byteArr = Encoding.ASCII.GetBytes(str); 
       var byteArr = received.Read(); 
       var arrSeg = new ArraySegment<byte>(byteArr); // There's an InputBuffer ctor that takes a byte[] directly 
       var input = new InputBuffer(arrSeg); 
       var dst = Unmarshal<Record>.From(input); 
       foreach (var kvp in dst.payload) 
       { 
        Console.WriteLine("{0} {1}", kvp.Key, kvp.Value); 
       } 
      } 
     } 
    } 
} 
Смежные вопросы