2015-01-18 3 views
2

Задача здесь довольно проста (или так я думал ...). Я хочу заполнить очередь с помощью методов, которые будут выполняться (все они возвратят результат объекта), а затем я хочу, чтобы какое-то количество потоков выходило из этой очереди, выполняли методы и добавляли результаты в какую-то другую коллекцию (словарь в этом случае), который будет возвращен, когда вся работа будет завершена. Основной метод будет вызываться в основном потоке, который запустит обработку и должен заблокировать, пока все потоки не закончат выполнение того, что они делают, и возвратите коллекцию с результатами. Так я собрал этот класс:C# Lock Threading Issue

public class BackgroundWorkManager 
{ 
    public delegate object ThreadTask(); 

    private Thread[] workers; 
    private ManualResetEvent workerThreadMre; 
    private ManualResetEvent mainThreadMre; 
    private Queue<WorkItem> workQueue; 
    private Dictionary<string, object> results; 
    private object writeLock; 
    private int activeTasks; 

    private struct WorkItem 
    { 
     public string name; 
     public ThreadTask task; 

     public WorkItem(string name, ThreadTask task) 
     { 
      this.name = name; 
      this.task = task; 
     } 
    } 

    private void workMethod() 
    { 
     while (true) 
     { 
      workerThreadMre.WaitOne(); 

      WorkItem task; 

      lock (workQueue) 
      { 
       if (workQueue.Count == 0) 
       { 
        workerThreadMre.Reset(); 
        continue; 
       } 

       task = workQueue.Dequeue(); 
      } 

      object result = task.task(); 

      lock (writeLock) 
      { 
       results.Add(task.name, result); 
       activeTasks--; 

       if (activeTasks == 0) 
        mainThreadMre.Set(); 
      } 
     } 
    } 

    public BackgroundWorkManager() 
    { 
     workers = new Thread[Environment.ProcessorCount]; 
     workerThreadMre = new ManualResetEvent(false); 
     mainThreadMre = new ManualResetEvent(false); 
     workQueue = new Queue<WorkItem>(); 
     writeLock = new object(); 
     activeTasks = 0; 

     for (int i = 0; i < Environment.ProcessorCount; i++) 
     { 
      workers[i] = new Thread(workMethod); 
      workers[i].Priority = ThreadPriority.Highest; 
      workers[i].Start(); 
     } 
    } 

    public void addTask(string name, ThreadTask task) 
    { 
     workQueue.Enqueue(new WorkItem(name, task)); 
    } 

    public Dictionary<string, object> process() 
    { 
     results = new Dictionary<string, object>(); 

     activeTasks = workQueue.Count; 

     mainThreadMre.Reset(); 
     workerThreadMre.Set(); 
     mainThreadMre.WaitOne(); 
     workerThreadMre.Reset(); 

     return results; 
    } 
} 

Это прекрасно работает, если я использовал объект один раз, чтобы обработать очередь методов, но если я пытаюсь что-то вроде этого перерыва

  BackgroundWorkManager manager = new BackgroundWorkManager(); 

     for (int i = 0; i < 20; i++) 
     { 
      manager.addTask("result1", (BackgroundWorkManager.ThreadTask)delegate 
      { 
       return (object)(1); 
      }); 

      manager.process(); 
     } 

Things. Я либо зацикливаюсь, либо получаю исключение, говоря, что словарь, в котором я пишу результаты, уже содержит ключ (но Visual Studio Debugger говорит, что он пуст). Добавление «Thread.Sleep (1)» к методу работы, похоже, исправить, что причудливо. Это мой первый опыт работы с потоками, поэтому я не уверен, что я ужасно злоупотребляю замками или что. Если бы кто-нибудь мог дать некоторое представление о том, что я делаю неправильно, это было бы весьма признательно.

+0

Вы добавляете несколько задач с тем же именем. Почему вы удивлены тем, что «словарь уже содержит ключ»? Кроме того, должно быть довольно легко удовлетворить ваши требования с помощью PLINQ. Простой 'AsParallel' +' ToDictionary' должен сделать трюк, без блокировки или чего-то еще. – MarcinJuraszek

+0

Поскольку метод процесса создает новый экземпляр словаря, результаты которого затем добавляются. – user1869878

+0

Тот факт, что он создает новый словарь, не имеет никакого значения. Вы назначаете это поле в классе, которое используется несколькими разными потоками, поэтому довольно вероятно, что несколько потоков будут записываться в один и тот же экземпляр словаря '. – MarcinJuraszek

ответ

1

версия с параллельным классом:

List<Func<object>> actions = new List<Func<object>>(); 

actions.Add(delegate { return (object)(1); }); 
actions.Add(delegate { return (object)(1); }); 
actions.Add(delegate { return (object)(1); }); 

Dictionary<string, object> results = new Dictionary<string,object>(); 

Parallel.ForEach(actions,(f)=> { 
    lock (results) 
    { 
     results.Add(Guid.NewGuid().ToString(), f()); 
    } 
}); 
1

Существует множество вариантов работы с моделью-потребителем . Например, вы можете упростить код радикально, используя ActionBlock<T> (который является частью TPL Dataflow):

var concurrentDictionary = new ConcurrentDictionary<string, object>(); 

ActionBlock<Func<object>> actionBlock = new ActionBlock<Func<object>>((func) => 
{ 
    var obj = func(); 
    concurrentDictionary.AddOrUpdate("someKey", obj, (s,o) => o); 
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 
             Environment.ProcessorCount }); 

А потом просто опубликовать делегатов:

foreach (var task in tasks) 
{ 
    actionBlock.Post(() => (object) 1); 
} 
+1

Я не понимал, сколько из этого .NET может сделать для вас. Благодарю. – user1869878