2016-09-28 3 views
0

У меня есть код, который запускает тысячи URL-адресов через стороннюю библиотеку. Иногда метод в библиотеке зависает, который занимает поток. Через некоторое время все потоки заняты процессами, которые ничего не делают, и он останавливается.Как обрабатывать потоки, зависающие при использовании SemaphoreSlim

Я использую команду SemaphoreSlim для управления добавлением новых потоков, чтобы у меня было оптимальное количество выполняемых задач. Мне нужен способ определить задачи, которые выполнялись слишком долго, а затем убить их, но также освободить поток от SemaphoreSlim, чтобы можно было создать новую задачу.

Я борюсь с подходом здесь, поэтому я сделал несколько тестовых кодов, которые имитируют то, что я делаю. Он создает задачи, которые имеют 10% -ный шанс повесить так быстро, что все потоки повесили.

Как я должен проверять их и убивать их?

Вот код:

class Program 
{ 
    public static SemaphoreSlim semaphore; 
    public static List<Task> taskList; 
    static void Main(string[] args) 
    { 

     List<string> urlList = new List<string>(); 
     Console.WriteLine("Generating list"); 
     for (int i = 0; i < 1000; i++) 
     { 
      //adding random strings to simulate a large list of URLs to process 
      urlList.Add(Path.GetRandomFileName()); 
     } 
     Console.WriteLine("Queueing tasks"); 

     semaphore = new SemaphoreSlim(10, 10); 

     Task.Run(() => QueueTasks(urlList)); 

     Console.ReadLine(); 
    } 
    static void QueueTasks(List<string> urlList) 
    { 
     taskList = new List<Task>(); 

     foreach (var url in urlList) 
     { 
      Console.WriteLine("{0} tasks can enter the semaphore.", 
        semaphore.CurrentCount); 
      semaphore.Wait(); 

      taskList.Add(DoTheThing(url)); 
     } 
    } 
    static async Task DoTheThing(string url) 
    { 

     Random rand = new Random(); 

     // simulate the IO process 
     await Task.Delay(rand.Next(2000, 10000)); 

     // add a 10% chance that the thread will hang simulating what happens occasionally with http request 
     int chance = rand.Next(1, 100); 
     if (chance <= 10) 
     { 
      while (true) 
      { 
       await Task.Delay(1000000); 
      } 
     } 

     semaphore.Release(); 
     Console.WriteLine(url); 
    } 
} 
+0

Вы не должны убивать задачи в первую очередь. Используйте некоторый механизм сотрудничества для прерывания операции, чтобы он мог освободить семафор, за который он несет ответственность. К сожалению, на ваш вопрос не хватает всех деталей, необходимых для понимания самой реализации задачи, но, вообще говоря, при работе с сетевым вводом-выводом (например, HTTP-запросами) у вас не должно быть активного потока ... вы должны использовать .NET асинхронные API для выполняемых операций. –

+0

«Как я должен быть ... убивать их?» - Вы не должны - убивать нити плохо плохо. Если ваша сторонняя библиотека рушится, вам может потребоваться запустить библиотеку в отдельном «AppDomain», чтобы вы могли ее закрыть. – Enigmativity

+1

Я согласен с @Enigmativity, за исключением того, что для полной изоляции вам понадобится отдельный * процесс *, а не только AppDomain. –

ответ

0

Как люди уже отмечали, Aborting темы, в общем-то плохо, и нет гарантированного способа сделать это в C#. Используя отдельный процесс для выполнения работы, а затем убивайте, это немного лучше, чем попытка Thread.Abort; но все же не лучший способ. В идеале вам нужны совместные потоки/процессы, которые используют IPC, чтобы решить, когда нужно выручать себя. Таким образом, очистка выполняется правильно.

Со всем сказанным вы можете использовать код, как показано ниже, чтобы сделать то, что вы намереваетесь сделать. Я написал это, полагая, что ваша задача будет выполнена в потоке. С небольшими изменениями вы можете использовать ту же логику для выполнения своей задачи в процессе.

Код ни в коем случае не является пуленепробиваемым и предназначен для иллюстрации. Параллельный код не очень хорошо тестируется. Замки удерживаются дольше, чем необходимо, и некоторые места, которые я не блокирую (например, функция журнала)

class TaskInfo { 
    public Thread Task; 
    public DateTime StartTime; 

    public TaskInfo(ParameterizedThreadStart startInfo, object startArg) { 
     Task = new Thread(startInfo); 
     Task.Start(startArg); 
     StartTime = DateTime.Now; 
    } 

} 

class Program { 

    const int MAX_THREADS = 1; 
    const int TASK_TIMEOUT = 6; // in seconds 
    const int CLEANUP_INTERVAL = TASK_TIMEOUT; // in seconds 

    public static SemaphoreSlim semaphore; 

    public static List<TaskInfo> TaskList; 
    public static object TaskListLock = new object(); 

    public static Timer CleanupTimer; 

    static void Main(string[] args) { 
     List<string> urlList = new List<string>(); 
     Log("Generating list"); 
     for (int i = 0; i < 2; i++) { 
      //adding random strings to simulate a large list of URLs to process 
      urlList.Add(Path.GetRandomFileName()); 
     } 
     Log("Queueing tasks"); 

     semaphore = new SemaphoreSlim(MAX_THREADS, MAX_THREADS); 

     Task.Run(() => QueueTasks(urlList)); 

     CleanupTimer = new Timer(CleanupTasks, null, CLEANUP_INTERVAL * 1000, CLEANUP_INTERVAL * 1000); 


     Console.ReadLine(); 
    } 

    // TODO: Guard against re-entrancy 
    static void CleanupTasks(object state) { 
     Log("CleanupTasks started"); 

     lock (TaskListLock) { 
      var now = DateTime.Now; 
      int n = TaskList.Count; 
      for (int i = n - 1; i >= 0; --i) { 
       var task = TaskList[i]; 
       Log($"Checking task with ID {task.Task.ManagedThreadId}"); 

       // kill processes running for longer than anticipated 
       if (task.Task.IsAlive && now.Subtract(task.StartTime).TotalSeconds >= TASK_TIMEOUT) { 
        Log("Cleaning up hung task"); 
        task.Task.Abort(); 
       } 

       // remove task if it is not alive 
       if (!task.Task.IsAlive) { 
        Log("Removing dead task from list"); 
        TaskList.RemoveAt(i); 
        continue; 
       } 

      } 

      if (TaskList.Count == 0) { 
       Log("Disposing cleanup thread"); 
       CleanupTimer.Dispose(); 
      } 
     } 

     Log("CleanupTasks done"); 
    } 

    static void QueueTasks(List<string> urlList) { 
     TaskList = new List<TaskInfo>(); 

     foreach (var url in urlList) { 
      Log($"Trying to schedule url = {url}"); 
      semaphore.Wait(); 
      Log("Semaphore acquired"); 

      ParameterizedThreadStart taskRoutine = obj => { 
       try { 
        DoTheThing((string)obj); 
       } finally { 
        Log("Releasing semaphore"); 
        semaphore.Release(); 
       } 
      }; 

      var task = new TaskInfo(taskRoutine, url); 
      lock (TaskListLock) 
       TaskList.Add(task); 
     } 

     Log("All tasks queued"); 
    } 

    // simulate all processes get hung 
    static void DoTheThing(string url) { 
     while (true) 
      Thread.Sleep(5000); 
    } 

    static void Log(string msg) { 
     Console.WriteLine("{0:HH:mm:ss.fff} Thread {1,2} {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId.ToString(), msg); 
    } 
}