2009-08-12 3 views
1

У меня проблема с чтением из HttpResponseStream, потому что StreamReader, который я обертываю, читается быстрее, чем поток ответа получает фактический ответ. Я извлекаю файл с достаточно маленьким размером (около 60 тыс.), Но парсер, который обрабатывает ответ в фактическом объекте, терпит неудачу, потому что он попадает в неожиданный символ (код 65535), который, как я знаю, является персонажем, созданным при чтении с StreamReader, и больше нет доступных символов.Ошибка чтения из HttpResponseStream

Для записи я знаю, что возвращаемое содержимое является действительным и будет правильно анализироваться, поскольку сбой происходит в разных точках файла каждый раз, когда я запускаю код. Это строка parser.Load(), в которой она терпит неудачу.

Есть ли способ гарантировать, что я прочитал весь контент, прежде чем пытаться разобрать его, не копируя поток ответов в MemoryStream или строку, а затем обрабатывая его?

/// <summary> 
    /// Makes a Query where the expected Result is an RDF Graph ie. CONSTRUCT and DESCRIBE Queries 
    /// </summary> 
    /// <param name="sparqlQuery">SPARQL Query String</param> 
    /// <returns>RDF Graph</returns> 
    public Graph QueryWithResultGraph(String sparqlQuery) 
    { 
     try 
     { 
      //Build the Query URI 
      StringBuilder queryUri = new StringBuilder(); 
      queryUri.Append(this._endpoint.ToString()); 
      queryUri.Append("?query="); 
      queryUri.Append(Uri.EscapeDataString(sparqlQuery)); 

      if (!this._defaultGraphUri.Equals(String.Empty)) 
      { 
       queryUri.Append("&default-graph-uri="); 
       queryUri.Append(Uri.EscapeUriString(this._defaultGraphUri)); 
      } 

      //Make the Query via HTTP 
      HttpWebResponse httpResponse = this.DoQuery(new Uri(queryUri.ToString()),false); 

      //Set up an Empty Graph ready 
      Graph g = new Graph(); 
      g.BaseURI = this._endpoint; 

      //Parse into a Graph based on Content Type 
      String ctype = httpResponse.ContentType; 
      IRDFReader parser = MIMETypesHelper.GetParser(ctype); 
      parser.Load(g, new StreamReader(httpResponse.GetResponseStream())); 

      return g; 
     } 
     catch (UriFormatException uriEx) 
     { 
      //URI Format Invalid 
      throw new Exception("The format of the URI was invalid", uriEx); 
     } 
     catch (WebException webEx) 
     { 
      //Some sort of HTTP Error occurred 
      throw new Exception("A HTTP Error occurred", webEx); 
     } 
     catch (RDFException) 
     { 
      //Some problem with the RDF or Parsing thereof 
      throw; 
     } 
     catch (Exception) 
     { 
      //Other Exception 
      throw; 
     } 
    } 

    /// <summary> 
    /// Internal Helper Method which executes the HTTP Requests against the SPARQL Endpoint 
    /// </summary> 
    /// <param name="target">URI to make Request to</param> 
    /// <param name="sparqlOnly">Indicates if only SPARQL Result Sets should be accepted</param> 
    /// <returns>HTTP Response</returns> 
    private HttpWebResponse DoQuery(Uri target, bool sparqlOnly) 
    { 
     //Expect errors in this function to be handled by the calling function 

     //Set-up the Request 
     HttpWebRequest httpRequest; 
     HttpWebResponse httpResponse; 
     httpRequest = (HttpWebRequest)WebRequest.Create(target); 

     //Use HTTP GET/POST according to user set preference 
     if (!sparqlOnly) 
     { 
      httpRequest.Accept = MIMETypesHelper.HTTPAcceptHeader(); 
      //For the time being drop the application/json as this doesn't play nice with Virtuoso 
      httpRequest.Accept = httpRequest.Accept.Replace("," + MIMETypesHelper.JSON[0], String.Empty); 
     } 
     else 
     { 
      httpRequest.Accept = MIMETypesHelper.HTTPSPARQLAcceptHeader(); 
     } 
     httpRequest.Method = this._httpMode; 
     httpRequest.Timeout = this._timeout; 

     //HTTP Debugging 
     if (Options.HTTPDebugging) 
     { 
      Tools.HTTPDebugRequest(httpRequest); 
     } 

     httpResponse = (HttpWebResponse)httpRequest.GetResponse(); 

     //HTTP Debugging 
     if (Options.HTTPDebugging) 
     { 
      Tools.HTTPDebugResponse(httpResponse); 
     } 

     return httpResponse; 
    } 

Редактировать

Чтобы пояснить, что я уже говорил это не ошибка в Parser, это вопрос о StreamReader чтения быстрее, чем поток Response предоставляет данные. Я могу обойти эту проблему, выполнив следующие действия, но хотел бы предложения лучших или более элегантные решения:

  //Parse into a Graph based on Content Type 
      String ctype = httpResponse.ContentType; 
      IRDFReader parser = MIMETypesHelper.GetParser(ctype); 
      Stream response = httpResponse.GetResponseStream(); 
      MemoryStream temp = new MemoryStream(); 
      Tools.StreamCopy(response, temp); 
      response.Close(); 
      temp.Seek(0, SeekOrigin.Begin); 
      parser.Load(g, new StreamReader(temp)); 

Обработать класс 2

BlockingStreamReader согласно предложению Имон по:

/// <summary> 
/// A wrapper to a Stream which does all its Read() and Peek() calls using ReadBlock() to handle slow underlying streams (eg Network Streams) 
/// </summary> 
public sealed class BlockingStreamReader : StreamReader 
{ 
    private bool _peeked = false; 
    private int _peekChar = -1; 

    public BlockingStreamReader(StreamReader reader) : base(reader.BaseStream) { } 

    public BlockingStreamReader(Stream stream) : base(stream) { } 

    public override int Read() 
    { 
     if (this._peeked) 
     { 
      this._peeked = false; 
      return this._peekChar; 
     } 
     else 
     { 
      if (this.EndOfStream) return -1; 

      char[] cs = new char[1]; 
      base.ReadBlock(cs, 0, 1); 

      return cs[0]; 
     } 
    } 

    public override int Peek() 
    { 
     if (this._peeked) 
     { 
      return this._peekChar; 
     } 
     else 
     { 
      if (this.EndOfStream) return -1; 

      this._peeked = true; 

      char[] cs = new char[1]; 
      base.ReadBlock(cs, 0, 1); 

      this._peekChar = cs[0]; 
      return this._peekChar; 
     } 
    } 

    public new bool EndOfStream 
    { 
     get 
     { 
      return (base.EndOfStream && !this._peeked); 
     } 
    } 
} 

Редактировать 3

Здесь находится значительно улучшенный solutio n, который может обернуть любой TextReader и предоставить EndOfStream. Он использует внутренний буфер, который заполняется с помощью ReadBlock() на обернутом TextReader. Все Read() методы читателя может быть определено с помощью этого буфера, размер буфера задается:

/// <summary> 
/// The BlockingTextReader is an implementation of a <see cref="TextReader">TextReader</see> designed to wrap other readers which may or may not have high latency. 
/// </summary> 
/// <remarks> 
/// <para> 
/// This is designed to avoid premature detection of end of input when the input has high latency and the consumer tries to read from the input faster than it can return data. All methods are defined by using an internal buffer which is filled using the <see cref="TextReader.ReadBlock">ReadBlock()</see> method of the underlying <see cref="TextReader">TextReader</see> 
/// </para> 
/// </remarks> 
public sealed class BlockingTextReader : TextReader 
{ 
    private char[] _buffer; 
    private int _pos = -1; 
    private int _bufferAmount = -1; 
    private bool _finished = false; 
    private TextReader _reader; 

    public const int DefaultBufferSize = 1024; 

    public BlockingTextReader(TextReader reader, int bufferSize) 
    { 
     if (reader == null) throw new ArgumentNullException("reader", "Cannot read from a null TextReader"); 
     if (bufferSize < 1) throw new ArgumentException("bufferSize must be >= 1", "bufferSize"); 
     this._reader = reader; 
     this._buffer = new char[bufferSize]; 
    } 

    public BlockingTextReader(TextReader reader) 
     : this(reader, DefaultBufferSize) { } 

    public BlockingTextReader(Stream input, int bufferSize) 
     : this(new StreamReader(input), bufferSize) { } 

    public BlockingTextReader(Stream input) 
     : this(new StreamReader(input)) { } 

    private void FillBuffer() 
    { 
     this._pos = -1; 
     if (this._finished) 
     { 
      this._bufferAmount = 0; 
     } 
     else 
     { 
      this._bufferAmount = this._reader.ReadBlock(this._buffer, 0, this._buffer.Length); 
      if (this._bufferAmount == 0 || this._bufferAmount < this._buffer.Length) this._finished = true; 
     } 
    } 

    public override int ReadBlock(char[] buffer, int index, int count) 
    { 
     if (count == 0) return 0; 
     if (buffer == null) throw new ArgumentNullException("buffer"); 
     if (index < 0) throw new ArgumentException("index", "Index must be >= 0"); 
     if (count < 0) throw new ArgumentException("count", "Count must be >= 0"); 
     if ((buffer.Length - index) < count) throw new ArgumentException("Buffer too small"); 

     if (this._bufferAmount == -1 || this._pos >= this._bufferAmount) 
     { 
      if (!this._finished) 
      { 
       this.FillBuffer(); 
       if (this.EndOfStream) return 0; 
      } 
      else 
      { 
       return 0; 
      } 
     } 

     this._pos = Math.Max(0, this._pos); 
     if (count <= this._bufferAmount - this._pos) 
     { 
      //If we have sufficient things buffered to fufill the request just copy the relevant stuff across 
      Array.Copy(this._buffer, this._pos, buffer, index, count); 
      this._pos += count; 
      return count; 
     } 
     else 
     { 
      int copied = 0; 
      while (copied < count) 
      { 
       int available = this._bufferAmount - this._pos; 
       if (count < copied + available) 
       { 
        //We can finish fufilling this request this round 
        int toCopy = Math.Min(available, count - copied); 
        Array.Copy(this._buffer, this._pos, buffer, index + copied, toCopy); 
        copied += toCopy; 
        this._pos += toCopy; 
        return copied; 
       } 
       else 
       { 
        //Copy everything we currently have available 
        Array.Copy(this._buffer, this._pos, buffer, index + copied, available); 
        copied += available; 
        this._pos = this._bufferAmount; 

        if (!this._finished) 
        { 
         //If we haven't reached the end of the input refill our buffer and continue 
         this.FillBuffer(); 
         if (this.EndOfStream) return copied; 
         this._pos = 0; 
        } 
        else 
        { 
         //Otherwise we have reached the end of the input so just return what we've managed to copy 
         return copied; 
        } 
       } 
      } 
      return copied; 
     } 
    } 

    public override int Read(char[] buffer, int index, int count) 
    { 
     return this.ReadBlock(buffer, index, count); 
    } 

    public override int Read() 
    { 
     if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1) 
     { 
      if (!this._finished) 
      { 
       this.FillBuffer(); 
       if (this.EndOfStream) return -1; 
      } 
      else 
      { 
       return -1; 
      } 
     } 

     this._pos++; 
     return (int)this._buffer[this._pos]; 
    } 

    public override int Peek() 
    { 
     if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1) 
     { 
      if (!this._finished) 
      { 
       this.FillBuffer(); 
       if (this.EndOfStream) return -1; 
      } 
      else 
      { 
       return -1; 
      } 
     } 

     return (int)this._buffer[this._pos + 1]; 
    } 

    public bool EndOfStream 
    { 
     get 
     { 
      return this._finished && (this._pos >= this._bufferAmount - 1); 
     } 
    } 

    public override void Close() 
    { 
     this._reader.Close(); 
    } 

    protected override void Dispose(bool disposing) 
    { 
     this.Close(); 
     this._reader.Dispose(); 
     base.Dispose(disposing); 
    } 
} 
+0

Итак, через девять лет после того, как он был представлен, вы оказались первым человеком в мире, который обнаружил, что «StreamReader» читает быстрее, чем «Поток», предназначенный для чтения, верно ли? –

+0

Нет, я просто случайно задаюсь вопросом, было ли у кого-нибудь какие-либо решения более элегантными, чем выше. – RobV

+0

Решения для чего? 'StreamReader' не читает быстрее, чем' Stream'. –

ответ

1

Не зная специфику парсер вы используете, я могу только догадываться ошибки, но есть довольно легко сделать ошибку. .NET framework I/O libs почти поощряют вас делать ...

Знаете ли вы, что Streams и TextReaders могут читать меньше байтов/символов, чем запрошено?

В частности, TextReader.Read (символ [] буфер INT индекс счета ИНТ) 's документы говорят:

Возвращаемое значение

Тип: System .. :: Int32.

Число символов, которые были прочитаны. Номер будет менее или равен количеству, в зависимости от того, доступны ли данные в потоке. Этот метод возвращает ноль, если он вызывается, когда больше символов осталось читать.

Emphasis mine.

Например, если вы вызываете reader.Read (buffer, 0, 100), то вы не должны не использовать Предположим, что было прочитано 100 символов.

Редактировать: Очень вероятно, что парсер действительно предполагает это; и объясняет ваше наблюдаемое поведение: если вы полностью кэшируете поток в MemoryStream, всегда будет достаточно символов для заполнения запроса, но если вы этого не сделаете, анализатор получит меньше символов, чем запрашивается в непредсказуемое время, когда базовый поток «медленный».

Edit2: Вы можете исправить ошибку, заменив все экземпляры TextReader.Read() в парсер с TextReader.ReadBlock().

+0

Я знал об этом, я не уверен, что это необходимо. поскольку ошибка в StreamReader больше всего выглядит так, как она себя ведет, когда базовый поток может быть медленным. Parser не является проблемой, если я использую второй фрагмент кода (добавленный к исходному вопросу), который читает весь поток перед его синтаксическим анализом, обрабатывает штраф – RobV

+0

Это ошибка в синтаксическом анализаторе с очень высокой вероятностью. По замыслу, если базовый поток является «медленным», потоковая программа возвращает меньшее количество символов, чем запрошено. Использование memystream в качестве основного потока заставляет streamreader всегда возвращать полное количество символов - работая над ошибкой в ​​синтаксическом анализаторе. –

+0

Парсер использует базовый токенизатор, который читает символ по символу с помощью метода Read(), поэтому вы, скорее всего, правы, я проверю объект ReadBlock() и приму ваш ответ, если это решит проблему. – RobV

0

Для поддержки блокировки чтения сценария с, а не подклассов StreamReader, вы можете создать подкласс TextReader: это позволит избежать проблем с EndOfStream, и это означает, что вы можете сделать любой читатель блокировка - не только StreamReader s:

public sealed class BlockingReader : TextReader 
{ 
    bool hasPeeked; 
    int peekChar; 
    readonly TextReader reader; 

    public BlockingReader(TextReader reader) { this.reader = reader; } 

    public override int Read() 
    { 
     if (!hasPeeked) 
      return reader.Read(); 
     hasPeeked = false; 
     return peekChar; 
    } 

    public override int Peek() 
    { 
     if (!hasPeeked) 
     { 
      peekChar = reader.Read(); 
      hasPeeked = true; 
     } 
     return peekChar; 
    } 

    public override int Read(char[] buffer, int index, int count) 
    { 
     if (buffer == null) 
      throw new ArgumentNullException("buffer"); 
     if (index < 0) 
      throw new ArgumentOutOfRangeException("index"); 
     if (count < 0) 
      throw new ArgumentOutOfRangeException("count"); 
     if ((buffer.Length - index) < count) 
      throw new ArgumentException("Buffer too small"); 

     int peekCharsRead = 0; 
     if (hasPeeked) 
     { 
      buffer[index] = (char)peekChar; 
      hasPeeked = false; 
      index++; 
      count--; 
      peekCharsRead++; 
     } 

     return peekCharsRead + reader.ReadBlock(buffer, index, count); 
    } 

    protected override void Dispose(bool disposing) 
    { 
     try 
     { 
      if (disposing) 
       reader.Dispose(); 
     } 
     finally 
     { 
      base.Dispose(disposing); 
     } 
    } 
} 
Смежные вопросы