2015-02-07 3 views
2
var incomingStream = ... 
var outgoingStream = ... 

await incomingStream.CopyToAsync(outgoingStream); 

Приведенный выше код достаточно прост и копирует входящий поток в поток outgoign. Обе потоки, которые передаются, перемещаются вперед/назад по второму.Как создать поток обтекания потока, который может преобразовать поток

Теперь, позволяет сказать, что я хотел Transform поток что-то вроде Func<Stream,Stream,Task> как бы я сделать это без чтения всех данных.

Ofcause я мог бы просто сделать

var ms = new MemoryStream(); 
incomingStream.CopyTo(ms); 

--- do transform of streams and seek 
ms.CopyTo(outgoingStream) 

но прочитал бы дырочная вещь в мс, есть ли какая-либо сборка, которая позволяет мне читать из входящего потока и писать в новый поток, который не буферизует все, а вместо этого просто сохраняет небольшой внутренний поток для буферизованных данных, и он не будет читать из входящего потока до данные снова удаляются.

То, что я пытаюсь сделать, это:

protected async Task XmlToJsonStream(Stream instream, Stream outStream) 
    { 
     XmlReaderSettings readerSettings = new XmlReaderSettings(); 
     readerSettings.IgnoreWhitespace = false; 
     var reader = XmlReader.Create(instream, readerSettings); 
     var jsonWriter = new JsonTextWriter(new StreamWriter(outStream)); 
     jsonWriter.WriteStartObject(); 

     while (await reader.ReadAsync()) 
     { 
      jsonWriter.writeReader(reader); 
     } 
     jsonWriter.WriteEndObject(); 
     jsonWriter.Flush(); 
    } 
    protected async Task XmlFilterStream(Stream instream, Stream outStream) 
    { 
     XmlReaderSettings readerSettings = new XmlReaderSettings(); 
     readerSettings.IgnoreWhitespace = false; 
     var reader = XmlReader.Create(instream, readerSettings); 
     var writer = XmlWriter.Create(outStream, new XmlWriterSettings { Async = true, CloseOutput = false }) 

     while (reader.Read()) 
     { 
      writer.writeReader(reader); 
     } 


    } 

, но я не знаю, как подключить его.

var incomingStream = ... 
var outgoingStream = ... 
var temp=... 
XmlFilterStream(incomingStream,temp); 
XmlToJsonStream(temp,outgoingstream); 

, потому что, если я использую MemoryStream, как темп, то не только в конце есть все хранится в потоке. Поиск в потоке, который отбрасывает данные снова, когда он был прочитан.

Все вышеперечисленное представляет собой пример кода, отсутствующий в распоряжении некоторых объектов и ищет причины, но я надеюсь, что мне удалось проиллюстрировать то, что я собираюсь сделать. Чтобы иметь возможность на основе настроек подключаться и играть между простым копированием потока, выполнять фильтрацию xml и необязательно преобразовывать его в json.

ответ

2

Потоки являются последовательностями байт, поэтому преобразование потока будет чем-то вроде Func<ArraySegment<byte>, ArraySegment<byte>>. Затем вы можете применить его потоковым способом:

async Task TransformAsync(this Stream source, Func<ArraySegment<byte>, ArraySegment<byte>> transform, Stream destination, int bufferSize = 1024) 
{ 
    var buffer = new byte[bufferSize]; 
    while (true) 
    { 
    var bytesRead = await source.ReadAsync(buffer, 0, bufferSize); 
    if (bytesRead == 0) 
     return; 
    var bytesToWrite = transform(new ArraySegment(buffer, 0, bytesRead)); 
    if (bytesToWrite.Count != 0) 
     await destination.WriteAsync(bytesToWrite.Buffer, bytesToWrite.Offset, bytesToWrite.Count); 
    } 
} 

Это немного сложнее, но это общая идея. Для этого требуется некоторая логика, чтобы гарантировать, что WriteAsync записывает все байты; и обычно существует метод «флеш», который требуется в дополнение к методу transform, который вызывается, когда поток источника завершается, поэтому алгоритм преобразования имеет последний шанс вернуть свои окончательные данные для записи в выходной поток.

Если вы хотите потоки других вещей, таких как XML или JSON-типы, то вам, вероятно, лучше идти с Reactive Extensions.

+0

Я вижу, что этот способ более чистый, но с использованием потоков, как в моем примере, можно было бы использовать XmlReader/Writer и JsonReader/Writers для преобразования. Я подумаю о дизайне больше. –

+0

@ PoulK.Sørensen Вы когда-нибудь выяснили что-нибудь? Я в подобной ситуации. Попытка использовать SqlClient Streaming, которая просто берет «поток» в качестве параметра, но я хочу сжать исходный поток по мере его поступления в базу данных. Поэтому по существу я хочу обернуть исходный поток (то есть «FileStream») с помощью «GZipStream», так что каждый раз, когда SqlClient Streaming предположительно инициирует «ReadAsync», мой поток-оболочка сначала будет считывать данные из базового потока, сжимать данные, а затем возвращать сжатые байты для потоковой передачи SqlClient. – Terry

0

Я не уверен, что полностью понимаю ваш вопрос, но я думаю, вы спрашиваете, как работать с входным потоком, не загружая его полностью в память.

В этом случае, вы не хотели бы делать что-то вроде этого:

var ms = new MemoryStream(); 
incomingStream.CopyTo(ms); 

Это делает нагрузку весь поток ввода incomingStream в память - в ms.

Из всего, что я вижу, ваш метод XmlFilterStream кажется излишним, то есть XmlToJsonStream делает все, что XmlFilterStream все равно.

Почему не просто:

protected async Task XmlToJsonStream(Stream instream, Stream outStream) 
{ 
    XmlReaderSettings readerSettings = new XmlReaderSettings(); 
    readerSettings.IgnoreWhitespace = false; 
    var reader = XmlReader.Create(instream, readerSettings); 
    var jsonWriter = new JsonTextWriter(new StreamWriter(outStream)); 
    jsonWriter.WriteStartObject(); 

    while (await reader.ReadAsync()) 
    { 
     jsonWriter.writeReader(reader); 
    } 
    jsonWriter.WriteEndObject(); 
    jsonWriter.Flush(); 
} 

И называть это так:

var incomingStream = ... 
var outgoingStream = ... 
XmlToJsonStream(incomingStream ,outgoingstream); 

Если ответом является то, что вы опустили некоторые важные детали в XmlFilterStream тогда, не видя эти подробности, я будет рекомендовать вам просто интегрировать их в одну функцию XmlToJsonStream.

+0

Прошу прощения за то, что ушел. XmlFilterStream будет делать больше, чем просто читать/писать, он будет читать xml, (if (Include (reader) {writer (reader)). Но я получаю вашу точку –

Смежные вопросы