2016-11-24 3 views
0

Мне нужно прослушивать сокет UDP, и через 10 секунд или 100 элементов в буфере нужно вызвать некоторую логику. Как правило, он работает нормально, но я не знаю, как правильно остановить прослушивание сокета.Правильный способ прекратить прослушивание гнезда UDP с помощью RxExtensions

var ip = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1234); 
var socket = new UdpClient(ip); 

var cancellationTokenSource = new CancellationTokenSource(); 
var observable = 
    Observable 
    .FromAsync(socket.ReceiveAsync) 
    .DoWhile(() => !cancellationTokenSource.IsCancellationRequested) 
    .Buffer(TimeSpan.FromSeconds(10), 100); 

var subscribtion = observable.Subscribe(o => 
{ 
    //logic 
}); 

//simulate close method from another thread 
Task.Factory.StartNew(() => 
{ 
    Task.Delay(TimeSpan.FromSeconds(12)).Wait(); 
    cancellationTokenSource.Cancel(); 
    socket.Close(); 
    subscribtion.Dispose(); 
}); 

Когда я имитирую закрытие сокета, существует ситуация, когда существует некоторые данные в буфере, которые не могут быть обработаны - это любой способ избежать этого?

Когда я отправить несколько сообщений, из другого процесса с 500ms задержки, это будет работает как пример ниже:

  1. 20 сообщений будет доход
  2. Некоторые логика будет вызывать - абонентская логика
  3. 4 сообщений будет доход
  4. Simulate близкий метод будет вызывать

Когда «закрытым способом» будет вызывать Мне нужно немедленно обработать все данные в буфере и закрыть приложение, не дожидаясь даже за время ожидания буфера. Время задержки буфера определяется пользователем, поэтому я не хочу ждать вызова абонентской логики, потому что это может быть довольно долгое время.

+0

Можете ли вы предоставить [mcve]? Я хотел бы запустить код, демонстрирующий вашу проблему. Тогда это может быть исправлено. Я собираюсь сказать, что решение будет ** не **, чтобы смешивать TPL и Rx. Rx более мощный, и на этом вы должны сосредоточиться. – Enigmativity

+0

Спасибо за ваш интерес. Возможно, я попробую задать свой вопрос и больше сосредоточиться на Rx. В моем примере я использовал метод Observable with Buffer. Он информирует подписчиков каждые 10 секунд или 100 пунктов в буфере. Но у меня есть особая ситуация, когда кто-то закрывает мое приложение. Это может произойти в не указанное время, например, если это произойдет, когда третья секунда от последних подписчиков сообщит и существует какая-то информация в буфере, я должен удержать свое приложение еще на 7 секунд для обработки всех данных из буфера. Есть ли способ сказать классу Observable (по запросу) - остановить наблюдателя вашего источника и в последний раз информировать подписчиков. – tom

+0

Что означает «остановить наблюдателя от вашего источника и в последний раз информировать подписчиков»? – Enigmativity

ответ

1

Добро пожаловать в StackOverflow!

Существующие перегрузки метода буфера не поддерживают время, счет и ворота. Но есть перегрузка, которая вызывает закрытие буфера, когда последовательность создает значение. Поэтому мы просто создадим последовательность путем слияния наблюдаемых всех условий закрытия буфера.

Посмотрите на эту демонстрационную версию.

onTime произведет значение после указанного периода.

onCount произведет свое первое значение после того, как x предметов прошли.

onClose немедленно произведет значение для подписки - но мы не будем подключаться к нему, пока мы не примем решение.

  var producer = Observable.Interval(TimeSpan.FromSeconds(0.2)); 
      var source = producer.Publish().RefCount(); 

      var onClose = Observable.Return(0L).Publish(); 
      var onTime = Observable.Timer(TimeSpan.FromSeconds(2)); 
      var onCount = source.Skip(10); 

      var bufferClose = Observable.Merge(onClose, onTime, onCount); 

     var subscription = 
      source 
      .Buffer(() => bufferClose) 
      .Subscribe(list => Console.WriteLine(string.Join(",", list))); 

      Console.WriteLine("Waiting for close"); 
      Console.ReadLine(); 

      onClose.Connect(); //signal 
      subscription.Dispose(); 

      Console.WriteLine("Closed"); 

      Console.ReadLine(); 

Это будет производить вывод, основанный на количестве или время, пока возврат не будет нажат, когда он немедленно закрывается с тем, что в буфере.

+0

Завтра я попробую решение, но оно выглядит очень многообещающим. Спасибо. – tom

+0

@tom, как получилось? – Asti

Смежные вопросы