Teaser: Ребята, этот вопрос не о том, как реализовать политику повтора. Речь идет о правильном завершении блока потока данных TPL.Внедрение правильного завершения повторного блока
Этот вопрос в основном является продолжением моего предыдущего вопроса Retry policy within ITargetBlock. Ответом на этот вопрос было умное решение @ svick, которое использует TransformBlock
(источник) и TransformManyBlock
(цель). Остается только оставить этот блок в правильном порядке : дождаться завершения всех повторных попыток, а затем завершить целевой блок. Вот то, что я закончил с (это просто фрагмент кода, не обращайте слишком много внимания на не THREADSAFE retries
набор):
var retries = new HashSet<RetryingMessage<TInput>>();
TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
async message =>
{
try
{
var result = new[] { await transform(message.Data) };
retries.Remove(message);
return result;
}
catch (Exception ex)
{
message.Exceptions.Add(ex);
if (message.RetriesRemaining == 0)
{
if (failureHandler != null)
failureHandler(message.Exceptions);
retries.Remove(message);
}
else
{
retries.Add(message);
message.RetriesRemaining--;
Task.Delay(retryDelay)
.ContinueWith(_ => target.Post(message));
}
return null;
}
}, dataflowBlockOptions);
source.LinkTo(target);
source.Completion.ContinueWith(async _ =>
{
while (target.InputCount > 0 || retries.Any())
await Task.Delay(100);
target.Complete();
});
Идея заключается в том, чтобы выполнить какую-то опрос и проверить, есть ли все сообщения, ожидающие обработки, и нет сообщений, требующих повторной попытки. Но в этом решении мне не нравится идея опроса.
Да, я могу инкапсулировать логику добавления/удаления повторений в отдельный класс и даже, например, выполнять некоторые действия, когда набор попыток становится пустым, но как иметь дело с target.InputCount > 0
условием? Не существует такого обратного вызова, вызываемого при отсутствии ожидающих сообщений для блока, поэтому кажется, что проверка target.ItemCount
в цикле с небольшой задержкой является единственным вариантом.
Кто-нибудь знает более умный способ достичь этого?
Похоже, что ITargetBlock поддерживает push-уведомление через наблюдателя, возвращаемого методом расширения AsObserver. См. Http://msdn.microsoft.com/en-us/library/hh160359.aspx и http://msdn.microsoft.com/en-us/library/ee850490.aspx. – JamieSee
Похоже, вы пытаетесь использовать исключения как обычный программный поток, что является плохой практикой. поиск Google или посмотреть на следующую тему на SO: http://stackoverflow.com/questions/729379/why-not-use-exceptions-as-regular-flow-of-control Вся логика повтора должна быть в блоке try, а не в блоке исключений. Не ответ на ваш вопрос, но я подумал, что вы должны знать. – Nullius
@Nullius, логика повторения основана на * исключениях * - повторите попытку в случае временной ошибки. Я не думаю, что логика повтора в блоке 'try' - хорошая идея, так как вы не знаете тип ошибки и является ли эта ошибка временной или нет. – Alex