2010-07-16 2 views
0

Думаю, мне может потребоваться передумать мой дизайн. Мне сложно сузить ошибку, из-за которой компьютер полностью зависает, иногда бросая HRESULT 0x8007000E из VS 2010.Threadpool/WaitHandle утечка ресурсов/авария

У меня есть консольное приложение (которое я позже преобразую в службу), которое обрабатывает передача файлов на основе очереди базы данных.

Я сжимаю потоки, разрешенные для передачи. Это связано с тем, что некоторые системы, к которым мы подключаемся, могут содержать только определенное количество подключений от определенных учетных записей.

Например, система А может принимать только 3 одновременных соединения (что означает 3 отдельных потока). Каждый из этих потоков имеет свой собственный уникальный объект соединения, поэтому нам не следует запускать какие-либо проблемы с синхронизацией, так как они не используют соединение.

Мы хотим обрабатывать файлы из этих систем в циклах. Так, например, мы разрешим 3 подключения, которые могут передавать до 100 файлов на соединение. Это означает, что для перемещения 1000 файлов из System A мы можем обрабатывать только 300 файлов за цикл, так как разрешено 3 потока по 100 файлов. Поэтому за время жизни этого переноса у нас будет 10 потоков. Мы можем работать только по 3 за раз. Таким образом, будет 3 цикла, и последний цикл будет использовать только 1 поток для передачи последних 100 файлов. (3 нитки х 100 файлов = 300 файлов за цикл)

В настоящее время архитектура примером является:

  1. System.Threading.Timer проверяет очередь каждые 5 секунд что-то делать, вызывая GetScheduledTask()
  2. Если нет ничего, GetScheduledTask() просто ничего не делает
  3. Если есть работа, создать ThreadPool нить для обработки работы [Work нитяных]
  4. работы Поток а видит, что есть 1000 файлов для передачи
  5. работа Поток А видит, что он может иметь только 3 темы, работающих в системе он становится все файлы из
  6. Работа нитяным начинается три новые рабочие темы [B, C, D] и переводы
  7. Работа Поток А ждет B, C, D [WaitHandle.WaitAll(transfersArray)]
  8. Работа Поток А видит, что есть еще несколько файлов в очереди (должно быть 700 сейчас)
  9. Работа Thread А создает новый массив, чтобы ждать на [transfersArray = new TransferArray[3], которое максимально для системы А, но может варьироваться в зависимости от системы
  10. Рабочая резьба А запускает три новых рабочих резьбы [B, C, D] и ждет их [WaitHandle.WaitAll(transfersArray)]
  11. Процесс повторяется, пока больше нет файлов для перемещения.
  12. сигналы Работа Пронизывайте, что это делается

Я использую ManualResetEvent для обработки сигналов.

Мои вопросы:

  1. Есть ли вопиющее обстоятельство, которое может вызвать утечку ресурсов или проблема, что я переживаю?
  2. Должен ли я цикл через массив после каждого WaitHandle.WaitAll(array) и вызвать array[index].Dispose()?
  3. Количество Handle под Task Manager для этого процесса медленно ползет вверх
  4. Я зову начальное создание Worker Thread А из System.Threading. Таймер. Будут ли какие-то проблемы с этим? Код для этого таймера:

(Некоторые код класса для планирования)

private ManualResetEvent _ResetEvent; 

private void Start() 
{ 
    _IsAlive = true; 
    ManualResetEvent transferResetEvent = new ManualResetEvent(false); 
    //Set the scheduler timer to 5 second intervals 
    _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000); 
} 

private void ScheduledTasks_Tick(object state) 
{ 
    ManualResetEvent resetEvent = null; 
    try 
    { 
     resetEvent = (ManualResetEvent)state; 
     //Block timer until GetScheduledTasks() finishes 
     _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite); 
     GetScheduledTasks(); 
    } 
    finally 
    { 
     _ScheduledTasks.Change(5000, 5000); 
     Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff")); 
     resetEvent.Set(); 
    } 
} 


private void GetScheduledTask() 
{ 
    try 
    { 
     //Check to see if the database connection is still up 
     if (!_IsAlive) 
     { 
      //Handle 
      _ConnectionLostNotification = true; 
      return; 
     } 

     //Get scheduled records from the database 
     ISchedulerTask task = null; 

     using (DataTable dt = FastSql.ExecuteDataTable(
       _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure, 
       new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class 
     { 
      if (dt != null) 
      { 
       if (dt.Rows.Count == 1) 
       { //Only 1 row is allowed 
        DataRow dr = dt.Rows[0]; 

        //Get task information 
        TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString()); 
        task = ScheduledTaskFactory.CreateScheduledTask(taskType); 

        task.Description = dr["Description"].ToString(); 
        task.IsEnabled = (bool)dr["IsEnabled"]; 
        task.IsProcessing = (bool)dr["IsProcessing"]; 
        task.IsManualLaunch = (bool)dr["IsManualLaunch"]; 
        task.ProcessMachineName = dr["ProcessMachineName"].ToString(); 
        task.NextRun = (DateTime)dr["NextRun"]; 
        task.PostProcessNotification = (bool)dr["NotifyPostProcess"]; 
        task.PreProcessNotification = (bool)dr["NotifyPreProcess"]; 
        task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString()); 
        task.SleepMinutes = (int)dr["SleepMinutes"]; 
        task.ScheduleId = (int)dr["ScheduleId"]; 
        task.CurrentRuns = (int)dr["CurrentRuns"]; 
        task.TotalRuns = (int)dr["TotalRuns"]; 

        SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task); 
        //Queue up task to worker thread and start 
        ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);  
       } 
      } 
     } 

    } 
    catch (Exception ex) 
    { 
     //Handle 
    } 
} 

private void ThreadProc(object taskObject) 
{ 
    SchedulerTask task = (SchedulerTask)taskObject; 
    ScheduledTaskEngine engine = null; 
    try 
    { 
     engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString); 
     engine.StartTask(task.Task);  
    } 
    catch (Exception ex) 
    { 
     //Handle 
    } 
    finally 
    { 
     task.TaskResetEvent.Set(); 
     task.TaskResetEvent.Dispose(); 
    } 
} 
+0

Похоже, что это была ошибка кодирования, связанная с объявлением массива событий сброса. Я делал 'ManualResetEvent [] События = новые ManualResetEvents [число];' вместо 'WaitHandle [] события = новый WaitHandle [число]' –

ответ

0

Оказывается, источник этой странной проблемы не был связан с архитектурой, а скорее из-за преобразования решения с 3.5 до 4.0. Я повторно создал решение, не выполняя никаких изменений кода, и проблема никогда не повторялась.

2

0x8007000E ошибка из-за нехватки памяти. Это и количество обработчиков, похоже, указывают на утечку ресурсов. Убедитесь, что вы удаляете каждый объект, который реализует IDisposable. Сюда входят массивы ManualResetEvent, которые вы используете.

Если у вас есть время, вы также можете преобразовать его в класс .NET 4.0 Task; он был разработан для обработки сложных сценариев, таких как это намного более чисто. Определив дочерние объекты Task, вы можете уменьшить общее количество потоков (потоки довольно дороги не только из-за планирования, но и из-за их пространства стека).

0

Я думаю, вы должны полностью пересмотреть свою архитектуру. Тот факт, что вы можете иметь только 3 одновременных соединения, почти попросил вас использовать 1 поток, чтобы сгенерировать список файлов и 3 потока для их обработки. Ваш поток производителя вставлял бы все файлы в очередь, а 3 потока потребителей деактивировали и продолжали обрабатывать по мере поступления элементов в очередь. Блокирующая очередь может значительно упростить код. Если вы используете .NET 4.0, вы можете воспользоваться классом BlockingCollection.

public class Example 
{ 
    private BlockingCollection<string> m_Queue = new BlockingCollection<string>(); 

    public void Start() 
    { 
     var threads = new Thread[] 
      { 
       new Thread(Producer), 
       new Thread(Consumer), 
       new Thread(Consumer), 
       new Thread(Consumer) 
      }; 
     foreach (Thread thread in threads) 
     { 
      thread.Start(); 
     } 
    } 

    private void Producer() 
    { 
     while (true) 
     { 
      Thread.Sleep(TimeSpan.FromSeconds(5)); 
      ScheduledTask task = GetScheduledTask(); 
      if (task != null) 
      { 
       foreach (string file in task.Files) 
       { 
        m_Queue.Add(task); 
       } 
      } 
     } 
    } 

    private void Consumer() 
    { 
     // Make a connection to the resource that is assigned to this thread only. 
     while (true) 
     { 
      string file = m_Queue.Take(); 
      // Process the file. 
     } 
    } 
} 

В приведенном выше примере я определенно упростил вещи, но, надеюсь, вы получите общую идею. Обратите внимание на то, что это намного проще, поскольку не так много способов синхронизации потоков (большинство из них будет встроено в очередь блокировки), и, конечно, не нужно использовать объекты WaitHandle. Очевидно, вам придется добавить правильные механизмы, чтобы изящно закрыть потоки, но это должно быть довольно легко.

1

Я ищу ответы на аналогичную проблему (количество ручек увеличивается с течением времени).

Я посмотрел на вашу архитектуру приложений и хотел бы предложить вам что-то, что может помочь вам:

вы слышали о IOCP (Input Output Completion Ports).

Я не уверен в том, что этот навык реализуется с использованием C#, но на C/C++ это кусок пирога. С помощью этого вы создаете уникальный пул потоков (количество потоков в этом пуле в общем случае определяется как 2 x количество процессоров или ядер процессоров на ПК или сервере) Вы связываете этот пул с ручкой IOCP и пулом делает работу. См. Справку по следующим функциям: CreateIoCompletionPort(); PostQueuedCompletionStatus(); GetQueuedCompletionStatus();

В общем создании и выходе потоков на лету может потребоваться много времени и приводит к штрафам за производительность и фрагментации памяти. Есть тысячи литературы о IOCP в MSDN и в google.