Мне нужно прослушивать сокет UDP, и через 10 секунд или 100 элементов в буфере нужно вызвать некоторую логику. Как правило, он работает нормально, но я не знаю, как правильно остановить прослушивание сокета.Правильный способ прекратить прослушивание гнезда UDP с помощью RxExtensions
var ip = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1234);
var socket = new UdpClient(ip);
var cancellationTokenSource = new CancellationTokenSource();
var observable =
Observable
.FromAsync(socket.ReceiveAsync)
.DoWhile(() => !cancellationTokenSource.IsCancellationRequested)
.Buffer(TimeSpan.FromSeconds(10), 100);
var subscribtion = observable.Subscribe(o =>
{
//logic
});
//simulate close method from another thread
Task.Factory.StartNew(() =>
{
Task.Delay(TimeSpan.FromSeconds(12)).Wait();
cancellationTokenSource.Cancel();
socket.Close();
subscribtion.Dispose();
});
Когда я имитирую закрытие сокета, существует ситуация, когда существует некоторые данные в буфере, которые не могут быть обработаны - это любой способ избежать этого?
Когда я отправить несколько сообщений, из другого процесса с 500ms задержки, это будет работает как пример ниже:
- 20 сообщений будет доход
- Некоторые логика будет вызывать - абонентская логика
- 4 сообщений будет доход
- Simulate близкий метод будет вызывать
Когда «закрытым способом» будет вызывать Мне нужно немедленно обработать все данные в буфере и закрыть приложение, не дожидаясь даже за время ожидания буфера. Время задержки буфера определяется пользователем, поэтому я не хочу ждать вызова абонентской логики, потому что это может быть довольно долгое время.
Можете ли вы предоставить [mcve]? Я хотел бы запустить код, демонстрирующий вашу проблему. Тогда это может быть исправлено. Я собираюсь сказать, что решение будет ** не **, чтобы смешивать TPL и Rx. Rx более мощный, и на этом вы должны сосредоточиться. – Enigmativity
Спасибо за ваш интерес. Возможно, я попробую задать свой вопрос и больше сосредоточиться на Rx. В моем примере я использовал метод Observable with Buffer. Он информирует подписчиков каждые 10 секунд или 100 пунктов в буфере. Но у меня есть особая ситуация, когда кто-то закрывает мое приложение. Это может произойти в не указанное время, например, если это произойдет, когда третья секунда от последних подписчиков сообщит и существует какая-то информация в буфере, я должен удержать свое приложение еще на 7 секунд для обработки всех данных из буфера. Есть ли способ сказать классу Observable (по запросу) - остановить наблюдателя вашего источника и в последний раз информировать подписчиков. – tom
Что означает «остановить наблюдателя от вашего источника и в последний раз информировать подписчиков»? – Enigmativity