2013-07-05 3 views
52

Teaser: Ребята, этот вопрос не о том, как реализовать политику повтора. Речь идет о правильном завершении блока потока данных TPL.Внедрение правильного завершения повторного блока

Этот вопрос в основном является продолжением моего предыдущего вопроса Retry policy within ITargetBlock. Ответом на этот вопрос было умное решение @ svick, которое использует TransformBlock (источник) и TransformManyBlock (цель). Остается только оставить этот блок в правильном порядке : дождаться завершения всех повторных попыток, а затем завершить целевой блок. Вот то, что я закончил с (это просто фрагмент кода, не обращайте слишком много внимания на не THREADSAFE retries набор):

var retries = new HashSet<RetryingMessage<TInput>>(); 

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; 
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message => 
    { 
     try 
     { 
      var result = new[] { await transform(message.Data) }; 
      retries.Remove(message); 
      return result; 
     } 
     catch (Exception ex) 
     { 
      message.Exceptions.Add(ex); 
      if (message.RetriesRemaining == 0) 
      { 
       if (failureHandler != null) 
        failureHandler(message.Exceptions); 

       retries.Remove(message); 
      } 
      else 
      { 
       retries.Add(message); 
       message.RetriesRemaining--; 

       Task.Delay(retryDelay) 
        .ContinueWith(_ => target.Post(message)); 
      } 
      return null; 
     } 
    }, dataflowBlockOptions); 

source.LinkTo(target); 

source.Completion.ContinueWith(async _ => 
{ 
    while (target.InputCount > 0 || retries.Any()) 
     await Task.Delay(100); 

    target.Complete(); 
}); 

Идея заключается в том, чтобы выполнить какую-то опрос и проверить, есть ли все сообщения, ожидающие обработки, и нет сообщений, требующих повторной попытки. Но в этом решении мне не нравится идея опроса.

Да, я могу инкапсулировать логику добавления/удаления повторений в отдельный класс и даже, например, выполнять некоторые действия, когда набор попыток становится пустым, но как иметь дело с target.InputCount > 0 условием? Не существует такого обратного вызова, вызываемого при отсутствии ожидающих сообщений для блока, поэтому кажется, что проверка target.ItemCount в цикле с небольшой задержкой является единственным вариантом.

Кто-нибудь знает более умный способ достичь этого?

+1

Похоже, что ITargetBlock поддерживает push-уведомление через наблюдателя, возвращаемого методом расширения AsObserver. См. Http://msdn.microsoft.com/en-us/library/hh160359.aspx и http://msdn.microsoft.com/en-us/library/ee850490.aspx. – JamieSee

+0

Похоже, вы пытаетесь использовать исключения как обычный программный поток, что является плохой практикой. поиск Google или посмотреть на следующую тему на SO: http://stackoverflow.com/questions/729379/why-not-use-exceptions-as-regular-flow-of-control Вся логика повтора должна быть в блоке try, а не в блоке исключений. Не ответ на ваш вопрос, но я подумал, что вы должны знать. – Nullius

+4

@Nullius, логика повторения основана на * исключениях * - повторите попытку в случае временной ошибки. Я не думаю, что логика повтора в блоке 'try' - хорошая идея, так как вы не знаете тип ошибки и является ли эта ошибка временной или нет. – Alex

ответ

1

Объединяя hwcverwe ответ и JamieSee комментарий может быть идеальным решением.

Во-первых, вам нужно создать более одного события:

var signal = new ManualResetEvent(false); 
var completedEvent = new ManualResetEvent(false); 

Затем вы должны создать наблюдателя, а также подписаться на TransformManyBlock, так что вы будете уведомлены, когда соответствующее событие происходит:

var observer = new RetryingBlockObserver<TOutput>(completedEvent); 
var observable = target.AsObservable(); 
observable.Subscribe(observer); 

наблюдаемое может быть довольно легко:

private class RetryingBlockObserver<T> : IObserver<T> { 
     private ManualResetEvent completedEvent; 

     public RetryingBlockObserver(ManualResetEvent completedEvent) {     
      this.completedEvent = completedEvent; 
     } 

     public void OnCompleted() { 
      completedEvent.Set(); 
     } 

     public void OnError(Exception error) { 
      //TODO 
     } 

     public void OnNext(T value) { 
      //TODO 
     } 
    } 

И ча п ждать либо сигнала или завершения (исчерпания всех элементов источника), или оба

source.Completion.ContinueWith(async _ => { 

      WaitHandle.WaitAll(completedEvent, signal); 
      // Or WaitHandle.WaitAny, depending on your needs! 

      target.Complete(); 
     }); 

Вы можете проверить значение результата из WaitAll, чтобы понять, какое событие было установлено, и реагировать соответствующим образом. Вы также можете добавить в код другие события, передав их наблюдателю, чтобы он мог установить их, когда это необходимо.Вы можете отличать свое поведение и реагировать по-разному при возникновении ошибки, например

2

Возможно, ManualResetEvent может сделать трюк.

Добавить общественную собственность TransformManyBlock

private ManualResetEvent _signal = new ManualResetEvent(false); 
public ManualResetEvent Signal { get { return _signal; } } 

И вот вы идете:

var retries = new HashSet<RetryingMessage<TInput>>(); 

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; 
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message => 
    { 
     try 
     { 
      var result = new[] { await transform(message.Data) }; 
      retries.Remove(message); 

      // Sets the state of the event to signaled, allowing one or more waiting threads to proceed 
      if(!retries.Any()) Signal.Set(); 
      return result; 
     } 
     catch (Exception ex) 
     { 
      message.Exceptions.Add(ex); 
      if (message.RetriesRemaining == 0) 
      { 
       if (failureHandler != null) 
        failureHandler(message.Exceptions); 

       retries.Remove(message); 

       // Sets the state of the event to signaled, allowing one or more waiting threads to proceed 
       if(!retries.Any()) Signal.Set(); 
      } 
      else 
      { 
       retries.Add(message); 
       message.RetriesRemaining--; 

       Task.Delay(retryDelay) 
        .ContinueWith(_ => target.Post(message)); 
      } 
      return null; 
     } 
    }, dataflowBlockOptions); 

source.LinkTo(target); 

source.Completion.ContinueWith(async _ => 
{ 
    //Blocks the current thread until the current WaitHandle receives a signal. 
    target.Signal.WaitOne(); 

    target.Complete(); 
}); 

Я не уверен, где ваш target.InputCount установлен. Таким образом, на месте вы измените target.InputCount вы можете добавить следующий код:

if(InputCount == 0) Signal.Set(); 
+0

Дело в том, что 'target.InputCount' является * черным ящиком * - это свойство только для чтения' TransformManyBlock' из потока данных TPL. – Alex

Смежные вопросы