3

Я экспериментирую с реактивными расширениями на разных платформах, и одна вещь, которая меня раздражает, - это глюки.Обходные пути для сбоев RX?

Несмотря на то, что для кода пользовательского интерфейса эти сбои могут быть not be that problematic, и обычно можно найти оператора, который работает вокруг них, я все еще считаю, что отладочный код сложнее при наличии сбоев: промежуточные результаты не важны для отладки, но мой ум не знает, когда результат является промежуточным или «окончательным».

Проработав немного с использованием функциональных FRP в Haskell и синхронных системах передачи данных, он также «чувствует» неправильно, но это, конечно, субъективно.

Но при подключении RX к приводам, отличным от UI (например, двигателям или переключателям), я думаю, что глюки более проблематичны. Как убедиться, что на внешние исполнительные механизмы отправлено только правильное значение?

Возможно, это может быть решено каким-то «диспетчером», который знает, когда какой-то «внешний датчик» инициировал инициирующее событие, чтобы все внутренние события обрабатывались до перенаправления конечного результата (результатов) на исполнительные механизмы. Что-то вроде описанного в flapjax paper.

вопрос (ы) Я надеюсь получить ответы на следующие:

  1. Есть ли что-то в RX, что делает фиксирующих глюки для синхронных уведомлений невозможно?
  2. Если нет, то для RX существует (желательно качество продукции) библиотека или подход, который фиксирует синхронные сбои? Особенно для однопоточного Javascript это может иметь смысл?
  3. Если общее решение не существует, как RX будет использоваться для управления внешними датчиками/исполнительными механизмами без сбоев на приводах?

Позвольте мне привести пример

Предположим, я хочу, чтобы напечатать последовательность кортежей (a,b) где контракт

a=n b=10 * floor(n/10) 

п натуральное число поток = 0,1,2 .. ..

Поэтому я ожидаю следующую последовательность

(a=0, b=0) 
(a=1, b=0) 
(a=2, b=0) 
... 
(a=9, b=0) 
(a=10, b=10) 
(a=11, b=10) 
... 

В RX, чтобы сделать более интересным, я буду использовать фильтр для вычисления потока б

var n = Observable 
     .Interval(TimeSpan.FromSeconds(1)) 
     .Publish() 
     .RefCount(); 

var a = n.Select(t => "a=" + t); 

var b = n.Where(t => t % 10 == 0) 
     .Select(t => "b=" + t); 

var ab = a.CombineLatest(b, Tuple.Create); 

ab.Subscribe(Console.WriteLine); 

Это дает то, что я верил, что глюк (временное нарушение инварианта/контракта):

(a=0, b=0) 
(a=1, b=0) 
(a=2, b=0) 
... 
(a=10, b=0) <-- glitch? 
(a=10, b=10) 
(a=11, b=10) 

Я понимаю, что это правильное поведение CombineLatest, но я также думал, что это называется глюком, потому что в реальной чистой системе FRP вы не получаете эти промежуточные-инвариантные результаты.

Обратите внимание, что в этом примере я не смог бы использовать Zip, а также WithLatestFrom дал бы неправильный результат.

Конечно, я мог бы просто упростить этот пример в одно монадическое вычисление, никогда не добавляя несколько потоков в поток (это будет означать, что вы не можете фильтровать, а просто карту), но это не главное: IMO в RX всегда получить «глюк», когда вы расколоть и воссоединиться наблюдаемый поток:

s 
/\ 
    a b 
    \/
    t 

Например, в Flapjax вы не получите эти проблемы.

Есть ли в этом смысл?

спасибо, Peter

+1

Нет глюка. Пример, связанный с отображением фактического вывода 'a0 ----- (b0b1) (c1c4) (d4d9) (e9e16)' в сравнении с желаемым выходом 'a0 ----- b1 ---- c4 ---- d9 ---- e16 --- 'неправильно. Фактический вывод правильный для этого запроса. Это не сбой - это правильное функционирование этого запроса. Вам действительно нужно создать некоторые [mcve], показывая фактические сбои, чтобы мы могли ответить вам. – Enigmativity

+0

Mea culpa, у меня создалось впечатление, что CombineLatest * всегда * вызывает сбои, как только любой из его входов поступает из одного источника. – Ziriax

+1

Совсем нет. Вы должны ожидать, что 'CombineLatest' будет работать детерминированным способом. Невозможно справиться с «мгновенными» изменениями стоимости. Источники должны генерировать значения последовательно - один абонент должен получить значение сначала, а второй - так, если два значения ** теоретически ** означают изменение в одном и том же экземпляре, они ** не делают ** это в вычислительной реальности , Это не может быть иначе. Это особенно верно, поскольку каждый наблюдаемый источник и оператор - черные ящики. Невозможно заглянуть внутрь и посмотреть, что он должен «теоретически» делать. – Enigmativity

ответ

1

Update: Позвольте мне ответить на мой собственный вопрос в контексте RX.

Прежде всего, кажется, что я понимаю, что такое «глюк», был неправильным. С точки зрения чистого FRP, то, что выглядело как глюки в RX для меня, кажется, действительно правильное поведение в RX.

Таким образом, я предполагаю, что в RX нам нужно четко указать «время», в котором мы ожидаем задействовать значения, объединенные с датчиками.

В моем собственном примере исполнителем является консоль, а датчик - интервал n.

Так что, если я изменить мой код

ab.Subscribe(Console.WriteLine); 

в

ab.Sample(n).Subscribe(Console.WriteLine); 

тогда только "правильные" значения печатаются.

Это означает, что, когда мы получаем наблюдаемую последовательность, которая объединяет значения от датчиков, мы должны знать все исходные датчики, объединить их все и пробовать значения с этим объединенным сигналом перед отправкой любых значений на приводы ...

альтернативный подход был бы «поднять» IObservable в структуру «учуяли», которая помнит и объединяет происходящие датчики, например, как это:

public struct Sensed<T> 
{ 
    public IObservable<T> Values; 
    public IObservable<Unit> Sensors; 

    public Sensed(IObservable<T> values, IObservable<Unit> sensors) 
    { 
     Values = values; 
     Sensors = sensors; 
    } 

    public IObservable<Unit> MergeSensors(IObservable<Unit> sensors) 
    { 
     return sensors == Sensors ? Sensors : Sensors.Merge(sensors); 
    } 

    public IObservable<T> MergeValues(IObservable<T> values) 
    { 
     return values == Values ? Values : Values.Merge(values); 
    } 
} 

и тогда мы должны передать весь метод RX в эта «чувствительная» структура:

public static class Sensed 
{ 
    public static Sensed<T> Sensor<T>(this IObservable<T> source) 
    { 
     var hotSource = source.Publish().RefCount(); 
     return new Sensed<T>(hotSource, hotSource.Select(_ => Unit.Default)); 
    } 

    public static Sensed<long> Interval(TimeSpan period) 
    { 
     return Observable.Interval(period).Sensor(); 
    } 

    public static Sensed<TOut> Lift<TIn, TOut>(this Sensed<TIn> source, Func<IObservable<TIn>, IObservable<TOut>> lifter) 
    { 
     return new Sensed<TOut>(lifter(source.Values), source.Sensors); 
    } 

    public static Sensed<TOut> Select<TIn, TOut>(this Sensed<TIn> source, Func<TIn, TOut> func) 
    { 
     return source.Lift(values => values.Select(func)); 
    } 

    public static Sensed<T> Where<T>(this Sensed<T> source, Func<T, bool> func) 
    { 
     return source.Lift(values => values.Where(func)); 
    } 

    public static Sensed<T> Merge<T>(this Sensed<T> source1, Sensed<T> source2) 
    { 
     return new Sensed<T>(source1.MergeValues(source2.Values), source1.MergeSensors(source2.Sensors)); 
    } 

    public static Sensed<TOut> CombineLatest<TIn1, TIn2, TOut>(this Sensed<TIn1> source1, Sensed<TIn2> source2, Func<TIn1, TIn2, TOut> func) 
    { 
     return new Sensed<TOut>(source1.Values.CombineLatest(source2.Values, func), source1.MergeSensors(source2.Sensors)); 
    } 

    public static IDisposable Actuate<T>(this Sensed<T> source, Action<T> next) 
    { 
     return source.Values.Sample(source.Sensors).Subscribe(next); 
    } 
} 

Мой пример становится:

var n = Sensed.Interval(TimeSpan.FromMilliseconds(100)); 
var a = n.Select(t => "a=" + t); 
var b = n.Where(t => t % 10 == 0).Select(t => "b=" + t); 
var ab = a.CombineLatest(b, Tuple.Create); 
ab.Actuate(Console.WriteLine); 

И опять только «желательные» значения передаются на привод, но с этой конструкцией, инициировавший датчики помнить в структуре опознаются.

Я не уверен, что любое из этого означает «смысл» (каламбур), может быть, я должен просто отпустить мое желание получить чистую FRP и жить с ним. В конце концов, время относительное ;-)

Peter

Смежные вопросы