2015-04-24 4 views
1

Я пишу данные в сеть из Rx. Естественно, я использую Finally, чтобы закрыть мой поток, когда заканчивается подписка. Это работает как на OnError(), так и на OnComplete(). Rx будет работать OnNext() ... OnNext(), OnComplete(), Finally() в последовательности.Закройте неуправляемые ресурсы, когда конец подписки в реактивных расширениях

Однако иногда я хочу завершить последовательность раньше, для этого я использую Dispose(). Каким-то образом Finally() теперь запускается параллельно с последним вызовом OnNext(), в результате чего возникают исключения при записи в поток в OnNext(), а также незавершенные записи.

Моя подписка выглядит примерно так:

NetworkStream stm = client.GetStream(); 
IDisposable disp = obs 
    .Finally(() => { 
     client.Close(); 
    }) 
    .Subscribe(d => { 
     client.GetStream().Write(d.a, 0, d.a.Lenght); 
     client.GetStream().Write(d.b, 0, d.b.Lenght); 
    }() => { 
     client.GetStream().Write(something(), 0, 1); 
    }); 
Thread.sleep(1000); 
disp.Dispose(); 

Я также попробовал альтернативу, CancellationToken.

Как я могу отменить мою подписку правильно? Я не возражаю, если он пропускает OnComplete(), пока Finally() все еще работает. Однако запуск Finally() в параллель проблематичен.

У меня также есть ощущение, что должен быть лучший способ управлять ресурсами, перемещая объявление в последовательность, что было бы еще лучшим решением.

Редактировать: Следующий код более четко отображает проблему. Я бы надеялся, что он всегда печатает истину, вместо этого он дает ложь чаще, чем нет, указав конец Dispose до последнего OnNext.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Net.Sockets; 
using System.Reactive; 
using System.Reactive.Disposables; 
using System.Reactive.Linq; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 

namespace ConsoleApplication1 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      Console.WriteLine("Try finally"); 
      for (int i = 0; i < 10; i++) 
      { 
       Finally(); 
      } 
      Console.WriteLine("Try using"); 
      for (int i = 0; i < 10; i++) 
      { 
       Using(); 
      } 
      Console.WriteLine("Try using2"); 
      for (int i = 0; i < 10; i++) 
      { 
       Using2(); 
      } 
      Console.ReadKey(); 
     } 

     private static void Using2() 
     { 
      bool b = true, c = true, d; 
      var dis = Disposable.Create(() => c = b); 
      IDisposable obDis = Observable.Using(
       () => dis, 
       _ => Observable.Create<Unit>(obs=> 
        Observable.Generate(0, 
        i => i < 1000, 
        i => i + 1, 
        i => i, 
        i => TimeSpan.FromMilliseconds(1) 
       ).Subscribe(__ => { b = false; Thread.Sleep(100); b = true; }))) 
       .Subscribe(); 
      Thread.Sleep(15); 
      obDis.Dispose(); 
      d = b; 
      Thread.Sleep(101); 
      Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d); 
     } 

     private static void Using() 
     { 
      bool b = true, c = true, d; 
      var dis = Disposable.Create(() => c = b); 
      IDisposable obDis = Observable.Using(
       () => dis, 
       _ => Observable.Generate(0, 
        i => i < 1000, 
        i => i + 1, 
        i => i, 
        i => TimeSpan.FromMilliseconds(1) 
       )).Subscribe(_ => { b = false; Thread.Sleep(100); b = true; }); 
      Thread.Sleep(15); 
      obDis.Dispose(); 
      d = b; 
      Thread.Sleep(101); 
      Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d); 
     } 

     private static void Finally() 
     { 
      bool b = true, c = true, d; 
      IDisposable obDis = Observable.Generate(0, 
       i => i < 1000, 
       i => i + 1, 
       i => i, 
       _ => DateTime.Now.AddMilliseconds(1) 
       ) 
       .Finally(() => c = b) 
       .Subscribe(_ => { b = false; Thread.Sleep(100); b = true; }); 
      Thread.Sleep(15); 
      obDis.Dispose(); 
      d = b; 
      Thread.Sleep(101); 
      Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d); 
     } 
    } 
} 

ответ

5

Finally скорее всего не то, что вы хотите. Он не будет удалять ваш ресурс при отмене подписки. Скорее, он будет вести себя как обычный блок finally в C#, то есть он гарантирует, что какой-то код будет выполнен независимо от того, действительно ли код в его соответствующем try -блоке сделал исключение. Кроме того, учитывая, что this question on MSDN, ваш код в Finally может даже не выполняться при любых обстоятельствах, так как ваша подписка не указывает обработчик ошибок.

Что вы, вероятно, хотите, Using:

IDisposable disp = Observable 
    .Using(
     () => Disposable.Create(() => client.Close), 
     _ => obs) 
    .Subscribe(....); 

Using заботится о том, что ресурс должным образом утилизировать, когда наблюдаемые завершаясь или отмены подписки.

Предполагая client является TcpClient, она становится еще проще:

IDisposable disp = Observable 
    .Using(
     () => client), 
     _ => obs) 
    .Subscribe(....); 

Я ожидаю, что звонки на OnNext не будут пересекаться с закрытием клиента, даже когда unsubsribing рано, но я не проверял это.

Последнее: Остерегайтесь закрытия внешних переменных, таких как stm в вашем примере. Безопаснее всегда работать с местными жителями. Полная перезапись, как я хотел бы попробовать это выглядит следующим образом:

IDisposable disp = Observable.Using(
    () => client, 
    _ => Observable.Using(
     () => client.GetStream(), 
     stream => Observable.Create<Unit>(observer => obs 
      .Subscribe(
       d => { 
        stream.Write(d.a, 0, d.a.Lenght); 
        stream.Write(d.b, 0, d.b.Lenght); 
       }, 
       () => { 
        stream.Write(something(), 0, 1); 
       })))) 
    .Subscribe(); 
+0

Фактически 'Наконец' действительно работает на ошибках, я думаю, что ошибка была исправлена ​​с 2012 года. В любом случае, сброс вашего ответа выглядит многообещающим, у меня было ощущение, что« Наблюдаемое. Использование »должно соответствовать, но я не мог понять как еще. 'client' действительно является' TcpClient', поэтому я собираюсь дать это. – Dorus

+0

yes 'Using' - это путь. – Brandon

+0

Btw, ваш последний пример не компилируется. Во второй строке есть ошибка скобки, и что-то не так с 'Observable.Create', которое я не могу исправить. – Dorus

0

Я думаю, что вы просто сделали неверное предположение о том, как NetworkStream работает.

NetworkStream.Write и TcpClient.Close не обязательно ждать, пока клиент прочитает данные. (Кроме того, NetworkStream.Flush ничего не делает).

Когда вы вызываете Close, вы, вероятно, закрываете сокет, прежде чем клиент прочитает все.

Посмотрите на этот смежный вопрос: NetworkStream doesn't always send data unless I Thread.Sleep() before closing the NetworkStream. What am I doing wrong?

Эта страница по-разному упоминает такие вещи, как, используя перегрузку Close, которая принимает тайм-аут, или указав LingerOption - но лучше либо отправив Shutdown или иметь более высокий абстракция сообщений уровня, когда клиент подтверждает ваше сообщение с ответом его собственной.

Смежные вопросы