Использование Rx в C# Я пытаюсь создать запрос опроса для REST API. Проблема, с которой я столкнулся, заключается в том, что Observable необходимо отправить ответы в порядке. Средства Если запрос A отправился в X-время и запрос B пошел в X + dx время, а ответ B пришел до A. Наблюдаемое выражение должно игнорировать или отменить запрос A.Использование Rx создать запрос на опрос для вызова webservice
Я написал пример кода, который пытается изобразить сценарий , Как я могу исправить это, чтобы получить только последний ответ и отменить или проигнорировать предыдущие ответы.
class Program
{
static int i = 0;
static void Main(string[] args)
{
GenerateObservableSequence();
Console.ReadLine();
}
private static void GenerateObservableSequence()
{
var timerData = Observable.Timer(TimeSpan.Zero,
TimeSpan.FromSeconds(1));
var asyncCall = Observable.FromAsync<int>(() =>
{
TaskCompletionSource<int> t = new TaskCompletionSource<int>();
i++;
int k = i;
var rndNo = new Random().Next(3, 10);
Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
return t.Task;
});
var obs = from t in timerData
from data in asyncCall
select data;
var hot = obs.Publish();
hot.Connect();
hot.Subscribe(j =>
{
Console.WriteLine("{0}", j);
});
}
}
После @Enigmativity ответа: Добавлена функция опроса Aync всегда принимать последний ответ:
public static IObservable<T> PollingAync<T> (Func<Task<T>> AsyncCall, double TimerDuration)
{
return Observable
.Create<T>(o =>
{
var z = 0L;
return
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(TimerDuration))
.SelectMany(nr =>
Observable.FromAsync<T>(AsyncCall),
(nr, obj) => new { nr, obj})
.Do(res => z = Math.Max(z, res.nr))
.Where(res => res.nr >= z)
.Select(res => res.obj)
.Subscribe(o);
});
}
Как избежать исключений из старых ответов, которые были отфильтрованы с использованием предложения where ? –
@BalrajSingh - Итак, если бы я заменил '++ i' операцией, которая иногда выдавала ошибку? Это возможный сценарий? Какая ошибка может произойти? – Enigmativity
Я имею в виду вместо Observable.Timer (TimeSpan.FromSeconds (rnd.Next (3, 10)) Я заменил бы это асинхронным вызовом на веб-службу, и это может вызвать любой тип ошибки HTTP. В этом случае мне нужно отменить все старые запросы, которые я делаю так, чтобы он не мог произойти с какой-либо ошибкой, и, если это произойдет, я должен иметь способ справиться с этим, не заканчивая поток Observable. –