2017-02-14 5 views
1

У меня есть метод, который выполняет некоторую работу асинхронно с использованием наблюдаемого. Я хотел бы знать, что является лучшим способом сделать этот метод async, так что я смогу ждать и выполнять некоторую работу после наблюдаемых завершений.Ожидайте, когда вы увидите полный

Моя первая попытка была использована Ожидать на наблюдаемом.

public async Task DoWorkAsync() 
{ 
    var observable = Observable.Create<int>(o => 
    { 
     Task.Run(() => 
     { 
      Thread.Sleep(1000); 
      Console.WriteLine("OnNext"); 
      o.OnNext(1); 
      o.OnError(new Exception("exception in observable logic")); 
      //o.OnCompleted(); 
     });  
     return Disposable.Empty; 
    }); 

    //observable = observable.Publish().RefCount(); 

    observable.Subscribe(i => Console.WriteLine(i)); 
    Console.WriteLine("Awaiting..."); 
    await observable; 
    Console.WriteLine("After Awaiting..."); 
} 

В зависимости от сценария, я имел различные проблемы с этим подходом (+/- означает, что эта часть кода раскомментирована/комментировал):

  1. + OnNext + OnCompleted -OnError -RefCount: OnNext был вызван 2 раза (наблюдаемый был подписан 2 раза). Этого я бы хотел избежать.

  2. + OnNext + OnCompleted -OnError + RefCount: Когда я использую метод RefCount(), код работает.

  3. -OnNext + OnCompleted -OnError + RefCount: "Последовательность не содержит элемент" не будет сгенерировано исключение, когда мой наблюдаемыми не вызывает OnNext.

  4. + OnNext -OnCompleted + OnError -RefCount: OnNext был вызван 2 раза. Исключение.

  5. + OnNext -OnCompleted + OnError + RefCount: зависает после отображения 1 (возможно, потому, что он хочет вернуться в ожидаемый поток). Мы можем заставить его работать (и поднять исключение) с помощью SubscribeOn (ThreadPoolScheduler.Instance)

Во всяком случае, в случае, когда наблюдаемый пуст (не OnNext не rised) мы получаем исключение, даже если OnError не называется и мы не имеем никакого исключения в наблюдаемой логике. Вот почему ожидание наблюдаемого не является хорошим решением.

Вот почему я попробовал другое решение, используя TaskCompletionSource

public async Task DoWorkAsync() 
{ 
    var observable = Observable.Create<int>(o => 
    { 
     Task.Run(() => 
     { 
      Thread.Sleep(1000); 
      Console.WriteLine("OnNext"); 
      o.OnNext(1); 
      o.OnError(new Exception("exception in observable logic")); 
      //o.OnCompleted(); 
     }); 
     return Disposable.Empty; 
    }); 

    var tcs = new TaskCompletionSource<bool>(); 

    observable.Subscribe(i => Console.WriteLine(i), 
    e => 
    { 
     //tcs.TrySetException(e); 
     tcs.SetResult(false); 
    }, 
    () => tcs.TrySetResult(true)); 

    Console.WriteLine("Awaiting..."); 
    await tcs.Task; 
    Console.WriteLine("After Awaiting..."); 
} 

Это хорошо работает во всех сценариях и в случае OnError вызывается мы могли бы использовать либо tcs.SetResult (ложь) и не имеют информацию о деталях исключений во внешнем методе или мы могли бы использовать tcs.TrySetException (e) и иметь возможность поймать исключение во внешнем методе.

Можете ли вы предложить мне, если есть какое-то лучшее/чистое решение, или мое второе решение - это путь?

EDIT

Так что я хотел бы знать, если есть лучшее решение, чем мое второе решение, которое будет:

  • не требуется использовать .Publish().RefCount()
  • не требует дополнительной подписки (то, что происходит в ожидании наблюдаемых под капотом - OnNext вызывается 2 раза)
  • Конечно, я мог бы обернуть мое решение в какой-то метод расширения асинхронной для подписок, который возвращает Task
+0

Если когда-нибудь вы окажетесь делать 'вернуть Disposable.Empty' внутри' Observable «Создай», тогда ты делаешь что-то не так. – Enigmativity

+0

@ Энигматичность Это всего лишь пример, и я думаю, что это не относится к моей проблеме. Но в любом случае, что я должен вернуть, если мне не нужно ничего распоряжаться? – astanula

+0

По крайней мере, создайте 'Subject ' в 'Observable.Create ', а затем выполните '{var subject = new Subject (); var subscription = subject.Subscribe (o);/* ваш код */return подписка; '. – Enigmativity

ответ

1

EDIT:

Если удалить подписку вы можете сделать следующее:

await observable.Do(i => Console.WriteLine(i)).LastOrDefaultAsync(); 

Что касается ваших произвольных требований ... Не имеет нескольких подписей для холодного наблюдения, имеет смысл; поэтому вы публикуете его. Отказ от использования .Publish().Refcount() не имеет смысла. Я не понимаю, почему вы отказываетесь от решения, которое решает вашу проблему.


Там очень много, но я предполагаю, что это ваш главный вопрос:

Во всяком случае, в случае, когда наблюдаемый пуст (не OnNext не rised) мы получаем исключение, даже если OnError не и у нас нет исключения внутри наблюдаемой логики. Вот почему ожидание на наблюдаемом не очень хорошее решение.

await observable такое же, как await observable.LastAsync(). Поэтому, если нет элемента, вы получаете исключение. Представьте, что вы изменили этот оператор на int result = await observable;. Какое значение должно иметь значение result, если нет элементов?

Если вы меняете await observable; на await observable.LastOrDefaultAsync();, все должно работать плавно.

И да, вы должны использовать .Publish().Refcount()

+0

Да, вы правы, это может сработать, но я хотел бы получить какое-то чистое решение, чтобы у меня не было дополнительной подписки, например, в ожидании наблюдаемого. В случае ожидания наблюдаемого. LastOrDefaultAsync() У меня, вероятно, будет еще 2 дополнительных подписки. Также мне хотелось бы избежать .Publish(). Refcount() Вот почему я выбрал свое второе решение как наиболее подходящее, и я блуждаю по нему, что-то лучше? – astanula

+0

Отредактировано для новых требований. – Shlomo

0

я явно предпочитаю 2-ое решение, потому что только выписывает один раз.

Но из любопытства: какова цель написания такого метода? Если это позволить для конфигурируемых побочных эффектов, это было бы эквивалентно:

public async Task DoWorkAsync() 
{ 
    Action<int> onNext = Console.WriteLine; 

    await Task.Delay(1000); 
    onNext(1); 
    throw new Exception("exception in DoWork logic"); // ... or don't 
} 
+0

Мой код - всего лишь пример, иллюстрирующий проблему. Дело в том, что я мог бы вызвать какой-то асинхронный код в моей наблюдаемой подписке, и он может возвращать некоторые или никакие элементы, а также завершить или ошибочно. – astanula

0

Вы можете использовать ToTask метод расширения:

await observable.ToTask(); 
+0

Это не удается, если я не вызываю OnNext. Также он создает отдельную подписку. – astanula