У меня есть метод, который выполняет некоторую работу асинхронно с использованием наблюдаемого. Я хотел бы знать, что является лучшим способом сделать этот метод async, так что я смогу ждать и выполнять некоторую работу после наблюдаемых завершений.Ожидайте, когда вы увидите полный
Моя первая попытка была использована Ожидать на наблюдаемом.
public async Task DoWorkAsync()
{
var observable = Observable.Create<int>(o =>
{
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("OnNext");
o.OnNext(1);
o.OnError(new Exception("exception in observable logic"));
//o.OnCompleted();
});
return Disposable.Empty;
});
//observable = observable.Publish().RefCount();
observable.Subscribe(i => Console.WriteLine(i));
Console.WriteLine("Awaiting...");
await observable;
Console.WriteLine("After Awaiting...");
}
В зависимости от сценария, я имел различные проблемы с этим подходом (+/- означает, что эта часть кода раскомментирована/комментировал):
+ OnNext + OnCompleted -OnError -RefCount: OnNext был вызван 2 раза (наблюдаемый был подписан 2 раза). Этого я бы хотел избежать.
+ OnNext + OnCompleted -OnError + RefCount: Когда я использую метод RefCount(), код работает.
-OnNext + OnCompleted -OnError + RefCount: "Последовательность не содержит элемент" не будет сгенерировано исключение, когда мой наблюдаемыми не вызывает OnNext.
+ OnNext -OnCompleted + OnError -RefCount: OnNext был вызван 2 раза. Исключение.
+ OnNext -OnCompleted + OnError + RefCount: зависает после отображения 1 (возможно, потому, что он хочет вернуться в ожидаемый поток). Мы можем заставить его работать (и поднять исключение) с помощью SubscribeOn (ThreadPoolScheduler.Instance)
Во всяком случае, в случае, когда наблюдаемый пуст (не OnNext не rised) мы получаем исключение, даже если OnError не называется и мы не имеем никакого исключения в наблюдаемой логике. Вот почему ожидание наблюдаемого не является хорошим решением.
Вот почему я попробовал другое решение, используя TaskCompletionSource
public async Task DoWorkAsync()
{
var observable = Observable.Create<int>(o =>
{
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("OnNext");
o.OnNext(1);
o.OnError(new Exception("exception in observable logic"));
//o.OnCompleted();
});
return Disposable.Empty;
});
var tcs = new TaskCompletionSource<bool>();
observable.Subscribe(i => Console.WriteLine(i),
e =>
{
//tcs.TrySetException(e);
tcs.SetResult(false);
},
() => tcs.TrySetResult(true));
Console.WriteLine("Awaiting...");
await tcs.Task;
Console.WriteLine("After Awaiting...");
}
Это хорошо работает во всех сценариях и в случае OnError вызывается мы могли бы использовать либо tcs.SetResult (ложь) и не имеют информацию о деталях исключений во внешнем методе или мы могли бы использовать tcs.TrySetException (e) и иметь возможность поймать исключение во внешнем методе.
Можете ли вы предложить мне, если есть какое-то лучшее/чистое решение, или мое второе решение - это путь?
EDIT
Так что я хотел бы знать, если есть лучшее решение, чем мое второе решение, которое будет:
- не требуется использовать .Publish().RefCount()
- не требует дополнительной подписки (то, что происходит в ожидании наблюдаемых под капотом - OnNext вызывается 2 раза)
- Конечно, я мог бы обернуть мое решение в какой-то метод расширения асинхронной для подписок, который возвращает Task
Если когда-нибудь вы окажетесь делать 'вернуть Disposable.Empty' внутри' Observable «Создай», тогда ты делаешь что-то не так. – Enigmativity
@ Энигматичность Это всего лишь пример, и я думаю, что это не относится к моей проблеме. Но в любом случае, что я должен вернуть, если мне не нужно ничего распоряжаться? – astanula
По крайней мере, создайте 'Subject' в 'Observable.Create ', а затем выполните '{var subject = new Subject (); var subscription = subject.Subscribe (o);/* ваш код */return подписка; '. –
Enigmativity