2013-04-25 3 views
1

У меня есть процесс, где мой основной поток читает файл и разбивает его на части. Затем эти части требуют дальнейшей обработки. Я хотел бы использовать любые доступные потоки, чтобы последующая обработка использовала как можно больше ЦП (или как много ядер). Я не хочу создавать избыточное отставание от основного потока, поэтому мне нужно, чтобы основной поток ожидал добавления в очередь, пока не появится еще один доступный поток..NET - Блокировать основной поток до тех пор, пока не будут доступны потоки

Я вижу много статей, как VB.NET 4.0: Looking to execute multiple threads, but wait until all threads are completed before resuming, но они ждут все темы завершить, тогда мне просто нужно любые темы будут доступны

Это то, что я могу решать с Task Parallel Library, или должны Я вручную создаю потоки и контролирую threadpool?

Using Reader As New StreamReader(FileName) 
    Do 
     CurrentBlockSize = Reader.ReadBlock(CurrentBuffer, 0, BufferSize) 

     RunningBuffer &= New String(CurrentBuffer) 

     If RunningBuffer.Contains(RowDelimiter) Then 
      LineParts = RunningBuffer.Split(RowDelimiter) 

      For I As Integer = 0 To LineParts.Count - 1 
       If I < LineParts.Count - 1 Then 

        'Make synchronous call that blocks until' 
        'another thread is available to process the line' 
        AddLineToTheProcessingQueue(CurrentLine) 

       Else 
        RunningBuffer = LineParts(I) 
       End If 
      Next 
     End If 

    Loop While CurrentBlockSize = BufferSize 
End Using 
+0

Это почти наверняка потерянное усилие, такой код почти всегда связан с диском. Простая проверка: перезагрузите компьютер и запустите однопоточную версию. Если загрузка процессора на одно ядро ​​не превышает 50%, то добавление большего количества потоков не может сделать его быстрее. –

+0

@ HansPassant, работа в нисходящем направлении будет включать обработку и отправку больших объемов данных в базу данных по TCP/IP, что, вероятно, будет медленнее, чем на диске. Это часть, которую я хочу многопоточным. –

+0

Затем потяните, не нажимайте данные. Как и большинство программ, они читают файл. –

ответ

1

Вставьте этот код в новое консольное приложение.

Imports System.Threading 

Module Module1 

    ' I just picked 6 randomly, not sure what is a good strategy for picking this number 
    ' also, not sure what is the difference between a Worker Thread and a Completion thread 
    Const MaxWorkerThreads As Integer = 6 
    Const MaxCompletionPortThreads As Integer = 6 

    Sub Main() 

     ThreadPool.SetMaxThreads(MaxWorkerThreads, MaxCompletionPortThreads) 

     Dim availableWorkerThreads As Integer 
     Dim availableCompletionPortThreads As Integer 

     For i As Integer = 0 To 100 

      ' GetAvailableThreads returns results via output parameters 
      ThreadPool.GetAvailableThreads(availableWorkerThreads, availableCompletionPortThreads) 

      Dim tries As Integer = 0 

      Do While (availableWorkerThreads = 0) 
       ' this loop does not execute if there are available threads 
       ' you may want to add a fail-safe to check "tries" in case the child threads get stuck 
       tries += 1 

       Console.WriteLine(String.Format("waiting to start item {0}, attempt {1}, available threads: {2}, {3}", i, tries, availableWorkerThreads, availableCompletionPortThreads)) 

       ' failure to call Sleep will make your program unresponsive 
       Thread.Sleep(1000) 

       ' call GetAvailableThreads again for the next test at the top of the loop 
       ThreadPool.GetAvailableThreads(availableWorkerThreads, availableCompletionPortThreads) 
      Loop 

      ' this is how you pass parameters to a thread created through QueueUserWorkItem 
      Dim parameters As Object() = {i} 
      ThreadPool.QueueUserWorkItem(AddressOf DoWork, parameters) 
      ' According to MSDN, you must Sleep after calling QueueUserWorkItem, or else the current thread will just exit 
      Thread.Sleep(500) 

     Next 

    End Sub 

    Sub DoWork(parameters As Object()) 
     Dim itemNumber = parameters(0) 
     Dim sleepLength = itemNumber * 1000 
     Console.WriteLine(String.Format("Item: {0} - sleeping for {1} miliseconds.", itemNumber, sleepLength)) 
     Thread.Sleep(sleepLength) 
     Console.WriteLine(String.Format("Item: {0} - done sleeping.", itemNumber)) 
    End Sub 

End Module 
0

Я не знаю, почему именно вы хотите сделать это, но вы можете достичь чего-то очень похожее с помощью BlockingCollection или потока данных, блок с BoundedCapacity набором.

Например, если вы установите емкость 1 и ваши потребители заняты в данный момент, вы не сможете добавить второй элемент в очередь, пока один из потребителей не закончит свою текущую работу и не удалит этот элемент из очереди. И обе версии дают вам возможность подождать, пока вы не сможете добавить еще один элемент в очередь.

+0

Да, я начал читать о потоке данных TPL и ограниченной пропускной способности вчера вечером, очень многообещающе. Я отвечу после некоторых тестов. У вас есть какой-либо пример кода блока потока данных с ограниченной пропускной способностью? –

+0

@TomHalladay Вы можете посмотреть [этот набор тестов] (https://github.com/mono/mono/blob/master/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks. Dataflow/BoundedCapacityTest.cs) в моно версии TDF. – svick

Смежные вопросы