2

Использование Rx в C# Я пытаюсь создать запрос опроса для REST API. Проблема, с которой я столкнулся, заключается в том, что Observable необходимо отправить ответы в порядке. Средства Если запрос A отправился в X-время и запрос B пошел в X + dx время, а ответ B пришел до A. Наблюдаемое выражение должно игнорировать или отменить запрос A.Использование Rx создать запрос на опрос для вызова webservice

Я написал пример кода, который пытается изобразить сценарий , Как я могу исправить это, чтобы получить только последний ответ и отменить или проигнорировать предыдущие ответы.

class Program 
    { 
     static int i = 0; 

     static void Main(string[] args) 
     { 
      GenerateObservableSequence(); 

      Console.ReadLine(); 
     } 

     private static void GenerateObservableSequence() 
     { 
      var timerData = Observable.Timer(TimeSpan.Zero, 
       TimeSpan.FromSeconds(1)); 

      var asyncCall = Observable.FromAsync<int>(() => 
      { 
       TaskCompletionSource<int> t = new TaskCompletionSource<int>(); 
       i++; 

       int k = i; 
       var rndNo = new Random().Next(3, 10); 
       Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); }); 
       return t.Task; 
      }); 

      var obs = from t in timerData 
      from data in asyncCall 
      select data; 

      var hot = obs.Publish(); 
      hot.Connect(); 

       hot.Subscribe(j => 
      { 
       Console.WriteLine("{0}", j); 
      }); 
     } 
    } 

После @Enigmativity ответа: Добавлена ​​функция опроса Aync всегда принимать последний ответ:

public static IObservable<T> PollingAync<T> (Func<Task<T>> AsyncCall, double TimerDuration) 
     { 
      return Observable 
     .Create<T>(o => 
     { 
      var z = 0L; 
      return 
       Observable 
        .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(TimerDuration)) 
        .SelectMany(nr => 
         Observable.FromAsync<T>(AsyncCall), 
         (nr, obj) => new { nr, obj}) 
        .Do(res => z = Math.Max(z, res.nr)) 
        .Where(res => res.nr >= z) 
        .Select(res => res.obj) 
        .Subscribe(o); 
     }); 

    } 

ответ

2

Начнем с упрощения кода.

Это в основном тот же код:

var rnd = new Random(); 

var i = 0; 

var obs = 
    from n in Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)) 
    let r = ++i 
    from t in Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))) 
    select r; 

obs.Subscribe(Console.WriteLine); 

Я получаю такой результат:

 
2 
1 
3 
4 
8 
5 
11 
6 
9 
7 
10 

С другой стороны, это может быть записано как:

var obs = 
    Observable 
     .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)) 
     .Select(n => ++i) 
     .SelectMany(n => 
      Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), (n, _) => n); 

Итак, теперь для вашего требования:

Если запрос A отправился в X-время и запрос B отправился в X + dx время, а ответ B был до A, выражение Observable должно игнорировать или отменить запрос A.

Вот код:

var rnd = new Random(); 

var i = 0; 
var z = 0L; 

var obs = 
    Observable 
     .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)) 
     .Select(n => new { n, r = ++i }) 
     .SelectMany(nr => 
      Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), (nr, _) => nr) 
     .Do(nr => z = Math.Max(z, nr.n)) 
     .Where(nr => nr.n >= z) 
     .Select(nr => nr.r); 

Я не люблю использовать .Do подобное, но я не могу думать о альтернативы пока нет.

Это дает такую ​​вещь:

 
1 
5 
8 
9 
10 
11 
14 
15 
16 
17 
22 

Обратите внимание, что значения только по возрастанию.

Теперь вы должны использовать Observable.Create для инкапсуляции состояния, которое вы используете. Таким образом, ваше окончательное наблюдение должно выглядеть так:

var obs = 
    Observable 
     .Create<int>(o => 
     { 
      var rnd = new Random(); 
      var i = 0; 
      var z = 0L; 
      return 
       Observable 
        .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)) 
        .Select(n => new { n, r = ++i }) 
        .SelectMany(nr => 
         Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), 
         (nr, _) => nr) 
        .Do(nr => z = Math.Max(z, nr.n)) 
        .Where(nr => nr.n >= z) 
        .Select(nr => nr.r) 
        .Subscribe(o); 
     }); 
+0

Как избежать исключений из старых ответов, которые были отфильтрованы с использованием предложения where ? –

+0

@BalrajSingh - Итак, если бы я заменил '++ i' операцией, которая иногда выдавала ошибку? Это возможный сценарий? Какая ошибка может произойти? – Enigmativity

+0

Я имею в виду вместо Observable.Timer (TimeSpan.FromSeconds (rnd.Next (3, 10)) Я заменил бы это асинхронным вызовом на веб-службу, и это может вызвать любой тип ошибки HTTP. В этом случае мне нужно отменить все старые запросы, которые я делаю так, чтобы он не мог произойти с какой-либо ошибкой, и, если это произойдет, я должен иметь способ справиться с этим, не заканчивая поток Observable. –

5

Это обычный сценарий, и может быть исправлено просто.

Ключевая часть вашего образца кода в вопросе

var obs = from t in timerData 
      from data in asyncCall 
      select data; 

Это может быть прочитано как «для каждого значения в timerData получить все значения в asyncCall». Это оператор SelectMany (или FlatMap). Оператор SelectMany принимает все значения из внутренней последовательности (asyncCall) и возвращает их значения по мере их поступления. Это означает, что вы можете выйти из порядка.

Вы хотите отменить предыдущую внутреннюю последовательность, когда внешняя последовательность (timerData) создает новое значение. Для этого мы хотим использовать вместо этого оператор Switch.

var obs = timerData.Select(_=>asyncCall) 
        .Switch(); 

Полный код может быть очищен до следующего. (удалено избыточное издание Опубликовать/Подключить, распорядиться подпиской на нажатие клавиши)

класс Программа { { { статический int i = 0;

static void Main(string[] args) 
    { 
     using (GenerateObservableSequence().Subscribe(x => Console.WriteLine(x))) 
     { 
      Console.ReadLine(); 
     } 
    } 

    private static IObservable<int> GenerateObservableSequence() 
    { 
     var timerData = Observable.Timer(TimeSpan.Zero, 
      TimeSpan.FromSeconds(1)); 

     var asyncCall = Observable.FromAsync<int>(() => 
     { 
      TaskCompletionSource<int> t = new TaskCompletionSource<int>(); 
      i++; 

      int k = i; 
      var rndNo = new Random().Next(3, 10); 
      Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); }); 
      return t.Task; 
     }); 

     return from t in timerData 
       from data in asyncCall 
       select data; 
    } 
} 

--EDIT--

Похоже, я неправильно понял вопрос. И @Enigmativity предоставил более точный ответ. Это его ответ.

//Probably should be a field? 
var rnd = new Random(); 
var obs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)) 
     //.Select(n => new { n, r = ++i }) 
     //No need for the `i` counter. Rx does this for us with this overload of `Select` 
     .Select((val, idx) => new { Value = val, Index = idx}) 
     .SelectMany(nr => 
      Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), 
      (nr, _) => nr) 
     //.Do(nr => z = Math.Max(z, nr.n)) 
     //.Where(nr => nr.n >= z) 
     //Replace external State and Do with scan and Distinct 
     .Scan(new { Value = 0L, Index = -1 }, (prev, cur) => { 
      return cur.Index > prev.Index 
       ? cur 
       : prev; 
     }) 
     .DistinctUntilChanged() 
     .Select(nr => nr.Value) 
     .Dump(); 
+1

Разве это не всегда зависит от последнего запроса? Мое понимание вопроса состоит в том, что если происходит два вызова 'asyncCall' и результаты возвращаются в порядке вызова, тогда все два результата будут действительными, но если второй вызов вернется первым, то проигнорируйте результат первого вызова. – Enigmativity

+0

Основываясь на вашем понимании вопроса, ваше предположение также верно. –

+0

@LeeCampbell любое предложение о том, как обработать исключения предыдущих вызовов, которые мы проигнорировали. Вместо Observable.Timer (TimeSpan.FromSeconds (rnd.Next (3, 10)) Я вызову функцию async, как показано в обновленном вопросе. Эта функция async может вызвать исключение. Поэтому я хочу отменить предыдущий асинхронный вызов и обрабатывать исключение, не заканчивая поток IObservable. –

Смежные вопросы