2016-12-03 2 views
1

Существует код:Задержка при условии

someObservable.Select(x => getY(x));  
Y getY(X x) 
{ 
    if (x.Value == X.ABC) 
    return new Y(1); 
    else 
    return new Y(2); 
} 

На каком-то состоянии, мне нужно перепроверить x.Value после некоторого периода времени. Самое простое и плохое решение использовать Thread.Sleep:

Y getY(X x) 
{ 
    if (x.Value == X.ABC) 
    return new Y(1); 
    else 
    if (x.SomethingElse == true) 
    { 
    Thread.Sleep(timeout); 
    if (x.Value == X.ABC) 
     return new Y(1); 
    else 
     return new Y(2); 
    } 
} 

Что такое правильный код здесь? Мне нужно, чтобы события были заказаны так же, как и я. Это означает, что если у меня есть задержка, и я получаю новое значение, которое он должен ждать, чтобы обработать.

ответ

0

Если это общая проблема, я предлагаю вам реализовать пользовательский оператор. Метод задержки имеет перегрузку, которая принимает другую наблюдаемую для контроля времени задержки. В этом примере четные числа сразу же отжимаются, когда нечетные числа задерживаются;

Observable.Interval(TimeSpan.FromSeconds(1)) 
      .Delay(i => (i % 2 == 0) ? Observable.Return(0L) : Observable.Timer(TimeSpan.FromSeconds(0.9))) 
      .Subscribe(Console.WriteLine); 

EDIT: Вот проще, сохраняющего оператор задержки:

static IObservable<T> DelayOrdered<T>(this IObservable<T> observable, Func<T, TimeSpan> delaySelector, IScheduler scheduler = default(IScheduler)) 
     { 
      scheduler = scheduler ?? DefaultScheduler.Instance; 
      return Observable.Create<T>(observer => 
      { 
       var now = scheduler.Now; 

       return observable 
         .Subscribe(value => 
       { 
        now = now.Add(delaySelector(value)); 
        scheduler.Schedule(now,() => observer.OnNext(value)); 
       }); 
      }); 
     } 

Использование:

Observable.Interval(TimeSpan.FromSeconds(0.2)) 
         .DelayOrdered(i => (i % 2 == 0) ? TimeSpan.Zero : TimeSpan.FromSeconds(1)) 
         .Subscribe(Console.WriteLine); 

Заказ будет сохранена, так как абсолютное плановое время может только увеличиться.

+0

Спасибо. Единственная проблема, которая мне нужна для сохранения порядка. Я получил некоторое предложение для решения ниже. Что ты думаешь об этом ? –

+0

@NN_ да, ваше предлагаемое решение должно сохранить порядок. Я думаю, вы могли бы сделать его более эффективным. – Asti

+0

@NN_ См. Отредактированное решение. – Asti

2

Решение (от https://rsdn.org/forum/dotnet/6629370.1) должно возвращать IObservable, а не Y в getY и делать Observable.Delay с Concat.

IObservable<Y> getY(X x) 
{ 
    if (x.Value == X.ABC) 
    return Observable.Return(new Y(1)); 
    else 
    if (x.SomethingElse == true) 
    { 
    return Observable.Delay(Observable.Return(x), timeout).Select(xx => xx.Value == X.ABC ? new Y(1) : new Y(2)); 
    } 
} 

или

IObservable<Y> getY(X x) 
{ 
    return Observable.Create<Y>(async (obs, token) => 
    { 
    if (x.Value == X.ABC) 
     obs.OnNext(new Y(1)); 
    else 
    if (x.SomethingElse == true) 
    { 
     await Task.Delay(timeout, token); 
     if (x.Value == X.ABC) 
      obs.OnNext(new Y(1)); 
     else 
      obs.OnNext(new Y(2)); 
    } 
    } 
} 

И тогда:. SomeObservable.Select (х => Гети (х)) Concat();

+0

Это будет работать. Я думал использовать «SelectMany», что эквивалентно 'Select', за которым следует' Merge', который не сохранит порядок. Однако это решение будет правильно сохранять порядок. – Shlomo

+0

Вы все же создаете новые наблюдаемые для каждого значения. – Asti

+0

Да, это также позволило мне сделать короткие задержки, а не один длинный путь Rx: Observable.Timer (0, 100ms) .Take (пытается). Выбрать (x => ..). Где (x! = Null) .Снять (1); –

Смежные вопросы