2010-09-08 4 views
6

Это может быть глупый вопрос, как я немного новичок в RX :)Изменить интервал RX-операторов?

Я выборки событие (RX для .Net 4.0):

eventAsObservable.Sample (TimeSpan.FromSeconds (1)). Timestamp(). Подписаться (x => Console.WriteLine ("testing:" + x.Value.EventArgs.str));

Проблема в том, что время выборки должно быть изменено «на лету», я предполагаю, что могу создать некоторое свойство, которое удалит существующий обработчик и создаст новый, когда он изменится, но он кажется немного грязным и более уязвимых для сроков. Есть ли способ просто изменить интервал?

Пример: Допустим, кто-то пишет строку символов, когда определенная последовательность детектируемых вы хотите изменить время выборки, не пропуская события, и, предпочтительно, не получая событие более чем один раз

+0

Каков ваш сценарий? –

+0

Это автозаполнение, но интервал выборки различен в зависимости от источника данных (поскольку локальный поиск быстрее, чем веб-службы, например) – Homde

ответ

7

I не знаю способа изменения существующего интервала выборки, но то, что вы может сделать образец на самой высокой частоте, в которой вы нуждаетесь, а затем фильтровать с помощью предложения Where, в котором используется переменная может изменить.

Например:

static IObservable<T> SampleEvery<T>(this IObservable<T> source, 
    Func<int> multipleProvider) 
{ 
    int counter = 0; 
    Func<T, bool> predicate = ignored => { 
     counter++; 
     if (counter >= multipleProvider()) 
     { 
      counter = 0; 
     } 
     return counter == 0; 
    }; 
    return source.Where(predicate); 
} 

Вы бы тогда называть это так:

// Keep this somewhere you can change it 
int multiple = 1; 

eventAsObservable.Sample(TimeSpan.FromSeconds(1)) 
       .SampleEvery(() => multiple) 
       .Timestamp() 
       .Subscribe(x => Console.WriteLine("testing:" + 
                x.Value.EventArgs.str)); 

Теперь, изменяя значение multiple будет изменять эффективную частоту дискретизации.

Это довольно уродливый хак, но я думаю, что он должен работать.

+0

Вы не указали() на «if (counter> = multipleProvider)» –

+0

@Paul: Да, oops. Закрепление. –

+0

Похоже на жизнеспособное решение, я дам ему несколько тестов, спасибо! Интересно, какое хорошее решение было бы для таких сценариев, как это ... возможно, возможность отправить метод/лямбда, который возвращает временной интервал вместо фактического времени. Не кажется, что надуманный, что вы хотите изменить параметры для разных операторов на лету – Homde

0

Почему вы просто не подписываетесь дважды?

Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().SelectMany(x => doLocalLookup(x)), 
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Timestamp().SelectMany(x => doRemoteLookup(x)), 
).Subscribe(Console.WriteLine); 

Или если поиск активен только на основе какого-либо префикса или классификатора, такого как Google? Оператор:

Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Where(x => isLocal(x)).SelectMany(x => doLocalLookup(x)), 
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Where(x => isARemoteQuery(x).SelectMany(x => doRemoteLookup(x)), 
).Subscribe(Console.WriteLine); 
+0

Интервал может быть любым значением, его можно настроить на основе источника , Лучшее решение sofar - это создание новой подписки, а затем утилизация старой, есть небольшая вероятность того, что событие срабатывает дважды. Было бы замечательно, если бы был способ получить доступ к фактическому свойству, хотя – Homde

+0

Я хочу сказать, что вы можете просто сохранить их все, но переключить их вывод с помощью предложения Where. Возможно, я не совсем понимаю, что вы делаете ... –

5

Я знаю, что этот вопрос уже был дан ответ, но я думал, что я хотел бы добавить еще несколько способов ее решения в пути Rx.

Вы можете использовать Switch на последовательности TimeSpan-х:

private Subject<TimeSpan> sampleFrequencies = new Subject<TimeSpan>(); 

sampleFrequencies 
    .Select(x => eventAsObservable.Sample(Observable.Interval(x)).Timestamp()) 
    .Switch() 
    .Subscribe(x => .WriteLine("testing:" + x.Value.EventArgs.str)); 

// To change: 
// sampleFrequencies.OnNext(TimeSpan.FromSeconds(5)); 

Кроме того, она также может быть решена с помощью Defer, TakeUntil и Repeat (это один немного безумнее и включен в качестве мысленного упражнения):

private TimeSpan sampleFrequency = TiemSpan.FromSeconds(2); 
private Subject<Unit> frequencyChanged = new Subject<Unit>(); 

(Observable 
    .Defer(() => eventAsObservable 
     .Sample(Observable.Interval(sampleFrequency) 
    ) 
    .Timestamp() 
    .TakeUntil(frequencyChanged) 
).Repeat() 
.Subscribe(x => .WriteLine("testing:" + x.Value.EventArgs.str)); 

// To change: 
// sampleFrequency = TimeSpan.FromSeconds(5); 
// frequencyChanged.OnNext(new Unit()); 
+0

На самом деле, я просто сделал свойство, которое создало новую подписку, а затем удалило старую. У меня есть небольшой шанс, что я дважды убью мероприятие, но я думаю, что риск введения ошибок/накладных расходов другими методами делает его самым привлекательным решением. Например, я могу изменить Sample на Throttle. Другие решения казались слишком сложными, но я ценю помощь! – Homde

+0

В моем решении по-прежнему используется Sample, поэтому его можно легко заменить на Throttle (материал timestamp был взят непосредственно из ваших требований). Версия Switch делает практически то, что вы делаете сейчас (отмена, перезапуск), но изнутри коммутатора. –

+0

Хорошее решение с коммутатором, спасибо. –

2

TL; ДР: Создать наблюдаемым с использованием ObservableFromIntervalFunctor, как показано ниже:

void Main() 
{ 
    // Pick an initial period, it can be changed later. 
    var intervalPeriod = TimeSpan.FromSeconds(1); 

    // Create an observable using a functor that captures the interval period. 
    var o = ObservableFromIntervalFunctor(() => intervalPeriod); 

    // Log every value so we can visualize the observable. 
    o.Subscribe(Console.WriteLine); 

    // Sleep for a while so you can observe the observable. 
    Thread.Sleep(TimeSpan.FromSeconds(5.0)); 

    // Changing the interval period will takes effect on next tick. 
    intervalPeriod = TimeSpan.FromSeconds(0.3); 

} 

IObservable<long> ObservableFromIntervalFunctor(Func<TimeSpan> intervalPeriodFunctor) 
{ 
    return Observable.Generate(0L, s => true, s => s + 1, s => s, s => intervalPeriodFunctor()); 
} 

Объяснение: Observable.Generate имеет перегрузку, позволяющий указать время, когда следующее значение будет генерироваться с помощью функтора. Пропустив функтор, который захватил переменную времени, вы можете сделать наблюдаемую.изменение интервального периода путем изменения захваченной временной переменной.

LINQPad фрагмент here

+0

Примечание. Если интервал изменяется с более медленного на более быстрый, этот подход должен подождать до тех пор, пока медленный интервал не завершится до того, как изменение интервала вступит в силу. Это контрастирует с ответом Джона Скита, который вступит в силу после минимального времени выборки. – r590

Смежные вопросы