2017-01-27 1 views
0

У меня есть наблюдательный звонок processedMessages. Мне нужно буферизовать эти сообщения и удалять их из внешней очереди каждые 10 сообщений или каждые 30 секунд. У меня есть следующий код, который должен для выполнения этой задачи:Почему мой буферизированный наблюдаемый комплект, а не предоставление значения для моего onNext на каждом интервале?

this.processedMessages.Where(m => m != null && some criteria here) 
    .Buffer(TimeSpan.FromSeconds(30), 10, this.schedulerProvider.Concurrent) 
    .Subscribe(
     (observer) => { DeleteMessages(observer); }); 

Этот код называется конструктором моей службы. Во время первого звонка processedMessages является пустой наблюдаемой. Messages потенциально добавляются к processedMessages при условии, что служба запущена. Мне нужно это, чтобы продолжать удаление сообщений, пока они добавляются к processedMessages. Вместо этого этот код запускается один раз (даже не учитывая 30-секундную задержку), немедленно вызывает DeleteMessages и затем завершает работу.

Я пробовал обернуть все это в Observable.Create() и подписаться на это позже. Это запускает код один раз, но никогда не повторяется. Я попытался добавить Repeat() на различные вспомогательные наблюдаемые (по общему признанию, граничащие с культурой груза там), но это либо ставит меня в бесконечный цикл, либо продолжает удалять одни и те же сообщения снова и снова.

Я подозреваю, что моя проблема заключается в том, что я никогда не звоню .Subscribe() по адресу: processedMessages; только на буферизованной проекции. Но если мне нужно подписаться на это наблюдаемое, то в чем смысл метода расширения буфера вообще?

EDIT:

На основании ответа Шломо, вот как я создал processedMessages.

Я попробовал оба из следующих и только понял, после рассмотрения в деталях, что они делают, что оба варианта создания наблюдаемой с помощью одного сообщения, а затем полная:

this.processedMessages.Concat(Observable.Return(newMessage)); 

this.processedMessages.Concat(new List<IMessage>() { newMessage }).ToObservable(); 

Итак правильный Followup вопрос будет Кажется, что: Как добавить новые сообщения в processedMessages, не вызывая его преждевременно?

ответ

0

Вашего наблюдаемые как вывешено должен вызывать DeleteMessages на один из трех условий:

  1. 10 сообщений, поступивших с момента подписки или последним триггером
  2. 30 секунд прошли с момента подписки или последним триггером
  3. наблюдаемого processedMessages завершает работу.

Поскольку нет видимости в processedMessages, я не могу отлаживать дальше.

Смежные вопросы