2016-12-07 4 views
0

Protobuf-сети не может сериализовать следующий класс, потому что сериализации объектов типа Stream не поддерживаются:Protobuf-нетто: сериализовать свойство типа System.IO.Stream без загрузки всего потока в память

[ProtoContract] 
class StreamObject 
{ 
    [ProtoMember(1)] 
    public Stream StreamProperty { get; set; } 
} 

I знаю, что я могу обойти это, используя сериализованное свойство типа byte[] и чтение потока в это свойство, как в this question. Но для этого требуется, чтобы весь byte[] был загружен в память, которая, если поток длинный, может быстро выгрузить системные ресурсы.

Есть ли способ сериализации потока в виде массива байтов в protobuf-net без загрузки всей последовательности байтов в память?

+0

Вам нужно будет настроить процесс сериализации, потому что у вас нет объекта для передачи по проводу простым способом. Возможно, есть крючок, который дает вам доступ к проводу, но я не уверен, где это будет для protobuf-net. – Guvante

+1

Понял. Согласно [this] (http://stackoverflow.com/q/4523165/2561985) в настоящее время нет доступных крючков. Ответ старый, но после просмотра кода он все равно будет иметь место. – kcnygaard

ответ

2

Основная трудность здесь не в protobuf-net, это V2 protocol buffer format. Есть два способа повторный элемент (например, массив байт или поток) может быть закодирован:

  • В качестве упакованного повторил элемента. Здесь все элементы поля упаковываются в одну пару «ключ-значение» с проводным типом 2 (с разделителем длины). Каждый элемент кодируется так же, как обычно, за исключением тега, который предшествует ему.

    protobuf-net автоматически кодирует байтовые массивы в этом формате, однако для этого требуется знание общего количества байтов заблаговременно. Для байтового потока это может потребовать загрузки всего потока в память (например, когда StreamProperty.CanSeek == false), что нарушает ваши требования.

  • В качестве повторный элемент. Здесь закодированное сообщение имеет ноль или несколько пар ключ-значение с тем же номером тега.

    Для байтового потока использование этого формата вызовет массовое раздувание в закодированном сообщении, поскольку каждому байту потребуется дополнительный целочисленный ключ.

Как вы можете видеть, ни одно представление по умолчанию не отвечает вашим потребностям. Вместо этого имеет смысл кодировать большой поток байтов как последовательность «довольно больших» кусков, где каждый кусок упаковывается, но общая последовательность не является.

Следующая версия StreamObject делает это:

[ProtoContract] 
class StreamObject 
{ 
    public StreamObject() : this(new MemoryStream()) { } 

    public StreamObject(Stream stream) 
    { 
     if (stream == null) 
      throw new ArgumentNullException(); 
     this.StreamProperty = stream; 
    } 

    [ProtoIgnore] 
    public Stream StreamProperty { get; set; } 

    internal static event EventHandler OnDataReadBegin; 

    internal static event EventHandler OnDataReadEnd; 

    const int ChunkSize = 4096; 

    [ProtoMember(1, IsPacked = false, OverwriteList = true)] 
    IEnumerable<ByteBuffer> Data 
    { 
     get 
     { 
      if (OnDataReadBegin != null) 
       OnDataReadBegin(this, new EventArgs()); 

      while (true) 
      { 
       byte[] buffer = new byte[ChunkSize]; 
       int read = StreamProperty.Read(buffer, 0, buffer.Length); 
       if (read <= 0) 
       { 
        break; 
       } 
       else if (read == buffer.Length) 
       { 
        yield return new ByteBuffer { Data = buffer }; 
       } 
       else 
       { 
        Array.Resize(ref buffer, read); 
        yield return new ByteBuffer { Data = buffer }; 
        break; 
       } 
      } 

      if (OnDataReadEnd != null) 
       OnDataReadEnd(this, new EventArgs()); 
     } 
     set 
     { 
      if (value == null) 
       return; 
      foreach (var buffer in value) 
       StreamProperty.Write(buffer.Data, 0, buffer.Data.Length); 
     } 
    } 
} 

[ProtoContract] 
struct ByteBuffer 
{ 
    [ProtoMember(1, IsPacked = true)] 
    public byte[] Data { get; set; } 
} 

Обратите внимание на OnDataReadBegin и OnDataReadEnd события? Затем я добавил для целей отладки, чтобы включить проверку того, что входной поток фактически передается потоку в выходной поток protobuf. Следующий класс теста делает это:

internal class TestClass 
{ 
    public void Test() 
    { 
     var writeStream = new MemoryStream(); 

     long beginLength = 0; 
     long endLength = 0; 

     EventHandler begin = (o, e) => { beginLength = writeStream.Length; Console.WriteLine(string.Format("Begin serialization of Data, writeStream.Length = {0}", writeStream.Length)); }; 
     EventHandler end = (o, e) => { endLength = writeStream.Length; Console.WriteLine(string.Format("End serialization of Data, writeStream.Length = {0}", writeStream.Length)); }; 

     StreamObject.OnDataReadBegin += begin; 
     StreamObject.OnDataReadEnd += end; 

     try 
     { 
      int length = 1000000; 

      var inputStream = new MemoryStream(); 
      for (int i = 0; i < length; i++) 
      { 
       inputStream.WriteByte(unchecked((byte)i)); 
      } 
      inputStream.Position = 0; 

      var streamObject = new StreamObject(inputStream); 

      Serializer.Serialize(writeStream, streamObject); 
      var data = writeStream.ToArray(); 

      StreamObject newStreamObject; 
      using (var s = new MemoryStream(data)) 
      { 
       newStreamObject = Serializer.Deserialize<StreamObject>(s); 
      } 

      if (beginLength >= endLength) 
      { 
       throw new InvalidOperationException("inputStream was completely buffered before writing to writeStream"); 
      } 

      inputStream.Position = 0; 
      newStreamObject.StreamProperty.Position = 0; 

      if (!inputStream.AsEnumerable().SequenceEqual(newStreamObject.StreamProperty.AsEnumerable())) 
      { 
       throw new InvalidOperationException("!inputStream.AsEnumerable().SequenceEqual(newStreamObject.StreamProperty.AsEnumerable())"); 
      } 
      else 
      { 
       Console.WriteLine("Streams identical."); 
      } 
     } 
     finally 
     { 
      StreamObject.OnDataReadBegin -= begin; 
      StreamObject.OnDataReadEnd -= end; 
     } 
    } 
} 

public static class StreamExtensions 
{ 
    public static IEnumerable<byte> AsEnumerable(this Stream stream) 
    { 
     if (stream == null) 
      throw new ArgumentNullException(); 
     int b; 
     while ((b = stream.ReadByte()) != -1) 
      yield return checked((byte)b); 
    } 
} 

И выход из выше:

Begin serialization of Data, writeStream.Length = 0 
End serialization of Data, writeStream.Length = 1000888 
Streams identical. 

Что означает, что входной поток действительно передается на выход без полной загрузки в память сразу.

Прототип fiddle.

Есть ли механизм, позволяющий выставлять упакованный повторяющийся элемент пошагово с байтами из потока, зная длину заранее?

Оказывается, нет. Предполагая, что у вас есть поток, для которого CanSeek == true, вы можете инкапсулировать его в IList<byte>, который перечисляет через байты в потоке, обеспечивает случайный доступ к байтам в потоке и возвращает длину потока в IList.Count. Существует пример скрипта here, показывающий такую ​​попытку. К сожалению, однако, ListDecorator.Write() просто перечисляет список и буферизует его закодированное содержимое, прежде чем записывать их в выходной поток, что заставляет поток ввода полностью загружаться в память. I думаю это происходит потому, что protobuf-net кодирует List<byte> по-разному от byte [], а именно в виде последовательности с ограничением по длине Base 128 Varints. Поскольку представление Varint в byte иногда требует более одного байта, длина не может быть заранее рассчитана из списка. См. this answer для получения дополнительной информации о различии в том, как кодируются массивы и списки байтов. Должна быть возможность реализовать кодировку IList<byte> так же, как и byte [] - она ​​просто недоступна.

+0

Отличный ответ, спасибо. Возможно, стоит отметить, что для моего конкретного случая доступно свойство 'StreamProperty.Length', что означает, что количество необходимых байтов известно. Имеется ли механизм для выписывания упакованного повторяющегося элемента поэтапно с байтами из потока, зная длину заранее? – kcnygaard

Смежные вопросы