0

У меня есть очередь заданий, которые могут быть заполнены несколькими потоками (ConcurrentQueue<MyJob>). Мне нужно реализовать непрерывное выполнение этих заданий асинхронно (не по основному потоку), но только одним потоком в то же время. Я пытался что-то вроде этого:Многопоточная очередь заданий

public class ConcurrentLoop { 
    private static ConcurrentQueue<MyJob> _concurrentQueue = new ConcurrentQueue<MyJob>(); 

    private static Task _currentTask; 
    private static object _lock = new object(); 

    public static void QueueJob(Job job) 
    { 
     _concurrentQueue.Enqueue(job); 
     checkLoop(); 
    } 

    private static void checkLoop() 
    { 
     if (_currentTask == null || _currentTask.IsCompleted) 
     { 
      lock (_lock) 
      { 
       if (_currentTask == null || _currentTask.IsCompleted) 
       { 
        _currentTask = Task.Run(() => 
        { 
          MyJob current; 
          while(_concurrentQueue.TryDequeue(out current)) 
           //Do something              
        }); 
       } 
      } 
     } 
    } 
} 

Этот код на мой взгляд, есть проблема: если задача finnishing выполнить (TryDequeue возвращает ложь, но задача не была помечена как доделаю), и в этот момент я получаю новая работа, она не будет выполнена. Я прав? Если да, то как это исправить?

+0

Есть ли настоящая причина, по которой вы пытаетесь ограничить количество потоков? Лучше разрешить оптимизацию обработки фреймов для вас большую часть времени. – konkked

+0

Используйте Lock на блоке кода, который вы хотите исполнять по одному потоку за раз. Это должно решить проблему. – Reddy

+0

замок с 'ConcurrentQueue'? Я верю, что есть лучший способ – xalz

ответ

3

Заявление о проблеме выглядит как проблема producer-consumer, с оговоркой, что вы хотите только одного потребителя.

Нет необходимости переопределять такую ​​функциональность вручную. Вместо этого я предлагаю использовать BlockingCollection - внутри он использует ConcurrentQueue и отдельный поток для потребления. Обратите внимание, что это может быть или не быть подходящим для вашего прецедента.

Что-то вроде:

_blockingCollection = new BlockingCollection<your type>(); // you may want to create bounded or unbounded collection 
_consumingThread = new Thread(() => 
{ 
    foreach (var workItem in _blockingCollection.GetConsumingEnumerable()) // blocks when there is no more work to do, continues whenever a new item is added. 
    { 
     // do work with workItem 
    } 
}); 
_consumingThread.Start(); 

Несколько производителей (задачи или потоки) не могут добавлять рабочие элементы к _blockingCollection никаких проблем, и нет необходимости беспокоиться о синхронизации производителей/потребителей.

Когда вы закончите выполнять задание, позвоните по телефону _blockingCollection.CompleteAdding() (этот метод не является потокобезопасным, поэтому рекомендуется заранее остановить всех производителей). Возможно, вы также должны сделать _consumingThread.Join() где-то, чтобы прервать вашу потребляющую нить.

0

Для этого я бы воспользовался реактивными расширениями группы реактивной платформы Microsoft (NuGet «System.Reactive»). Это прекрасная абстракция.

public class ConcurrentLoop 
{ 
    private static Subject<MyJob> _jobs = new Subject<MyJob>(); 

    private static IDisposable _subscription = 
     _jobs 
      .Synchronize() 
      .ObserveOn(Scheduler.Default) 
      .Subscribe(job => 
      { 
       //Do something 
      }); 

    public static void QueueJob(MyJob job) 
    { 
     _jobs.OnNext(job); 
    } 
} 

Это прекрасно синхронизирует все входящие задания в единый поток и выталкивает выполнение на к Scheduler.Default (который в основном поток бассейн), а потому, что он сериализовать весь ввод только один может произойти в то время. Самое приятное в том, что он освобождает поток, если между значениями существует значительный разрыв. Это очень сухое решение.

Для очистки вам просто нужно позвонить либо _jobs.OnCompleted();, либо _subscription.Dispose();.

Смежные вопросы