2016-11-22 1 views
4

я сделал метод расширения:Архивирование с бесконечной последовательностью, которая является истинным, то всегда ложно

public static IObservable<T> RateLimit<T>(this IObservable<T> source, 
              TimeSpan minDelay) 
{ 
    return 
     source.TimeInterval() 
      .Select(
       (x, i) => 
        Observable.Return(x.Value) 
         .Delay(i == 0 
          ? TimeSpan.Zero 
          : TimeSpan.FromTicks(
            Math.Max(minDelay.Ticks - x.Interval.Ticks, 0)))) 
      .Concat(); 
} 

это создает новый наблюдаемыми, что позволяет только элементы через с минимальным отрывом во времени.

Чтобы удалить начальную задержку, необходимо обработать первый элемент по-разному.

Как видно, есть тест, чтобы узнать, имеем ли мы дело с первым предметом путем тестирования i == 0. Проблема здесь в том, что если мы обработаем более int.MaxValue элементов, это не удастся.

Вместо этого, я думал о следующей последовательности

var trueThenFalse = Observable.Return(true) 
        .Concat(Observable.Repeat(false)) 

и сжать его вместе с моим источником:

source.TimeInterval().Zip(trueThenFalse, ... 

, но при прохождении этой бесконечной последовательности в Zip, мы оказываемся ввести жесткий цикл где trueThenFalse испускает все предметы за один раз (до бесконечности). Потерпеть неудачу.

Я мог бы легко кодировать вокруг этого с побочными эффектами (например, bool), но это будет представлять потерю чистоты, которой я не был бы доволен.

Любые предложения?

EDIT

Хотя не совсем то же самое поведение, следующий код демонстрирует некоторые неприятные черты

var trueThenFalse = Observable.Return(true) 
    .Concat(Observable.Repeat(false)); 
var src = Observable.Interval(TimeSpan.FromSeconds(1)); //never completes 
src.Zip(trueThenFalse,(i,tf)=>tf).ForEach(x=>Trace.TraceInformation("{0}",x)); 

и в конце концов умирает с OOME. Это связано с тем, что trueThenFalse, как представляется, отключает все свои значения, но они не потребляются Zip своевременно.

+0

Кажется, работает для меня с trueThenFalse и Zip. – Evk

+0

Итак, возможно, это связано с тем, как запланированы продолжения? Учитывая, что 'IObservable' только действительно предоставляет метод' Subscribe', мне трудно понять, как бесконечная (холодная) последовательность не будет пытаться не использовать все свои элементы сразу после подписки на эту «push-модель». – spender

+0

Но zip захватывает следующий элемент из двух последовательностей. Сначала он захватывает «истину» из первой последовательности и первое значение из второго (без задержки). Затем он захватывает «ложное» и второе значение из второй последовательности, но здесь есть задержка. Не уверен, почему он должен немедленно свернуть все предметы. Может быть, вы можете отправить код с trueThenFalse и Zip, который, как вы сказали, не удается? – Evk

ответ

3

Таким образом, получается, что Zip имеет another overload, который может объединять последовательность IObservable с последовательностью IEnumerable.

Объединив семантику ввода IObservable с семантикой вытягивания IEnumerable, можно получить мой тестовый пример.

Таким образом, с помощью следующего метода:

private IEnumerable<T> Inf<T>(T item) 
{ 
    for (;;) 
    { 
     yield return item; 
    } 
} 

мы можем сделать IEnumerable:

var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false)); 

, а затем Zip его с источником наблюдаемого:

var src = Observable.Interval(TimeSpan.FromSeconds(1)); 
src.Zip(trueThenFalse, (i, tf) => tf).ForEach(x => Trace.TraceInformation("{0}", x)); 

... и все работает так, как ожидалось.

я теперь следующая реализация моего метода RateLimiter:

public static IObservable<T> RateLimit<T>(this IObservable<T> source, 
              TimeSpan minDelay) 
{ 
    var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false)); 
    return 
     source.TimeInterval() 
      .Zip(trueThenFalse, (item, firstTime) => Observable.Return(item.Value) 
       .Delay(firstTime 
        ? TimeSpan.Zero 
        : TimeSpan.FromTicks(
         Math.Max(minDelay.Ticks - item.Interval.Ticks, 0)))) 

      .Concat(); 
} 
Смежные вопросы