2015-04-14 2 views
1

gabr's answer to another question показывает пример использования Parallel.Pipeline для обработки данных.
На данный момент мне нужно знать, когда был запущен Pipeline и когда все его этапы завершены. Я прочитал ответ другого gabr для этой проблемы How to monitor Pipeline stages in OmniThreadLibrary?. Я попытался сделать это следующим образом (измененная в соответствии с answer):Как узнать состояние этапов трубопровода в OmniThreadLibrary?

unit Unit1; 

interface 

uses 
    Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, 
    Dialogs, StdCtrls, superobject, 
    OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls; 

const 
    WM_STARTED = WM_USER; 
    WM_ENDED = WM_USER + 1; 

type 
    TForm1 = class(TForm) 
    btnStart: TButton; 
    btnStop: TButton; 
    lbLog: TListBox; 
    procedure btnStartClick(Sender: TObject); 
    procedure btnStopClick(Sender: TObject); 
    private 
    FCounterTotal: IOmniCounter; 
    FCounterProcessed: IOmniCounter; 
    FIsBusy: boolean; 
    FPipeline: IOmniPipeline; 
    procedure WMStarted(var msg: TOmniMessage); message WM_STARTED; 
    procedure WMEnded(var msg: TOmniMessage); message WM_ENDED; 
    strict protected 
    procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask); 
    procedure Async_Parse(const input: TOmniValue; var output: TOmniValue); 
    procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask); 
    end; 

var 
    Form1: TForm1; 

    procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll'; 

implementation 

uses IOUtils; 

{$R *.dfm} 

procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask); 
var 
    i, cnt: integer; 
    f: string; 
begin 
    while not input.IsCompleted do begin 

    task.Comm.Send(WM_STARTED); // message is sent once every 1 min 
    cnt := 0; 

    for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do 
    begin 
     output.TryAdd(f); 
     Inc(cnt); 
     Sleep(1000); // simulate a work 
    end; 
    FCounterTotal.Value := cnt; 

    // I need to continously check a specified folder for new files, with 
    // a period of 1 minute (60 sec) for an unlimited period of time. 
    i := 60; 
    repeat 
     Sleep(1000); // Check if we should stop every second (if Stop button is pushed) 
     if input.IsCompleted then Break; 
     dec(i); 
    until i < 0; 
    end; 
end; 

procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue); 
var 
    sl: TStringList; 
    ws: WideString; 
begin 
    sl := TStringList.Create; 
    try 
    sl.LoadFromFile(input.AsString); 
    GetJSON_(PChar(sl.Text), ws); // output as ISuperObject --- DLL procedure 
    output := SO(ws); 
//  TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File 
    finally 
    sl.Free; 
    end; 
end; 

procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask); 
var 
    value: TOmniValue; 
    JSON: ISuperObject; 
    cnt: integer; 
begin 
    for value in input do begin 
    JSON := value.AsInterface as ISuperObject; 
    // do something with JSON 

    cnt := FCounterProcessed.Increment; 
    if FCounterTotal.Value = cnt then 
     task.Comm.Send(WM_ENDED); // !!! message is not sent 
    end; 
end; 

// 
procedure TForm1.btnStartClick(Sender: TObject); 
begin 
    btnStart.Enabled := False; 

    FCounterTotal := CreateCounter(-1); 
    FCounterProcessed := CreateCounter(0); 

    FPipeline := Parallel.Pipeline 
    .Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self)) 
    .Stage(Async_Parse) 
    .Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self)) 
    .Run; 
end; 

procedure TForm1.btnStopClick(Sender: TObject); 
begin 
    if Assigned(FPipeline) then begin 
    FPipeline.Input.CompleteAdding; 
    FPipeline := nil; 
    end; 

    btnStart.Enabled := True; 
end; 

// 
procedure TForm1.WMEnded(var msg: TOmniMessage); 
begin 
    FIsBusy := False; 
    lbLog.ItemIndex := lbLog.Items.Add(Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)])); 
end; 

procedure TForm1.WMStarted(var msg: TOmniMessage); 
begin 
    FIsBusy := True; 
    lbLog.ItemIndex := lbLog.Items.Add(Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)])); 
end; 

end. 

С task.Comm.Send(WM_STARTED) все в порядке, но линия task.Comm.Send(WM_ENDED) никогда не выполняется. Как узнать, когда последний этап был завершен? Каков правильный путь?

+0

Ну, стрельба 'WM_STARTED' каждую секунду, даже если нет файла, это нормально? И вы только запускаете 'WM_ENDED', если очередь завершена. Но после обработки элемента он не завершен. Если вы этого хотите, вы должны запустить сообщение внутри цикла. –

+0

@SirRufo Нет, 'WM_STARTED' срабатывает только один раз после нажатия кнопки START. – LuFang

+0

Если это так, то, пожалуйста, покажите нам свой ** настоящий код **. Этот код не компилируется, и после небольшого изменения 'uses'' WM_STARTED' запускается каждую секунду. –

ответ

2

Я благодарю gabr, совет которого использует специальное значение sentinel, помогло мне найти решение для моей проблемы. Этот код работает, как ожидалось:

unit Unit1; 

interface 

uses 
    Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, 
    Dialogs, StdCtrls, superobject, 
    OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls; 

const 
    WM_STARTED = WM_USER; 
    WM_ENDED = WM_USER + 1; 

type 
    TForm1 = class(TForm) 
    btnStart: TButton; 
    btnStop: TButton; 
    lbLog: TListBox; 
    procedure btnStartClick(Sender: TObject); 
    procedure btnStopClick(Sender: TObject); 
    private 
    FIsBusy: boolean; 
    FPipeline: IOmniPipeline; 
    procedure WMStarted(var msg: TOmniMessage); message WM_STARTED; 
    procedure WMEnded(var msg: TOmniMessage); message WM_ENDED; 
    strict protected 
    procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask); 
    procedure Async_Parse(const input: TOmniValue; var output: TOmniValue); 
    procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask); 
    end; 

var 
    Form1: TForm1; 

    procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll'; 

implementation 

uses IOUtils; 

{$R *.dfm} 

procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask); 
var 
    i: integer; 
    f: string; 
begin 
    while not input.IsCompleted do begin 

    task.Comm.Send(WM_STARTED); // message is sent once every 1 min 

    for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do 
    begin 
     output.TryAdd(f); 
     Sleep(1000); // simulate a work 
    end; 
    output.TryAdd(0); // to send a special 'sentinel' value 

    // I need to continously check a specified folder for new files, with 
    // a period of 1 minute (60 sec) for an unlimited period of time. 
    i := 60; 
    repeat 
     Sleep(1000); // Check if we should stop every second (if Stop button is pushed) 
     if input.IsCompleted then Break; 
     dec(i); 
    until i < 0; 
    end; 
end; 

procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue); 
var 
    sl: TStringList; 
    ws: WideString; 
begin 
    if input.IsInteger and (input.AsInteger = 0) then begin 
    output := 0; // if we got 'sentinel' value send it to the next stage 
    Exit; 
    end; 

    sl := TStringList.Create; 
    try 
    sl.LoadFromFile(input.AsString); 
    GetJSON_(PChar(sl.Text), ws); // output as ISuperObject --- DLL procedure 
    output := SO(ws); 
//  TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File 
    finally 
    sl.Free; 
    end; 
end; 

procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask); 
var 
    value: TOmniValue; 
    JSON: ISuperObject; 
begin 
    for value in input do begin 

    if value.IsInteger and (value.AsInteger = 0) then begin 
     task.Comm.Send(WM_ENDED); // if we got 'sentinel' value 
     Continue; 
    end; 

    JSON := value.AsInterface as ISuperObject; 
    // do something with JSON 
    end; 
end; 

// 
procedure TForm1.btnStartClick(Sender: TObject); 
begin 
    btnStart.Enabled := False; 

    FPipeline := Parallel.Pipeline 
    .Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self)) 
    .Stage(Async_Parse) 
    .Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self)) 
    .Run; 
end; 

procedure TForm1.btnStopClick(Sender: TObject); 
begin 
    if Assigned(FPipeline) then begin 
    FPipeline.Input.CompleteAdding; 
    FPipeline := nil; 
    end; 

    btnStart.Enabled := True; 
end; 

// 
procedure TForm1.WMEnded(var msg: TOmniMessage); 
begin 
    FIsBusy := False; 
    lbLog.ItemIndex := lbLog.Items.Add(Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)])); 
end; 

procedure TForm1.WMStarted(var msg: TOmniMessage); 
begin 
    FIsBusy := True; 
    lbLog.ItemIndex := lbLog.Items.Add(Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)])); 
end; 

end. 

Альтернативой с использованием Exception в качестве дозорных (еще не работал, но я, вероятно, делать что-то неправильно):

unit Unit1; 

interface 

uses 
    Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, 
    Dialogs, StdCtrls, superobject, 
    OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls; 

const 
    WM_STARTED = WM_USER; 
    WM_ENDED = WM_USER + 1; 

type 
    ESentinelException = class(Exception); 

    TForm1 = class(TForm) 
    btnStart: TButton; 
    btnStop: TButton; 
    lbLog: TListBox; 
    procedure btnStartClick(Sender: TObject); 
    procedure btnStopClick(Sender: TObject); 
    private 
    FIsBusy: boolean; 
    FPipeline: IOmniPipeline; 
    procedure WMStarted(var msg: TOmniMessage); message WM_STARTED; 
    procedure WMEnded(var msg: TOmniMessage); message WM_ENDED; 
    strict protected 
    procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask); 
    procedure Async_Parse(const input: TOmniValue; var output: TOmniValue); 
    procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask); 
    end; 

var 
    Form1: TForm1; 

    procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll'; 

implementation 

uses IOUtils; 

{$R *.dfm} 

procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask); 
var 
    i: integer; 
    f: string; 
begin 
    while not input.IsCompleted do begin 

    task.Comm.Send(WM_STARTED); // message is sent once every 1 min 

    for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do 
    begin 
     output.TryAdd(f); 
     Sleep(1000); // simulate a work 
    end; 

    raise ESentinelException.Create('sentinel'); 

    // I need to continously check a specified folder for new files, with 
    // a period of 1 minute (60 sec) for an unlimited period of time. 
    i := 60; 
    repeat 
     Sleep(1000); // Check if we should stop every second (if Stop button is pushed) 
     if input.IsCompleted then Break; 
     dec(i); 
    until i < 0; 
    end; 
end; 

procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue); 
var 
    sl: TStringList; 
    ws: WideString; 
begin 
    sl := TStringList.Create; 
    try 
    sl.LoadFromFile(input.AsString); 
    GetJSON_(PChar(sl.Text), ws); // output as ISuperObject --- DLL procedure 
    output := SO(ws); 
//  TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File 
    finally 
    sl.Free; 
    end; 
end; 

procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask); 
var 
    value: TOmniValue; 
    JSON: ISuperObject; 
begin 
    for value in input do begin 

    if value.IsException and (value.AsException is ESentinelException) then begin 
     task.Comm.Send(WM_ENDED); // if we got 'sentinel' Exception 
     value.AsException.Free; 
    end 
    else begin 
     JSON := value.AsInterface as ISuperObject; 
     // do something with JSON 
    end; 
    end; 
end; 

// 
procedure TForm1.btnStartClick(Sender: TObject); 
begin 
    btnStart.Enabled := False; 

    FPipeline := Parallel.Pipeline 
    .Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self)) 
    .Stage(Async_Parse) 
    .Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self)) 
    .HandleExceptions 
    .Run; 
end; 

procedure TForm1.btnStopClick(Sender: TObject); 
begin 
    if Assigned(FPipeline) then begin 
    FPipeline.Input.CompleteAdding; 
    FPipeline := nil; 
    end; 

    btnStart.Enabled := True; 
end; 

// 
procedure TForm1.WMEnded(var msg: TOmniMessage); 
begin 
    FIsBusy := False; 
    lbLog.ItemIndex := lbLog.Items.Add(Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)])); 
end; 

procedure TForm1.WMStarted(var msg: TOmniMessage); 
begin 
    FIsBusy := True; 
    lbLog.ItemIndex := lbLog.Items.Add(Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)])); 
end; 

end. 
+0

Решение на основе исключения работает здесь очень хорошо, если я удалю вызов GetJSON_, который существует в DLL, которого у меня нет. – gabr

+0

@gabr Это очень странная вещь ... Я прокомментировал все связанные с dll и json строки, но проблема все еще сохраняется. Сообщения «WM_STARTED» и «WM_ENDED» отправляются только один раз после нажатия кнопки «Пуск». Я смущен. – LuFang

2

Ваш подход (который я изначально предложил) имеет условие гонки, которое мешает ему работать. (К сожалению, это был недостаток в моей первоначальной конструкции.)

В основном, что случается:

  • Async_Files посылает последний файл в трубопровод.
  • Блок Async_Files (имитирующий некоторую рабочую нагрузку).
  • Async_JSON получает и обрабатывает последний файл.
  • Async_Files теперь устанавливает счетчик FCounterTotal.

В тот момент Async_JSON уже ждет следующих данных, которые никогда не появляются, и больше не проверяет значение FCounterTotal.

Альтернативный подход заключается в отправке специального значения sentinel в конвейер в качестве последнего элемента.

Исключение может также использоваться в качестве часового. Если вы возбудите исключение на первом этапе, он «пройдет» по конвейеру до конца, где вы сможете его обработать. Никакая специальная работа не должна выполняться на каком-либо конкретном этапе - по умолчанию этап будет просто отменять исключение.

+0

Я пробовал, но это не работает или что-то я делаю неправильно. Можете ли вы любезно взглянуть на настоящий исходный код, чтобы хотя бы догадаться, какая из них создает эту проблему? [test_OTL_SO_with_counter.zip] (https://drive.google.com/file/d/0B4YFAQ236k7lMHpWVHNrclFuclk/view?usp=sharing) – LuFang

+1

@LuFang Что значит «настоящий исходный код»? Почему вы спрашиваете об одном фрагменте кода, а затем комментируете другой фрагмент кода.Вы должны предоставить MCVE в вопросе, и тогда никто не тратит время на работу с поддельным кодом. –

+0

@gabr Я изменил свой вопрос, как сказал Дэвид (заменил мой псевдо-фальшивый код на реальный код). Я был бы признателен, если бы вы могли изучить этот код и предложить, что я делаю неправильно. – LuFang

Смежные вопросы