2010-02-18 5 views
4

У меня есть один основной поток и многие другие фоновые потоки.C# Дождитесь окончания всех потоков в ThreadPool

Основное использование этих фоновых потоков - запрос данных (многие запросы из Интернета, поэтому я создаю несколько потоков: чтобы избежать отставания пользовательского интерфейса).

Когда дело доходит до экспорта данных в основной поток (пользовательский интерфейс), мне нужно подождать, пока все остальные потоки не будут завершены.

Мой код:

//...code to open save file dialog... 

//this loop is to wait for all the threads finish their query 
//QueryThread.threadCount is the count of the background threads 
while (QueryThread.threadCount != 0) 
{ 
    Thread.CurrentThread.Join(1000); 
    Console.WriteLine(QueryThread.threadCount); 
} 

//...code to export data... 

Если я комментирую петлю, а из, программа будет работать гладко, но некоторые из моих экспортированных данных будет иметь возможность показывать некоторые «нежелательные» материал, так как некоторые из фона потоки не закончили свою работу.

Однако вышеописанный цикл while бесконечен, threadCount никогда не изменяется, что означает, что во время метода «Join()» фоновый поток не работает.

Почему блокирование фоновых потоков и как я могу решить проблему?

Большое спасибо!

ответ

2

Ваша реализация неверна, и вы не должны использовать Join как примитив синхронизации между вашими потоками.

Что вам нужно сделать, это реализовать producer-consumer pattern. Это позволит вам иметь потоки, ожидающие выполнения работы, а затем ожить, чтобы выполнить эту работу, когда вы поместите ее в очередь.

Однако изменение, которое я сделаю, состоит в том, что из потока пользовательского интерфейса не добавляйте данные непосредственно в очередь, принадлежащую производителю и потребителю. Скорее сделайте копию этих данных, а затем положите , что в очереди.

Для получения дополнительной информации о том, как реализовать шаблон производитель-потребитель в .NET, я предлагаю вам прочитать документацию MSDN под названием «How to: Synchronize a Producer and a Consumer Thread (C# Programming Guide)»

+0

Спасибо. Я также чувствую себя странно в своей реализации, но я новичок в программировании потоков. С нетерпением ждем ваших последующих действий. =] –

+0

Хорошие ссылки, но это не похоже на его проблему - я думаю, что его проблема связана с объединением нескольких вызовов данных и последующей откачкой результатов обратно в поток пользовательского интерфейса, как только они закончили? – slugster

3

Вы вызываете метод Join в текущем потоке, который не делает много смысла. Вы должны вызвать его на ваших рабочих потоков:

foreach (Thread thread in workerThreads) 
{ 
    thread.Join(1000); 
} 

К сожалению, этот вид поражения цели с использованием потоков, потому что он будет блокировать звонки, пока все остальные потоки не будут закончены.

Событие BackgroundWorker может использоваться для уведомления о завершении фоновой задачи и выполнения обновлений в форме.

+0

Я использую ThreadPool для тех фоновых потоков Как сделать метод foreach на нем? спасибо –

2

Я думаю, вы хотите посмотреть на сигнализацию. Имейте сигналы (ManualResetEvent/AutoResetEvent) для ваших потоков. Установите() связанный знак сигнала в рабочем потоке, когда это будет сделано. В основном потоке выполните `WaitAll (signal1, signal2, signal3) ', чтобы дождаться завершения ваших рабочих потоков.

Надеется, что это помогает,

+0

Я пробовал это, но он говорит, что основной поток - это STAThread, который не поддерживает WaitAll –

+0

, тогда вам нужно повторить их и WaitOne по каждому заказу не имеет значения – Marek

1

Я couldn't сопротивляться, пытаясь немного себя. Я уверен, что есть возможности для улучшения, но я думаю, что это показывает, как решать некоторые проблемы с несколькими потоками, включая исходный вопрос.

Form.cs

 

namespace STAFormWithThreadPoolSync 
{ 
    internal delegate void WorkerEvent(WorkerEventInfo info); 

    public partial class Form1 : Form 
    { 
     // We'll create a state object for each worker process 
     List<WorkerState> workerStates = new List<WorkerState>(); 

     public Form1() 
     { 
      InitializeComponent(); 
     } 

     // Executed in the main thread 
     private void button1_Click(object sender, EventArgs e) 
     { 
      workersList.Items.Clear(); 

      // Read the amount of thread we should start from the form 
      int threadCountToUse = (int)ThreadCount.Value; 

      WorkerEvent woEvent = new WorkerEvent(this.workerEventOccured); 

      // Start up all threads 
      for (int counter = 0; counter < threadCountToUse; ++counter) 
      { 
       // An object we can pass values into for the worker process to use. 
       WorkerState workerState = new WorkerState(); 

       workerState.OnStarted += woEvent; 
       workerState.OnFinished += woEvent; 

       // Register for the signal (and store its registered wait handle in the stateObj, which we also pass into the parameters!) 
       workerState.registeredWaitHandle = ThreadPool.RegisterWaitForSingleObject(workerState.finishSignal, this.ItemHasFinished, workerState, -1, true); 

       // Store the state object for later use. 
       workerStates.Add(workerState); 
      } 

      WorkersProgress.Minimum = 0; 
      WorkersProgress.Maximum = workerStates.Count; 

      workerStates.ForEach(workerState => 
       { 
        // Fire of the worker thread (with the state object) 
        ThreadPool.QueueUserWorkItem(this.ProcessItem, workerState); 
       } 
      ); 



      button1.Enabled = false; 
      CurrentResult.Value = 0; 
      CurrentResultLabel.Text = "Current value"; 
      ProgressTimer.Start(); 
     } 

     // event is run on the callers thread, so carefull accessing our controls on our form. 
     internal void workerEventOccured(WorkerEventInfo info) 
     { 
      if (this.workersList.InvokeRequired) 
      { 
       WorkerEvent workerEvent = new WorkerEvent(workerEventOccured); 
       this.Invoke(workerEvent, new object[] { info }); 
      } 
      else 
      { 
       switch (info.eventType) 
       { 
        case EventType.WorkerStarted: 
         this.workersList.Items.Add(String.Format("Worker started on thread : {0}", info.workerState.threadId)); 
         break; 

        case EventType.WorkerEnded: 
         this.workersList.Items.Add(String.Format("Worker finished on thread : {0}", info.workerState.threadId)); 
         break; 
        case EventType.AllWorkersFinished: 
         this.workersList.Items.Add("ALL workers finished"); 
         ProgressTimer.Stop(); 
         button1.Enabled = true; 
         CurrentResultLabel.Text = "Final value"; 
         break; 
       } 
      } 
     } 

     // Executed in threadpool thread. 
     private void ProcessItem(object state) 
     { 
      WorkerState workerState = state as WorkerState; 
      int threadId = Thread.CurrentThread.ManagedThreadId; 
      workerState.threadId = threadId.ToString(); 

      WorkerEventInfo weInfo = new WorkerEventInfo(); 
      weInfo.eventType = EventType.WorkerStarted; 
      weInfo.workerState = workerState; 
      workerState.Started(weInfo); 

      // Simulate work for ((threadid/2) seconds. 
      Thread.Sleep((threadId * 500)); 

      // Set the result in the state object to the threadId; 
      workerState.result = threadId; 

      // Signal that this thread is done. 
      workerState.finishSignal.Set(); 
     } 

     // Executed in threadpool thread 
     private void ItemHasFinished(object state, bool timedOut) 
     { 
      // get our state object 
      WorkerState workerState = state as WorkerState; 

      WorkerEventInfo weInfo = new WorkerEventInfo(); 
      weInfo.eventType = EventType.WorkerEnded; 
      weInfo.workerState = workerState; 

      workerState.Finished(weInfo); 
     } 

     private void ProgressTimer_Tick(object sender, EventArgs e) 
     { 
      List<WorkerState> removeStates = new List<WorkerState>(); 
      workerStates.ForEach(workerState => 
       { 
        if (workerState.finishSignal.WaitOne(0)) 
        { 
         CurrentResult.Value += workerState.result; 
         removeStates.Add(workerState); 
        } 
       } 
      ); 

      removeStates.ForEach(workerState => 
       { 
        workerState.registeredWaitHandle.Unregister(workerState.finishSignal); 
        workerStates.Remove(workerState); 
       } 
      ); 


      WorkersProgress.Value = workerStates.Count; 
      if (workerStates.Count == 0) 
      { 
       WorkerEventInfo weInfo = new WorkerEventInfo(); 
       weInfo.eventType = EventType.AllWorkersFinished; 
       weInfo.workerState = null; 
       this.workerEventOccured(weInfo); 
      } 

     } 
    } 

    internal class WorkerState 
    { 
     internal string threadId = ""; 
     internal int result = 0; 
     internal RegisteredWaitHandle registeredWaitHandle = null; 
     internal AutoResetEvent finishSignal = new AutoResetEvent(false); 
     internal event WorkerEvent OnStarted = new WorkerEvent((info) => {}); 
     internal event WorkerEvent OnFinished = new WorkerEvent((info) => { }); 

     internal void Started(WorkerEventInfo info) 
     { 
      OnStarted(info); 
     } 

     internal void Finished(WorkerEventInfo info) 
     { 
      OnFinished(info); 
      this.finishSignal.Set(); 
     } 
    } 

    internal enum EventType 
    { 
     WorkerStarted, 
     WorkerEnded, 
     AllWorkersFinished 
    } 

    internal class WorkerEventInfo 
    { 
     internal EventType eventType; 
     internal WorkerState workerState; 
    } 
} 

 

Form.Designer.cs

 

namespace STAFormWithThreadPoolSync 
{ 
    partial class Form1 
    { 
     /// 
     /// Required designer variable. 
     /// 
     private System.ComponentModel.IContainer components = null; 

     /// 
     /// Clean up any resources being used. 
     /// 
     /// true if managed resources should be disposed; otherwise, false. 
     protected override void Dispose(bool disposing) 
     { 
      if (disposing && (components != null)) 
      { 
       components.Dispose(); 
      } 
      base.Dispose(disposing); 
     } 

     #region Windows Form Designer generated code 

     /// 
     /// Required method for Designer support - do not modify 
     /// the contents of this method with the code editor. 
     /// 
     private void InitializeComponent() 
     { 
      this.components = new System.ComponentModel.Container(); 
      this.button1 = new System.Windows.Forms.Button(); 
      this.ThreadCount = new System.Windows.Forms.NumericUpDown(); 
      this.workersList = new System.Windows.Forms.ListView(); 
      this.WorkerProcessColumn = new System.Windows.Forms.ColumnHeader(); 
      this.ProgressTimer = new System.Windows.Forms.Timer(this.components); 
      this.WorkersProgress = new System.Windows.Forms.ProgressBar(); 
      this.CurrentResultLabel = new System.Windows.Forms.Label(); 
      this.CurrentResult = new System.Windows.Forms.NumericUpDown(); 
      this.label2 = new System.Windows.Forms.Label(); 
      ((System.ComponentModel.ISupportInitialize)(this.ThreadCount)).BeginInit(); 
      ((System.ComponentModel.ISupportInitialize)(this.CurrentResult)).BeginInit(); 
      this.SuspendLayout(); 
      // 
      // button1 
      // 
      this.button1.Location = new System.Drawing.Point(212, 19); 
      this.button1.Name = "button1"; 
      this.button1.Size = new System.Drawing.Size(93, 23); 
      this.button1.TabIndex = 0; 
      this.button1.Text = "Start threads"; 
      this.button1.UseVisualStyleBackColor = true; 
      this.button1.Click += new System.EventHandler(this.button1_Click); 
      // 
      // ThreadCount 
      // 
      this.ThreadCount.Location = new System.Drawing.Point(23, 21); 
      this.ThreadCount.Minimum = new decimal(new int[] { 
      2, 
      0, 
      0, 
      0}); 
      this.ThreadCount.Name = "ThreadCount"; 
      this.ThreadCount.Size = new System.Drawing.Size(183, 20); 
      this.ThreadCount.TabIndex = 1; 
      this.ThreadCount.Value = new decimal(new int[] { 
      4, 
      0, 
      0, 
      0}); 
      // 
      // workersList 
      // 
      this.workersList.Columns.AddRange(new System.Windows.Forms.ColumnHeader[] { 
      this.WorkerProcessColumn}); 
      this.workersList.Location = new System.Drawing.Point(23, 80); 
      this.workersList.Name = "workersList"; 
      this.workersList.Size = new System.Drawing.Size(486, 255); 
      this.workersList.TabIndex = 3; 
      this.workersList.UseCompatibleStateImageBehavior = false; 
      this.workersList.View = System.Windows.Forms.View.Details; 
      // 
      // WorkerProcessColumn 
      // 
      this.WorkerProcessColumn.Text = "Worker process"; 
      this.WorkerProcessColumn.Width = 482; 
      // 
      // ProgressTimer 
      // 
      this.ProgressTimer.Interval = 200; 
      this.ProgressTimer.Tick += new System.EventHandler(this.ProgressTimer_Tick); 
      // 
      // WorkersProgress 
      // 
      this.WorkersProgress.Location = new System.Drawing.Point(112, 341); 
      this.WorkersProgress.Name = "WorkersProgress"; 
      this.WorkersProgress.Size = new System.Drawing.Size(397, 24); 
      this.WorkersProgress.TabIndex = 4; 
      // 
      // CurrentResultLabel 
      // 
      this.CurrentResultLabel.AutoSize = true; 
      this.CurrentResultLabel.Location = new System.Drawing.Point(578, 266); 
      this.CurrentResultLabel.Name = "CurrentResultLabel"; 
      this.CurrentResultLabel.Size = new System.Drawing.Size(74, 13); 
      this.CurrentResultLabel.TabIndex = 5; 
      this.CurrentResultLabel.Text = "Current Result"; 
      // 
      // CurrentResult 
      // 
      this.CurrentResult.Location = new System.Drawing.Point(581, 282); 
      this.CurrentResult.Maximum = new decimal(new int[] { 
      -1593835520, 
      466537709, 
      54210, 
      0}); 
      this.CurrentResult.Name = "CurrentResult"; 
      this.CurrentResult.ReadOnly = true; 
      this.CurrentResult.Size = new System.Drawing.Size(169, 20); 
      this.CurrentResult.TabIndex = 6; 
      // 
      // label2 
      // 
      this.label2.AutoSize = true; 
      this.label2.Location = new System.Drawing.Point(25, 352); 
      this.label2.Name = "label2"; 
      this.label2.Size = new System.Drawing.Size(81, 13); 
      this.label2.TabIndex = 7; 
      this.label2.Text = "processing load"; 
      // 
      // Form1 
      // 
      this.AutoScaleDimensions = new System.Drawing.SizeF(6F, 13F); 
      this.AutoScaleMode = System.Windows.Forms.AutoScaleMode.Font; 
      this.ClientSize = new System.Drawing.Size(762, 377); 
      this.Controls.Add(this.label2); 
      this.Controls.Add(this.CurrentResult); 
      this.Controls.Add(this.CurrentResultLabel); 
      this.Controls.Add(this.WorkersProgress); 
      this.Controls.Add(this.workersList); 
      this.Controls.Add(this.ThreadCount); 
      this.Controls.Add(this.button1); 
      this.Name = "Form1"; 
      this.Text = "Form1"; 
      ((System.ComponentModel.ISupportInitialize)(this.ThreadCount)).EndInit(); 
      ((System.ComponentModel.ISupportInitialize)(this.CurrentResult)).EndInit(); 
      this.ResumeLayout(false); 
      this.PerformLayout(); 

     } 

     #endregion 

     private System.Windows.Forms.Button button1; 
     private System.Windows.Forms.NumericUpDown ThreadCount; 
     private System.Windows.Forms.ListView workersList; 
     private System.Windows.Forms.ColumnHeader WorkerProcessColumn; 
     private System.Windows.Forms.Timer ProgressTimer; 
     private System.Windows.Forms.ProgressBar WorkersProgress; 
     private System.Windows.Forms.Label CurrentResultLabel; 
     private System.Windows.Forms.NumericUpDown CurrentResult; 
     private System.Windows.Forms.Label label2; 
    } 
} 
 

Надеется, что это помогает,

+0

Спасибо! вы определенно показали свою страсть по программированию =] Но в коде отсутствует код: 1. для цикла в button1_Click 2. ForEach в ProgressTimer_Tick –

+0

Исправлена ​​ошибка, при которой кнопка button1-click не показывала источник правильно. Не вижу ничего плохого в ProgressTimer, его использование лямбда. –

0

Я решил проблему, изменив свой подход к Producer-Consumer модели.

Спасибо всем. Пожалуйста, посмотрите на эту link (предоставляется casperOne выше), но будьте осторожны, не следить за выполнением Microsoft ....

Go here вместо даст вам лучший ответ.

Конечно, я внес некоторые изменения, тип очереди - это делегировать в моем случае.

public static class QueryThread 
{ 
    private static SyncEvents _syncEvents = new SyncEvents(); 
    private static Queue<Delegate> _queryQueue = new Queue<Delegate>(); 

    static Producer queryProducer; 
    static Consumer queryConsumer; 

    public static void init() 
    { 
     queryProducer = new Producer(_queryQueue, _syncEvents); 
     queryConsumer = new Consumer(_queryQueue, _syncEvents); 

     Thread producerThread = new Thread(queryProducer.ThreadRun); 
     Thread consumerThread = new Thread(queryConsumer.ThreadRun); 

     producerThread.IsBackground = true; 
     consumerThread.IsBackground = true; 

     producerThread.Start(); 
     consumerThread.Start(); 
    } 

    public static void Enqueue(Delegate item) 
    { 
     queryQueue.Enqueue(item); 
    } 
} 

Если запрос необходим в главном потоке, епдиеий делегат указует на функцию, которая делает запрос с помощью вызова Епдиеих (делегат пункта). Это добавит делегат в «приватную» очередь Продюсера.

Производитель добавит элементы в свою очередь в общую очередь в подходящем случае (например, генерирует случайное число и помещает его в общую очередь в примере msdn).

Потребитель деактивирует делегатов и запускает их.

Спасибо всем за помощь. =]

Смежные вопросы