2013-06-14 3 views
12

Я хотел бы запустить Task, который имеет «heartbeat», который продолжает работать с определенным интервалом времени, пока задача не завершится.Создание задачи с биением

Я думаю, метод расширения, как это будет хорошо работать:

public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) 

Например:

public class Program { 
    public static void Main() { 
     var cancelTokenSource = new CancellationTokenSource(); 
     var cancelToken = cancelTokenSource.Token; 
     var longRunningTask = Task.Factory.StartNew(SomeLongRunningTask, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current); 
     var withHeartbeatTask = longRunningTask.WithHeartbeat(TimeSpan.FromSeconds(1), PerformHeartbeat, cancelToken); 
     withHeartbeatTask.Wait(); 
     Console.WriteLine("Long running task completed!"); 
     Console.ReadLine() 
    } 

    private static void SomeLongRunningTask() { 
     Console.WriteLine("Starting long task"); 
     Thread.Sleep(TimeSpan.FromSeconds(9.5)); 
    } 
    private static int _heartbeatCount = 0; 
    private static void PerformHeartbeat(CancellationToken cancellationToken) { 
     Console.WriteLine("Heartbeat {0}", ++_heartbeatCount); 
    } 
} 

Эта программа должна вывести:

Starting long task 
Heartbeat 1 
Heartbeat 2 
Heartbeat 3 
Heartbeat 4 
Heartbeat 5 
Heartbeat 6 
Heartbeat 7 
Heartbeat 8 
Heartbeat 9 
Long running task completed! 

Обратите внимание, что (при нормальных обстоятельствах) не должно выводиться «Heartbeat 10», поскольку сердцебиение начинается после первоначального тайм-аута (т. 1 секунда). Аналогично, если задача занимает меньше времени, чем интервал сердцебиения, сердцебиение не должно происходить вообще.

Что такое хороший способ реализовать это?

Справочная информация: У меня есть служба, которая слушает очередь Azure Service Bus. Я бы хотел, чтобы сообщение Complete (которое навсегда удалило бы его из очереди), пока я не закончу его обработку, что может занять больше времени, чем максимальное сообщение LockDuration 5 минут. Таким образом, мне нужно использовать этот подход, чтобы позвонить RenewLockAsync до истечения времени блокировки, чтобы сообщение не перегревалось во время длительной обработки.

+0

Это созвучно отчетность о проделанной работе в задаче асинхронной (только что вещь, запускающее отчет является интервалом времени, и нет никакого реального прогресса в докладе, за исключением возможно, подсчет пульса). Помогает ли любая из этих ссылок? http://blogs.msdn.com/b/dotnet/archive/2012/06/06/async-in-4-5-enabling-progress-and-cancellation-in-async-apis.aspx http: // stackoverflow .com/вопросы/15408148/c-sharp-async-wait-progress-event-on-task-object –

+0

@TimS. Они похожи, но не совсем то, что я хочу, особенно в случае никогда не сообщать, быстро ли завершена задача. Кроме того, сердцебиение не знает прогресса в расчете. Тем не менее, я был бы рад узнать, можете ли вы реализовать подход прогресса, чтобы соответствовать моему API расширения и иметь тот же чистый эффект с более простым кодом. –

ответ

11

Вот моя попытка:

public static class TaskExtensions { 
    /// <summary> 
    /// Issues the <paramref name="heartbeatAction"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running. 
    /// </summary> 
    public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) { 
     if (cancellationToken.IsCancellationRequested) { 
      return; 
     } 

     var stopHeartbeatSource = new CancellationTokenSource(); 
     cancellationToken.Register(stopHeartbeatSource.Cancel); 

     await Task.WhenAny(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatAction, stopHeartbeatSource.Token)); 
     stopHeartbeatSource.Cancel(); 
    } 

    private static async Task PerformHeartbeats(TimeSpan interval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) { 
     while (!cancellationToken.IsCancellationRequested) { 
      try { 
       await Task.Delay(interval, cancellationToken); 
       if (!cancellationToken.IsCancellationRequested) { 
        heartbeatAction(cancellationToken); 
       } 
      } 
      catch (TaskCanceledException tce) { 
       if (tce.CancellationToken == cancellationToken) { 
        // Totally expected 
        break; 
       } 
       throw; 
      } 
     } 
    } 
} 

или с небольшой подстройкой, вы можете даже сделать сердцебиение асинхронного как в:

/// <summary> 
    /// Awaits a fresh Task created by the <paramref name="heartbeatTaskFactory"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running. 
    /// </summary> 
    public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) { 
     if (cancellationToken.IsCancellationRequested) { 
      return; 
     } 

     var stopHeartbeatSource = new CancellationTokenSource(); 
     cancellationToken.Register(stopHeartbeatSource.Cancel); 

     await Task.WhenAll(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatTaskFactory, stopHeartbeatSource.Token)); 

     if (!stopHeartbeatSource.IsCancellationRequested) { 
      stopHeartbeatSource.Cancel(); 
     } 
    } 

    public static Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory) { 
     return WithHeartbeat(primaryTask, heartbeatInterval, heartbeatTaskFactory, CancellationToken.None); 
    } 

    private static async Task PerformHeartbeats(TimeSpan interval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) { 
     while (!cancellationToken.IsCancellationRequested) { 
      try { 
       await Task.Delay(interval, cancellationToken); 
       if (!cancellationToken.IsCancellationRequested) { 
        await heartbeatTaskFactory(cancellationToken); 
       } 
      } 
      catch (TaskCanceledException tce) { 
       if (tce.CancellationToken == cancellationToken) { 
        // Totally expected 
        break; 
       } 
       throw; 
      } 
     } 
    } 

, который позволит вам изменить образец код, чтобы что-то вроде это:

private static async Task PerformHeartbeat(CancellationToken cancellationToken) { 
    Console.WriteLine("Starting heartbeat {0}", ++_heartbeatCount); 
    await Task.Delay(1000, cancellationToken); 
    Console.WriteLine("Finishing heartbeat {0}", _heartbeatCount); 
} 

Вышеуказанный вариант может быть заменен асинхронным вызовом типа RenewLockAsync, так что вам не нужно тратить время на потоковую обработку, используя блокирующий вызов типа RenewLock, который потребует подхода Action.

Я answering my own question per SO guidelines, но я также открыт для более элегантных подходов к этой проблеме.

+0

Привет, я попал на этот пост из вашего комментария по моему вопросу о похожих интересах. В случае очереди SB, когда и где именно вы обновляете Lock? рабочая роль у меня есть только один поток как таковой, и прием и обработка сообщений выполняется в цикле while в методе Run(). – Aravind

+0

@Aravind Когда я получаю сообщение SB, я создаю задачу для ее обработки. Задача использует это вспомогательное устройство сердцебиения для пульса до тех пор, пока оно работает. –

+0

О, хорошо. Поскольку эта обработка сообщений я использую, возможно, не так часто используется. Я не создавал задачи для обработки каждого сообщения. Я не нашел использование RenewLock в примере кода, поэтому его так спрашивали. – Aravind

0

Вот мой подход

using System; 
using System.Threading; 
using System.Threading.Tasks; 

namespace ConsoleApplication3 
{ 
class Program 
{ 
    static void Main(string[] args) 
    { 
     Console.WriteLine("Start Main"); 
     StartTest().Wait(); 
     Console.ReadLine(); 
     Console.WriteLine("Complete Main"); 
    } 

    static async Task StartTest() 
    { 
     var cts = new CancellationTokenSource(); 

     // ***Use ToArray to execute the query and start the download tasks. 
     Task<bool>[] tasks = new Task<bool>[2]; 
     tasks[0] = LongRunningTask("", 20, cts.Token); 
     tasks[1] = Heartbeat("", 1, cts.Token); 

     // ***Call WhenAny and then await the result. The task that finishes 
     // first is assigned to firstFinishedTask. 
     Task<bool> firstFinishedTask = await Task.WhenAny(tasks); 

     Console.WriteLine("first task Finished."); 
     // ***Cancel the rest of the downloads. You just want the first one. 
     cts.Cancel(); 

     // ***Await the first completed task and display the results. 
     // Run the program several times to demonstrate that different 
     // websites can finish first. 
     var isCompleted = await firstFinishedTask; 
     Console.WriteLine("isCompleted: {0}", isCompleted); 
    } 

    private static async Task<bool> LongRunningTask(string id, int sleep, CancellationToken ct) 
    { 
     Console.WriteLine("Starting long task"); 


     await Task.Delay(TimeSpan.FromSeconds(sleep)); 

     Console.WriteLine("Completed long task"); 
     return true; 
    } 

    private static async Task<bool> Heartbeat(string id, int sleep, CancellationToken ct) 
    { 
     while(!ct.IsCancellationRequested) 
     { 
      await Task.Delay(TimeSpan.FromSeconds(sleep)); 
      Console.WriteLine("Heartbeat Task Sleep: {0} Second", sleep); 
     } 

     return true; 
    } 

} 

}

Смежные вопросы