2013-05-26 4 views
4

Я смотрел на это thread на создание простого пула потоков. Там, я наткнулся на @MilanGardian's response for .NET 3.5, который был элегантен и служил своей цели:NullReferenceException при создании потока

using System; 
using System.Collections.Generic; 
using System.Threading; 

namespace SimpleThreadPool 
{ 
    public sealed class Pool : IDisposable 
    { 
     public Pool(int size) 
     { 
      this._workers = new LinkedList<Thread>(); 
      for (var i = 0; i < size; ++i) 
      { 
       var worker = new Thread(this.Worker) { Name = string.Concat("Worker ", i) }; 
       worker.Start(); 
       this._workers.AddLast(worker); 
      } 
     } 

     public void Dispose() 
     { 
      var waitForThreads = false; 
      lock (this._tasks) 
      { 
       if (!this._disposed) 
       { 
        GC.SuppressFinalize(this); 

        this._disallowAdd = true; // wait for all tasks to finish processing while not allowing any more new tasks 
        while (this._tasks.Count > 0) 
        { 
         Monitor.Wait(this._tasks); 
        } 

        this._disposed = true; 
        Monitor.PulseAll(this._tasks); // wake all workers (none of them will be active at this point; disposed flag will cause then to finish so that we can join them) 
        waitForThreads = true; 
       } 
      } 
      if (waitForThreads) 
      { 
       foreach (var worker in this._workers) 
       { 
        worker.Join(); 
       } 
      } 
     } 

     public void QueueTask(Action task) 
     { 
      lock (this._tasks) 
      { 
       if (this._disallowAdd) { throw new InvalidOperationException("This Pool instance is in the process of being disposed, can't add anymore"); } 
       if (this._disposed) { throw new ObjectDisposedException("This Pool instance has already been disposed"); } 
       this._tasks.AddLast(task); 
       Monitor.PulseAll(this._tasks); // pulse because tasks count changed 
      } 
     } 

     private void Worker() 
     { 
      Action task = null; 
      while (true) // loop until threadpool is disposed 
      { 
       lock (this._tasks) // finding a task needs to be atomic 
       { 
        while (true) // wait for our turn in _workers queue and an available task 
        { 
         if (this._disposed) 
         { 
          return; 
         } 
         if (null != this._workers.First && object.ReferenceEquals(Thread.CurrentThread, this._workers.First.Value) && this._tasks.Count > 0) // we can only claim a task if its our turn (this worker thread is the first entry in _worker queue) and there is a task available 
         { 
          task = this._tasks.First.Value; 
          this._tasks.RemoveFirst(); 
          this._workers.RemoveFirst(); 
          Monitor.PulseAll(this._tasks); // pulse because current (First) worker changed (so that next available sleeping worker will pick up its task) 
          break; // we found a task to process, break out from the above 'while (true)' loop 
         } 
         Monitor.Wait(this._tasks); // go to sleep, either not our turn or no task to process 
        } 
       } 

       task(); // process the found task 
       this._workers.AddLast(Thread.CurrentThread); 
       task = null; 
      } 
     } 

     private readonly LinkedList<Thread> _workers; // queue of worker threads ready to process actions 
     private readonly LinkedList<Action> _tasks = new LinkedList<Action>(); // actions to be processed by worker threads 
     private bool _disallowAdd; // set to true when disposing queue but there are still tasks pending 
     private bool _disposed; // set to true when disposing queue and no more tasks are pending 
    } 


    public static class Program 
    { 
     static void Main() 
     { 
      using (var pool = new Pool(5)) 
      { 
       var random = new Random(); 
       Action<int> randomizer = (index => 
       { 
        Console.WriteLine("{0}: Working on index {1}", Thread.CurrentThread.Name, index); 
        Thread.Sleep(random.Next(20, 400)); 
        Console.WriteLine("{0}: Ending {1}", Thread.CurrentThread.Name, index); 
       }); 

       for (var i = 0; i < 40; ++i) 
       { 
        var i1 = i; 
        pool.QueueTask(() => randomizer(i1)); 
       } 
      } 
     } 
    } 
} 

Я использую это следующим образом:

static void Main(string[] args) 
{ 
    ... 
    ... 
     while(keepRunning) 
     { 
     ... 
     pool.QueueTask(() => DoTask(eventObject); 
     } 
    ... 
} 

private static void DoTask(EventObject e) 
{ 
    // Do some computations 

    pool.QueueTask(() => DoAnotherTask(eventObject)); // this is a relatively smaller computation 
} 

Я получаю следующее исключение после выполнения кода в течение двух дней:

Unhandled Exception: System.NullReferenceException: Object reference not set to an instance of an object. 
    at System.Collections.Generic.LinkedList`1.InternalInsertNodeBefore(LinkedListNode`1 node, LinkedListNode`1 newNode) 
    at System.Collections.Generic.LinkedList`1.AddLast(T value) 
    at MyProg.Pool.Worker() 
    at System.Threading.ThreadHelper.ThreadStart_Context(Object state) 
    at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state) 
    at System.Threading.ThreadHelper.ThreadStart() 

Я не могу понять, что вызывает это, поскольку я не могу получить эту ошибку снова. Любые предложения по устранению этого?

+0

Ваша трассировка стека указывает на 'this._workers.AddLast (Thread.CurrentThread);' как преступник. Я не очень много работал с LinkedLists на C#, но, возможно, он сработал в порядке и не является потокобезопасным. – TyCobb

+1

Почти все случаи «NullReferenceException» одинаковы. См. «[Что такое исключение NullReferenceException в .NET?] (Http://stackoverflow.com/questions/4660142/what-is-a-nullreferenceexception-in-net)» для некоторых советов. –

+0

Я думаю, что вы получаете ошибку NULLReferenceException, поскольку у вас нет первого инициализированного или правильно созданного объекта. попробуйте создать объект с помощью 'new' - что может помочь –

ответ

3

Похоже, что доступ к связанному списку _workers не синхронизирован должным образом. Рассмотрим этот сценарий:

Предположим, что в какой-то момент this._workets список содержит один элемент.

Первый поток вызывает this._workers.AddLast(Thread.CurrentThread);, но прерывается на очень специальном месте - внутри AddLast() метода:

public void AddLast(LinkedListNode<T> node) 
{ 
    this.ValidateNewNode(node); 
    if (this.head == null) 
    { 
     this.InternalInsertNodeToEmptyList(node); 
    } 
    else 
    { 
     // here we got interrupted - the list was not empty, 
     // but it would be pretty soon, and this.head becomes null 
     // InternalInsertNodeBefore() does not expect that 
     this.InternalInsertNodeBefore(this.head, node); 
    } 
    node.list = (LinkedList<T>) this; 
} 

Другие нити вызывает this._workers.RemoveFirst();. Существует не lock() вокруг этого утверждения, поэтому он завершается, и теперь список пуст. AddLast() теперь должен позвонить InternalInsertNodeToEmptyList(node);, но он не может, поскольку условие уже было оценено.

Ввод простой lock(this._tasks) вокруг одной линии this._workers.AddLast() должен предотвратить такой сценарий.

Другие плохие сценарии включают добавление элемента в один и тот же список одновременно двумя потоками.

+0

+1 Спасибо за ваше время. Завершение этого заявления внутри замка решило проблему. – Legend

3

Думаю, я нашел проблему. Образец кода пропущенный lock()

private void Worker() 
{ 
    Action task = null; 
    while (true) // loop until threadpool is disposed 
    { 
     lock (this._tasks) // finding a task needs to be atomic 
     { 
      while (true) // wait for our turn in _workers queue and an available task 
      { 
      .... 
      } 
     } 

     task(); // process the found task 
     this._workers.AddLast(Thread.CurrentThread); 
     task = null; 
    } 
} 

замок должен быть продлен или обернуты вокруг this._workers.AddLast(Thread.CurrentThread);

Если вы посмотрите на другой код, который модифицирует LinkedList (Pool.QueueTask), он обернут в lock.

+0

+1 Спасибо. Это, похоже, устранило проблему. Я уточню, если я это повторю. – Legend

Смежные вопросы