Я пишу данные в сеть из Rx. Естественно, я использую Finally
, чтобы закрыть мой поток, когда заканчивается подписка. Это работает как на OnError()
, так и на OnComplete()
. Rx будет работать OnNext()
... OnNext()
, OnComplete()
, Finally()
в последовательности.Закройте неуправляемые ресурсы, когда конец подписки в реактивных расширениях
Однако иногда я хочу завершить последовательность раньше, для этого я использую Dispose()
. Каким-то образом Finally()
теперь запускается параллельно с последним вызовом OnNext()
, в результате чего возникают исключения при записи в поток в OnNext()
, а также незавершенные записи.
Моя подписка выглядит примерно так:
NetworkStream stm = client.GetStream();
IDisposable disp = obs
.Finally(() => {
client.Close();
})
.Subscribe(d => {
client.GetStream().Write(d.a, 0, d.a.Lenght);
client.GetStream().Write(d.b, 0, d.b.Lenght);
}() => {
client.GetStream().Write(something(), 0, 1);
});
Thread.sleep(1000);
disp.Dispose();
Я также попробовал альтернативу, CancellationToken
.
Как я могу отменить мою подписку правильно? Я не возражаю, если он пропускает OnComplete()
, пока Finally()
все еще работает. Однако запуск Finally()
в параллель проблематичен.
У меня также есть ощущение, что должен быть лучший способ управлять ресурсами, перемещая объявление в последовательность, что было бы еще лучшим решением.
Редактировать: Следующий код более четко отображает проблему. Я бы надеялся, что он всегда печатает истину, вместо этого он дает ложь чаще, чем нет, указав конец Dispose до последнего OnNext
.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Try finally");
for (int i = 0; i < 10; i++)
{
Finally();
}
Console.WriteLine("Try using");
for (int i = 0; i < 10; i++)
{
Using();
}
Console.WriteLine("Try using2");
for (int i = 0; i < 10; i++)
{
Using2();
}
Console.ReadKey();
}
private static void Using2()
{
bool b = true, c = true, d;
var dis = Disposable.Create(() => c = b);
IDisposable obDis = Observable.Using(
() => dis,
_ => Observable.Create<Unit>(obs=>
Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(1)
).Subscribe(__ => { b = false; Thread.Sleep(100); b = true; })))
.Subscribe();
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
private static void Using()
{
bool b = true, c = true, d;
var dis = Disposable.Create(() => c = b);
IDisposable obDis = Observable.Using(
() => dis,
_ => Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(1)
)).Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
private static void Finally()
{
bool b = true, c = true, d;
IDisposable obDis = Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
_ => DateTime.Now.AddMilliseconds(1)
)
.Finally(() => c = b)
.Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
}
}
Фактически 'Наконец' действительно работает на ошибках, я думаю, что ошибка была исправлена с 2012 года. В любом случае, сброс вашего ответа выглядит многообещающим, у меня было ощущение, что« Наблюдаемое. Использование »должно соответствовать, но я не мог понять как еще. 'client' действительно является' TcpClient', поэтому я собираюсь дать это. – Dorus
yes 'Using' - это путь. – Brandon
Btw, ваш последний пример не компилируется. Во второй строке есть ошибка скобки, и что-то не так с 'Observable.Create', которое я не могу исправить. – Dorus