2012-01-26 7 views
7

Прежде всего, я все еще знакомлюсь с многопоточными и не знаю много терминов. Мне нужно убедиться, что я делаю это правильно, потому что это чувствительная тема.Управление динамическим числом потоков

Технические характеристики

Что я строю это компонент, который будет содержать динамический число потоков. Каждый из этих потоков повторно используется для выполнения нескольких запросов. Я могу предоставить все необходимые детали для потока, когда я его создаю, и перед тем, как выполнить его, а также предоставить обработчики событий. Как только он будет выполнен, я в значительной степени выполнил один запрос, и я подаю другой запрос. Запросы подаются в эти потоки из другого автономного фонового потока, который постоянно обрабатывает очередь запросов. Таким образом, эта система имеет два списка: 1) Список записей запроса и 2) Список указателей потоков.

Я использую потомки класса TThread (по крайней мере, это метод потоков, с которым я знаком). Я получаю обратную связь от потоков, синхронизируя триггеры событий, которые я назначил при создании потоков. Потоки загружают и сохраняют данные в фоновом режиме, и когда они завершатся, они перезагружаются, чтобы обработать следующий запрос.

Проблема

Теперь проблема начинается при принятии решения о том, как обрабатывать событие изменения числа разрешенных потоков (через свойство компонента ActiveThreads: TActiveThreadRange который TActiveThreadRange = 1..20). Следовательно, между 1 и 20 потоками, создаваемыми за один раз, может быть где угодно. Но когда, скажем, приложение, использующее этот компонент, изменит это свойство с 5 на 3. В это время уже создано 5 потоков, и я не хочу принудительно освобождать этот поток, если он оказывается занятым. Мне нужно подождать, пока это не будет сделано, прежде чем я освобожу его. А с другой стороны, если свойство изменено с 3 на 5, мне нужно создать 2 новых потока. Мне нужно знать правильный подход к «отслеживанию» этих потоков в этом сценарии.

Возможности

Вот некоторые из возможных способов, которыми я могу думать о том, чтобы «следе» эти нити ...

  • Ведите TList, содержащий каждую созданную нить - легко управлять
  • Создать a TList обертка или потомок, содержащий каждый созданный поток - проще в управлении, но больше работы
  • Держите array contai ning каждый созданный поток - будет ли это лучше, чем TList?
  • Создать обертку массива, содержащей каждую созданной нить

Но затем вернуться к моему первоначальному вопросу - Что делать с существующими занятыми нитями, когда свойство ActiveThreads уменьшается? Создание их не проблема, но освобождение их становится запутанным. Я обычно делаю потоки, которые освобождают себя, но это первый раз, когда я создал тот, который повторно используется. Мне просто нужно знать правильный метод уничтожения этих потоков, не прерывая их задач.

Update

На основе обратной связи, я приобрел и начал реализацию OmniThreadLibrary (а также долго необходимой FastMM). Я также изменил свой подход немного - способ, что я могу создать эти резьбовые процессы без управления ими и без другого потока для обработки очереди ...

  • 1 мастер-метод, чтобы породить новый процесс
    • function NewProcess(const Request: TProcessRequest): TProcessInfo;
    • TProcessRequest представляет собой запись с характеристиками того, что должно быть сделано (имя файла, параметры и т.д.)
    • TProcessInfo является запись, которая передает обратно информацию о состоянии.
  • Подача обработчика события для случая «выполнения» с его задачей при создании нового процесса. Когда компонент получает это сообщение, он проверяет очередь.
    • Если команда находится в очереди, он будет сравнивать активный предельный процесс с текущим процессом подсчитывать
    • > Если превышает предел, только остановится и следующий завершен процесс будет делать выполнить одну проверку
    • > Если в пределе, пинают от другого новый процесс (после обеспечения предыдущего процесса делается)
    • Если ни одна из команд не выстраиваются в очередь, а затем просто остановить
  • Каждый процесс может умереть в одиночку после того, как он выполнил свою задачу (без поддержания активности потоковая)
  • не придется беспокоиться о другом таймер или поток постоянно через петлю
    • Вместо того, чтобы каждый процесс разрушает его личность и проверяет наличие новых запросов, прежде чем делать так

Очередное обновление

Я действительно вернусь к использованию TThread, так как OTL очень неудобно использовать. Мне нравится держать вещи обернутыми и организованными в своем классе.

+0

То, о чем вы просите, известно как «пул потоков»; возможно, это поможет вам найти ресурсы. –

+5

См. [Delphi-threaded-list-of-thread-jobs-queueing] (http://stackoverflow.com/questions/1805633/delphi-threaded-list-of-thread-jobs-queueing). И не сверните свой собственный пул потоков, посмотрите на [OTL-OmniThreadLibrary] (http://code.google.com/p/omnithreadlibrary/). –

+0

Мой личный опыт заключается в том, что проще работать с API Windows напрямую, чем полагаться на 'TThread', если вы выполняете более сложную работу. На самом деле, очень легко начать потоки с использованием Windows API. Начните с простых экспериментов с ['CreateThread'] (http://msdn.microsoft.com/en-us/library/windows/desktop/ms682453 (v = vs.85) .aspx). Просто остерегайтесь, что вы, как разработчик Delphi, вероятно, должны использовать оболочку 'System.BeginThread' вместо' CreateThread', но, конечно же, документация MSDN «CreateThread» остается в силе. –

ответ

3

О вашей проблеме с уменьшением активных потоков: извините, но вам просто нужно решить для себя. Либо освободите ненужные потоки немедленно (что завершает их в самый ранний возможный момент), либо позволяют им работать до тех пор, пока они не будут завершены (что завершает их после завершения всей работы). Это твой выбор. Конечно, вам нужно отделить переменную для желаемого числа от числа фактического количества потоков. Проблема обновления фактического количества переменных потоков (может быть просто List.Count) одинакова, поскольку для любого решения потребуется некоторое время.

И по управлению несколькими потоками: вы можете изучить this answer, который хранит потоки в TList. Он нуждается в небольшой настройке для вашего конкретного списка пожеланий, хотя, пожалуйста, кричите в случае необходимости помощи с этим. Кроме того, есть, конечно, более возможные реализации, которые могут быть получены из использования TThread по умолчанию. И обратите внимание, что существуют другие (многопоточные) потоковые библиотеки, но у меня никогда не было необходимости их использовать.

+1

Нет никакой разницы между освобождением немедленно и до тех пор, пока они не будут закончены. 'Destroy' называет' WaitFor'. –

+2

FreeOnTerminate: = true. Передайте задачу ядовитой таблетки в очередь задач. Нет флага, нет TThread.WaitFor, нет необходимости обращаться к экземпляру TThread, поэтому нет необходимости в списке потоков, поэтому никакого управления потоками не требуется, поэтому большинство извращающего кода блокировки блокировки устранены. –

+0

@David Это зависит от задачи Thread. Если поток имеет цикл, который хорошо проверяет флаг «Завершение», то есть. Если поток выполняет только одно, а параметр «Завершение до False» не влияет на время работы, значит, вы правы, но тогда я также не понимаю вопрос OP. – NGLN

5

Как объясняется @NGLN и т. Д., Вам необходимо слить несколько потоков и принять, что самый простой способ управления номерами потоков - развести фактическое количество потоков с нужного числа. Добавление потоков в пул легко - просто создайте еще несколько экземпляров (передавая очередь ввода-запроса производителя-потребителя в качестве параметра, чтобы поток знал, что ждать). Если желаемое количество потоков меньше, чем существующее в настоящее время, вы можете поставить достаточно «ядовитых таблеток», чтобы убить лишние потоки.

Не держите список указателей на потоки - это сложная проблема с микроконтролем, которая просто не нужна (и, вероятно, будет ошибкой). Все, что вам нужно, это подсчет количества нужных потоков в пуле, чтобы вы знали, какое действие нужно предпринять, когда что-то изменяет свойство poolDepth.

Триггеры событий лучше всего загружаются в задания, которые выдаются пулу - сбрасывают их все из некоторого класса TpooledTask, который принимает событие в качестве параметра конструктора и сохраняет его в некотором «FonComplete» TNotifyEvent. Поток, который запускает задачу, может вызвать FonComplete, когда он выполнил задание (с TpooledTask в качестве параметра отправителя) - вам не нужно знать, какой поток выполнял задачу.

Пример:

unit ThreadPool; 

    interface 

    uses 
     Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, 
     Dialogs, StdCtrls, contnrs, syncobjs; 


    type 

    TpooledTask=class(TObject) 
    private 
     FonComplete:TNotifyEvent; 
    protected 
     Fparam:TObject; 
     procedure execute; virtual; abstract; 
    public 
     constructor create(onComplete:TNotifyEvent;param:TObject); 
    end; 

    TThreadPool=class(TObjectQueue) 
    private 
     access:TcriticalSection; 
     taskCounter:THandle; 
     threadCount:integer; 
    public 
     constructor create(initThreads:integer); 
     procedure addTask(aTask:TpooledTask); 
    end; 

    TpoolThread=class(Tthread) 
    private 
     FmyPool:TThreadPool; 
    protected 
     procedure Execute; override; 
    public 
     constructor create(pool:TThreadPool); 
    end; 

    implementation 

    { TpooledTask } 

    constructor TpooledTask.create(onComplete: TNotifyEvent; param: TObject); 
    begin 
     FonComplete:=onComplete; 
     Fparam:=param; 
    end; 

    { TThreadPool } 

    procedure TThreadPool.addTask(aTask: TpooledTask); 
    begin 
     access.acquire; 
     try 
     push(aTask); 
     finally 
     access.release; 
     end; 
     releaseSemaphore(taskCounter,1,nil); // release one unit to semaphore 
    end; 

    constructor TThreadPool.create(initThreads: integer); 
    begin 
     inherited create; 
     access:=TcriticalSection.create; 
     taskCounter:=createSemaphore(nil,0,maxInt,''); 
     while(threadCount<initThreads) do 
     begin 
     TpoolThread.create(self); 
     inc(threadCount); 
     end; 
    end; 

    { TpoolThread } 

    constructor TpoolThread.create(pool: TThreadPool); 
    begin 
     inherited create(true); 
     FmyPool:=pool; 
     FreeOnTerminate:=true; 
     resume; 
    end; 

procedure TpoolThread.execute; 
var thisTask:TpooledTask; 
begin 
    while (WAIT_OBJECT_0=waitForSingleObject(FmyPool.taskCounter,INFINITE)) do 
    begin 
    FmyPool.access.acquire; 
    try 
     thisTask:=TpooledTask(FmyPool.pop); 
    finally 
     FmyPool.access.release; 
    end; 
    thisTask.execute; 
    if assigned(thisTask.FonComplete) then thisTask.FonComplete(thisTask); 
    end; 
end; 

end. 
+1

@Jerry - в ответ на ваш следующий вопрос: «Как я могу вызвать TThread.Sychronize, если у меня нет экземпляра потока?» - Не делай этого. Если вам нужны результаты основного потока, отправьте задачу PostMessage в обработчик OnComplete. –

+0

Большая часть этого заключается в том, что я должен повторно использовать эти потоки. Каждый поток будет создан и работает без остановок. Если у него нет запросов на обработку, он просто будет мертв. Но когда ему дана задача выполнить, она будет продолжать использоваться повторно для каждой заданной ей задачи. Как уже упоминалось выше, ваш метод «забыть» о них - это то, к чему я уже привык, но концепция повторного использования их - это то место, где я теряюсь. –

+0

Джерри, у меня обычно есть безопасный список потоков (или очередь, если хотите). Каждый поток имеет ссылку на эту очередь и спит, когда он пуст. Пока есть элементы для удаления из очереди, потоки заняты работой. –

4

Вы можете реализовать FreeNotify сообщение в очереди запросов и когда рабочий поток receieve это сообщение освобождаться. В вашем примере, когда вы уменьшаете количество потоков от 5 до 3, просто поместите 2 бесплатных сообщения FreeNotify в свою очередь, и 2 рабочих потока будут бесплатными.

+1

Да - самый простой способ: Без ограничений/WaitFor/OnTerminate (т. Е. Без тупика :) –

+0

+1 Звучит очень вероятно, справедливая идея. Все, что мне нужно было бы выяснить, как решить *, которые * 2 из 5 наиболее применимо для уничтожения. Другими словами, если у меня есть 5 потоков, 3 из них заняты, а 2 бездействуют, тогда мне захочется уничтожить те, которые простаивают. Не нужна помощь с этим, просто чтобы указать это ... –

+1

Вам не нужно вычислять ружения. Если у вас 5 нитей, 3 заняты, и вы хотите убить 3, нажмите на 3 ядовитых таблетки. Обе нити, которые простаивают, немедленно получат запрос о самоубийстве и нюхают его. Другая таблетка остается в очереди, пока другая нить не закончит свою задачу, получает последнюю дозу и умирает. Это оставляет 2 потока - работа выполнена! –

Смежные вопросы