У меня есть наблюдательный звонок processedMessages
. Мне нужно буферизовать эти сообщения и удалять их из внешней очереди каждые 10 сообщений или каждые 30 секунд. У меня есть следующий код, который должен для выполнения этой задачи:Почему мой буферизированный наблюдаемый комплект, а не предоставление значения для моего onNext на каждом интервале?
this.processedMessages.Where(m => m != null && some criteria here)
.Buffer(TimeSpan.FromSeconds(30), 10, this.schedulerProvider.Concurrent)
.Subscribe(
(observer) => { DeleteMessages(observer); });
Этот код называется конструктором моей службы. Во время первого звонка processedMessages
является пустой наблюдаемой. Messages
потенциально добавляются к processedMessages
при условии, что служба запущена. Мне нужно это, чтобы продолжать удаление сообщений, пока они добавляются к processedMessages
. Вместо этого этот код запускается один раз (даже не учитывая 30-секундную задержку), немедленно вызывает DeleteMessages и затем завершает работу.
Я пробовал обернуть все это в Observable.Create()
и подписаться на это позже. Это запускает код один раз, но никогда не повторяется. Я попытался добавить Repeat()
на различные вспомогательные наблюдаемые (по общему признанию, граничащие с культурой груза там), но это либо ставит меня в бесконечный цикл, либо продолжает удалять одни и те же сообщения снова и снова.
Я подозреваю, что моя проблема заключается в том, что я никогда не звоню .Subscribe()
по адресу: processedMessages
; только на буферизованной проекции. Но если мне нужно подписаться на это наблюдаемое, то в чем смысл метода расширения буфера вообще?
EDIT:
На основании ответа Шломо, вот как я создал processedMessages
.
Я попробовал оба из следующих и только понял, после рассмотрения в деталях, что они делают, что оба варианта создания наблюдаемой с помощью одного сообщения, а затем полная:
this.processedMessages.Concat(Observable.Return(newMessage));
this.processedMessages.Concat(new List<IMessage>() { newMessage }).ToObservable();
Итак правильный Followup вопрос будет Кажется, что: Как добавить новые сообщения в processedMessages
, не вызывая его преждевременно?