0

Может кто-нибудь помочь мне со следующим кодом, пожалуйста. в строке:Parallel.Invoke wait, если задачи заняты.

Parallel.Invoke(parallelOptions,() => dosomething(message)); 

Я хочу, чтобы вызвать до 5 параллельных задач (если есть 5 заняты, ждать следующего доступного, а затем запустить его ... если только 4 заняты, начните 5-й и т.д.)

private AutoResetEvent autoResetEvent1 = new AutoResetEvent(false); 
    private ParallelOptions parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 5 }; 

    private void threadProc() 
    { 
     queue.ReceiveCompleted += MyReceiveCompleted1; 
     Debug.WriteLine("about to start loop"); 
     while (!shutdown) 
     { 
      queue.BeginReceive(); 
      autoResetEvent1.WaitOne(); 
     } 
     queue.ReceiveCompleted -= MyReceiveCompleted1; 
     queue.Dispose(); 
     Debug.WriteLine("we are done"); 
    } 

    private void MyReceiveCompleted1(object sender, ReceiveCompletedEventArgs e) 
    { 
     var message = queue.EndReceive(e.AsyncResult); 
     Debug.WriteLine("number of max tasks: " + parallelOptions.MaxDegreeOfParallelism); 
     Parallel.Invoke(parallelOptions,() => dosomething(message)); 
     autoResetEvent1.Set(); 
    } 

    private void dosomething(Message message) 
    { 
     //dummy body 
     var i = 0; 
     while (true) 
     { 
      Thread.Sleep(TimeSpan.FromSeconds(1)); 
      i++; 
      if (i == 5 || i == 10 || i == 15) Debug.WriteLine("loop number: " + i + " on thread: " + Thread.CurrentThread.ManagedThreadId); 
      if (i == 15) 
       break; 
     } 
     Debug.WriteLine("finished task"); 
    } 

РЕЗУЛЬТАТЫ пОЛУЧИТЬ СЕЙЧАС:

1) с DoSomething(), как вы его видите выше, я получаю только один, в то время (он ждет)

2) с DoSomething() изменено на ниже, я не получаю ни одной остановки x количество tas кс (не ограничиваясь или повинуясь MaxDegreeOfParallelism

private async Task dosomething(Message message) 
    { 
     //dummy body 
     var i = 0; 
     while (true) 
     { 
      await Task.Delay(TimeSpan.FromSeconds(1)); 
      i++; 
      if (i == 5 || i == 10 || i == 15) Debug.WriteLine("loop number: " + i + " on thread: " + Thread.CurrentThread.ManagedThreadId); 
      if (i == 15) 
       break; 
     } 
     Debug.WriteLine("finished task"); 
    } 

Что я делаю не так, чтобы получить то, что я хочу сделать?

РЕЗУЛЬТАТ Я ХОЧУ:

в «MyReceiveCompleted», я хочу, чтобы убедиться, что только 5 одновременных задач обработки сообщений, если есть 5 напряженными, ждут один, чтобы стать доступным.

+0

Возможно, семафоры? –

+0

вам нужно взглянуть на 'TaskScheduler'. Создайте свой собственный планировщик для выполнения только 5 задач за раз. –

+1

Вы предоставляете только одно действие 'Action' для' Parallel.Invoke', поэтому параллельное выполнение невозможно. Вы лучше посмотрите на [System.Threading.Task.Dataflow] (https://msdn.microsoft.com/library/system.threading.tasks.dataflow.aspx). – PetSerAl

ответ

1

Эта строка кода:

Parallel.Invoke(parallelOptions,() => dosomething(message)); 

говорит о TPL, чтобы начать новую параллельную работу с только одна вещь, чтобы сделать. Таким образом, вариант «макс-параллелизма» 5 является своего рода бессмысленным, так как будет только одно дело. autoResetEvent1 гарантирует, что одновременно будет выполняться только одна параллельная операция, и каждая параллельная операция имеет только одно действие, поэтому ожидаемое поведение только одной вещи, выполняемой одновременно, вполне ожидаемо.

Когда вы меняете делегат на асинхронный, то, что на самом деле происходит, является то, что MaxDegreeOfParallelism применяется только к синхронной части в методе . Поэтому, когда он достигает своего первого await, он «покидает» параллельную операцию и больше не ограничивается ею.

Основная проблема заключается в том, что Parallel работает лучше всего, когда количество операций известно заранее - что ваш код не знает; это просто чтение их из очереди и обработка их по мере их поступления. Как кто-то прокомментировал, вы могли бы решить это с помощью динамического параллелизма задач с TaskScheduler, который ограничивает параллелизм.

Однако простейшее решение, вероятно, является потоком данных TPL. Вы можете создать ActionBlock<T> соответствующую опцию дросселирования и посылать ему сообщения по мере их поступления:

private ActionBlock<string> block = new ActionBlock<string>(
    message => dosomething(message), 
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

private void MyReceiveCompleted1(object sender, ReceiveCompletedEventArgs e) 
{ 
    var message = queue.EndReceive(e.AsyncResult); 
    block.Post(message); 
    autoResetEvent1.Set(); 
} 

Один хороший аспект TPL DataFlow является то, что он также понимает, асинхронные методы, и будет интерпретировать MaxDegreeOfParallelism как максимальная степень параллелизма ,

+0

Спасибо!этот простой ActionBlock - это то, что я искал !! – user2711510

Смежные вопросы