2014-12-11 2 views
3

Я пишу параллельный код, чтобы перечислить большой набор файлов CSV, каждый из которых содержит исторические данные запаса (более 6500 символов) и рассчитать, имеет ли каждый запас достигла своего рекордного уровня.многопоточное многопоточное ускорение: чтение через CSV с использованием TMemoryStream

Я реализовал класс Thread Pool и TThread descendant, чтобы разделить список символов поровну между Threads, которые затем выделяются для разделов ядра моей машины i7. Я настраиваю потоки для каждого, у кого есть копия всех необходимых им данных, когда они создаются, прежде чем их нет, поэтому нет необходимости в блокировке, если потоки обрабатываются. Как только все потоки закончатся, я суммирую данные результата каждого потока в основной программе.

В настоящее время я проверил свой код, используя несколько многопоточных менеджеров памяти, упомянутых https://stackoverflow.com/questions/6072269/need-multi-threading-memory-manager/6076407#6076407. Пока что SapMM представляется наиболее эффективным без нарушения доступа.

Проблема в том, что добавление большего количества потоков не пропорционально ускоряет время, необходимое для завершения расчета всех максимумов. Использование 2 "core-d" нитей не сокращает время работы на 1/2, но 3 не полностью разрезается на 1/3, а 4 не обрезается около 1/4.

Количество потоков 1, 2 , 3, 4

Прогнозируемые Форсировочная Время (мм: сс) 6:37, 3:17 1/2, 1/3 2:12 , 1:39 1/4

Фактическое время (мм: сс) 6:37, 4:07 , 3:05, 2:51

Я пришел к тому моменту, когда мне нужна дополнительная информация, чтобы получить полное ускорение этой операции. Мне нужно понять, почему многоуровневое ускорение завершается, а не просто «обманывать края проблемы». Итак, что заставляет этот код прекратить получать пропорциональные прибыли и что мне нужно сделать для достижения этих выгод? Короче говоря, есть ли другой подход к ускорению анализа, который я делаю, например. а не использовать TMemoryStream?

Код, который я использую, приведен ниже.

Я использую Delphi XE4 Enterprise.

В каждой нити I петле через каждый символ, и:

  1. Используйте TMemoryStream.LoadFromFile, чтобы загрузить исторические данные символа.
  2. Используйте функцию, которую я написал, чтобы получить наивысший максимум данных символа непосредственно из TMemoryStream.

(1) протестирован и не требует времени (менее 1 секунды общего времени, затрачиваемого на загрузку всего 6500, по одному в память). Процедура я использую в (2) является то, что принимает все время, и указано ниже:

unit uTest; 

implementation 

uses 
    SysUtils, Math, Classes; 

type 
    TDayIndexData = record 
     Date: TDate; 
     Open, High, Low, Close, AdjClose, 
     Volume: extended; 
    end; 

type 
    TTimeUnit = (tuDay, tuWeek, tuMonth, tuYear); 

    TTimePeriod = record 
    Length: integer; 
    TimeUnit: TTimeUnit; 
    end; 

//#NO CHANGE 
const 
    AllDataPeriodStr = 'All Data'; 

type 
    TRatePeriod = record 
    PeriodStr: string; 
    TimePeriod: TTimePeriod; 
    end; 

type 
    TFieldType = (ftDate, ftOpen, ftHigh, ftLow, ftClose, ftVolume, ftAdjClose); 

const CSV_DELIM_CHARSET = [#0..#31, ',',#127]; 

type 
    TShallowEquityNewHighInfoRetrievalResults = record 
    Success: boolean; 
    High: extended; 
    end; 

function ShallowEquityNewHighInfoRetrieval(
    AStream: TStream; 
    ARatePeriod: TRatePeriod; 
    AGetNormalData: boolean = False): TShallowEquityNewHighInfoRetrievalResults; 

var 
    vStreamSize: int64; 

    function EOF: boolean; 
    begin 
    Result := AStream.Position >= vStreamSize;//AStream.Size; 
    end; 

    procedure GotoEOF; 
    begin 
    AStream.Seek(0, soFromEnd); 
    end; 

//#OPTIMIZE 
//var 
    //vBuffer: FileString; 

    type 
    FileChar = AnsiChar; 
    FileString = AnsiString; 

    const 
    ResultCharSize = SizeOf(FileChar); 

    var 
    MRReadChar: FileChar; 

    procedure ReadNextChar; 
    begin 
    if not EOF then 
     AStream.Read(MRReadChar, SizeOf(MRReadChar)) else 
     raise EInvalidOperation.Create('Unexpected end of file found'); 
    end; 

    var 
    vPossDelimChars: boolean; 

    procedure SkipExistingDelimChars; 
    begin 
    //*INTENTION: prevents redundant SkipDelimChars calls, which is destructive 
    if not vPossDelimChars then Exit; 

    //not requiring DelimChars 

    if EOF then Exit; 

    repeat 
     ReadNextChar; 
    until EOF or not (MRReadChar in CSV_DELIM_CHARSET); 

    //#*NOTE: technically can be true if EOF, 
    //but if EOF then CurChar is never used 3/13/2014 
    vPossDelimChars := False; 
    end; 

    function SOF: boolean; 
    begin 
    Result := AStream.Position = 0; 
    end; 

    function NextChars(ACount: integer): FileString; 
    begin 
    //#OPTIMIZE: condition 
    if ResultCharSize = 1 then 
     begin 
     SetLength(Result, Min(ACount, vStreamSize{AStream.Size} - AStream.Position)); 
     AStream.Read(Pointer(Result)^, Length(Result)); 
     AStream.Seek(-Length(Result), soFromCurrent); 
     end else 
     begin 
      SetLength(Result, Min(ACount, (vStreamSize{AStream.Size} - AStream.Position) div ResultCharSize)); 
      AStream.Read(Pointer(Result)^, Length(Result) * ResultCharSize); 
      AStream.Seek(-Length(Result) * ResultCharSize, soFromCurrent); 
     end; 
    end; 

    procedure GotoNextChars(ACount: integer); 
    begin 
    //#OPTIMIZE: condition 
    if ResultCharSize = 1 then 
     AStream.Seek(ACount, soFromCurrent) else 
     AStream.Seek(ACount*SizeOf(FileChar), soFromCurrent); 
    end; 

    procedure GotoPrevChars(ACount: integer); 
    begin 
    //#OPTIMIZE: condition 
    if ResultCharSize = 1 then 
     AStream.Seek(-ACount, soFromCurrent) else 
     AStream.Seek(-ACount*SizeOf(FileChar), soFromCurrent); 
    end; 

    procedure GotoPreceedingEOLN(ForItem: boolean = False); 
    var 
    vOrigPos: integer; 

    const 
    NMinRowChars = 17;//Length('3-13-13,1,1,1,1,1') 

    begin 
    //assumes will not hit SOF 
    //assumes ending CRLF taken care of by other places 
    vOrigPos := AStream.Position; 

    vPossDelimChars := True; 

    while (NextChars(2) <> #13#10) or (AStream.Position = vOrigPos) do 
     if (Length(NextChars(2)) = 2) and (NextChars(2)[2] = #10) and 
     (AStream.Position < vOrigPos - SizeOf(FileChar)) then 
      begin 
      GotoNextChars(1); 

      Exit; 
      end else 
     if (AStream.Position = vOrigPos) and ForItem then 
      GotoPrevChars(NMinRowChars) else 
      GotoPrevChars(1); 
    end; 

    var 
    CurField: string; 
    CurCol: integer; 

    procedure InitParsingState; 
    begin 
    //Initialize Parsing State 
    CurCol := -1; 
    vPossDelimChars := True; 
    SkipExistingDelimChars; 
    vStreamSize := AStream.Size; 
    end; 

    procedure BacktrackTo(APos: integer; ASafeMode: boolean = False); 
    begin 
    if ASafeMode then 
     AStream.Seek(Pred(APos), soFromBeginning) else 
     AStream.Seek(APos, soFromBeginning); 

    ReadNextChar; 
    vPossDelimChars := False; 
    CurCol := Ord(High(TFieldType)); 
    end; 

    procedure ReadQuotedText; 
    var 
    vHadPrevQuoteChar: boolean; 
    begin 
    vHadPrevQuoteChar := False; 
    while MRReadChar = '"' do 
     begin 
     if vHadPrevQuoteChar then 
      CurField := CurField + MRReadChar; 
     ReadNextChar; 

     while MRReadChar <> '"' do 
      begin 
      CurField := CurField + MRReadChar; 
      ReadNextChar; 
      end; 

     if EOF then 
      break; 

     ReadNextChar; 
     vHadPrevQuoteChar := True; 
     end; 
    end; 

    procedure GetNextFieldValue; 
    begin 
    if EOF then Exit; 

    CurCol := (CurCol+1) mod Succ(Ord(High(TFieldType))); 
    CurField := ''; 
    if MRReadChar = '"' then 
     ReadQuotedText else 
     begin 
      repeat 
      CurField := CurField + MRReadChar; 
      if not EOF then 
       ReadNextChar; 
      until EOF or (MRReadChar in CSV_DELIM_CHARSET); 
      if EOF then 
      if not (MRReadChar in CSV_DELIM_CHARSET) then 
       CurField := CurField + MRReadChar; 
     end; 
    vPossDelimChars := True; 

    SkipExistingDelimChars; 
    end; 

    var 
    ColFieldTypes: array [Ord(Low(TFieldType))..Ord(High(TFieldType))] of TFieldType; 

    procedure ResolveCurColFieldType; 
    var 
    vField: string; 
    begin 
    vField := LowerCase(CurField); 
    if vField = 'date' then 
     ColFieldTypes[CurCol] := ftDate else 
    if vField = 'open' then 
     ColFieldTypes[CurCol] := ftOpen else 
    if vField = 'high' then 
     ColFieldTypes[CurCol] := ftHigh else 
    if vField = 'low' then 
     ColFieldTypes[CurCol] := ftLow else 
    if vField = 'close' then 
     ColFieldTypes[CurCol] := ftClose else 
    if vField = 'volume' then 
     ColFieldTypes[CurCol] := ftVolume else 
    if Pos('close', vField) > 0 then 
     ColFieldTypes[CurCol] := ftAdjClose else 
     raise EInvalidOperation.Create('Unrecognized file format: unrecognized column name found.'); 
    end; 

    procedure WriteItemAsFieldValue(var AData: TDayIndexData); 
    begin 
    case ColFieldTypes[CurCol] of 
     ftDate:AData.Date := ExStrToDate(CurField); 
     ftOpen:AData.Open := StrToFloat(CurField); 
     ftHigh:AData.High := StrToFloat(CurField); 
     ftLow:AData.Low := StrToFloat(CurField); 
     ftClose:AData.Close := StrToFloat(CurField); 
     ftVolume:AData.Volume := StrToFloat(CurField); 
     ftAdjClose:AData.AdjClose := StrToFloat(CurField); 
    end; 
    end; 

    procedure VerifyFields; 
    var 
    iField: TFieldType; 
    iColumn: integer; 

    IsUsedFlags: array [Low(TFieldType)..High(TFieldType)] of boolean; 

    begin 
    //* Set all to false 
    for iField := Low(TFieldType) to High(TFieldType) do 
     IsUsedFlags[iField] := False; 

    //* set found to true 
    for iColumn := Low(ColFieldTypes) to High(ColFieldTypes) do 
     IsUsedFlags[ColFieldTypes[iColumn]] := True; 

    //* throw error on first one not found 
    for iField := Low(TFieldType) to High(TFieldType) do 
     if not IsUsedFlags[iField] then 
     begin 
      raise EInvalidOperation.Create('Bad file format: one or more column names are missing!'); 
      break; 
     end; 
    end; 

    procedure LoadHeader; 
    var 
    iField: TFieldType; 

    begin 
    for iField := Low(TFieldType) to High(TFieldType) do 
     begin 
     GetNextFieldValue; 
     ResolveCurColFieldType; 
     end; 

    VerifyFields; 

    if EOF then 
     raise EInvalidOperation.Create('Cannot complete shallow Equity New High Info Retrieval: Not enough Data') 
    end; 

    procedure LoadRowInto(var ADayData: TDayIndexData); 
    var 
    iField: TFieldType; 
    begin 
    for iField := Low(TFieldType) to High(TFieldType) do 
     begin 
     GetNextFieldValue; 
     WriteItemAsFieldValue(ADayData); 
     end; 
    end; 

    var 
    OrderReversed: boolean; 

    vTopDay, 
    vBottomDay, 

    vFirstDay, 
    vEarlierDay, 
    vLastDay: TDayIndexData; 

    vBeginDate: TDate; 

    vBeforeLastDayPos, 
    vFirstDayPos, 
    vAfterFirstDayPos: integer; 

    function HasUnprocessedDays: boolean; 
    begin 
    //** use Position of stream because we don't always have the first day in the 
    // file, due to optimization 
    Result := (
     ((AStream.Position > vFirstDayPos) and not OrderReversed) or 

     (((AStream.Position < AStream.Size - SizeOf(FileChar)*Length(#13#10)) or 
     (AStream.Position < AStream.Size - SizeOf(FileChar)*Length(#10))) 
     and OrderReversed)); 
    end; 

    function NotYetCoveredTimePeriod: boolean; 
    begin 
    Result := 
     (ARatePeriod.PeriodStr = AllDataPeriodStr) 
     or 
     (
     (ARatePeriod.PeriodStr <> AllDataPeriodStr) and 
     (vEarlierDay.Date >= vBeginDate) 
    ); 
    end; 

    function FoundAllNeededData: boolean; 
    begin 
    Result := (
     (ARatePeriod.PeriodStr <> AllDataPeriodStr) and 
     (vEarlierDay.Date <= vBeginDate) 
    ) or 
    (ARatePeriod.PeriodStr = AllDataPeriodStr); 
    end; 

    procedure GotoLastDay; 
    begin 
    //** Goto End of File 
    GotoEOF; 

    //** Goto Just before Last Day 
    GotoPreceedingEOLN; 
    if (AStream.Position = AStream.Size - SizeOf(FileChar)*Length(#13#10)) or 
     (AStream.Position = AStream.Size - SizeOf(FileChar)*Length(#10)) then 
     GotoPreceedingEOLN; 

    SkipExistingDelimChars; 
    end; 

    procedure DetermineDataOrder; 
    begin 
    //#ASSUMPTION: assume end day at BOTTOM of file if latest data less than 2 days ago 
    //Problem when NDays = 2 ? 

    if Trunc(Now) - Trunc(vBottomDay.Date) >= 2 then 
     begin 
     //** Get Top Day 
     BacktrackTo(vFirstDayPos, True); 
     LoadRowInto(vTopDay); 

     //** Determine what order the data is in 
     OrderReversed := vBottomDay.Date < vTopDay.Date; 

     if not OrderReversed then 
      BacktrackTo(vBeforeLastDayPos, True); 

     if OrderReversed then 
      vFirstDay := vBottomDay else 
      vFirstDay := vTopDay; 

     if OrderReversed then 
      vLastDay := vTopDay else 
      vLastDay := vBottomDay; 
     end else 
     begin 
      OrderReversed := False; 

      //vLastDay := vTopDay; 
      vLastDay := vBottomDay; 
     end; 
    end; 

    procedure LoadPrevRow; 
    var 
    vBeforeDayPos: integer; 

    begin 
    GotoPreceedingEOLN(True); 

    vBeforeDayPos := AStream.Position; 

    SkipExistingDelimChars; 
    LoadRowInto(vEarlierDay); 

    AStream.Seek(vBeforeDayPos, soFromBeginning); 
    end; 

begin 
    //* Initialize 
    Result.Success := False; 
    AStream.Seek(0, soFromBeginning); 
    InitParsingState; 

    //** Load CSV Header 
    LoadHeader; 
    vFirstDayPos := AStream.Position; 

    //** Get Last Day 
    GotoLastDay; 
    vBeforeLastDayPos := AStream.Position; 
    LoadRowInto(vBottomDay); 

    //** IF Only 1 Data Day: 
    if vFirstDayPos = vBeforeLastDayPos then 
    begin 
     //return results 
     Result.Success := True; 
     Result.High := vBottomDay.High; 
     Exit; 
    end; 

    //** Go back to Last Day in File 
    BacktrackTo(vBeforeLastDayPos); 

    //** Determine what order the data is in 
    DetermineDataOrder; 

    //** Determine Date to scan back to if opted for 
    if ARatePeriod.PeriodStr <> AllDataPeriodStr then 
    vBeginDate := MoveDateBack(vLastDay.Date, ARatePeriod.TimePeriod); 

    //* Initialize Loop Variables 
    Result.High := vLastDay.High; 
    vEarlierDay := vLastDay; 

    while HasUnProcessedDays and NotYetCoveredTimePeriod do 
    begin 
     //** Goto Previous Day's Row 
     if OrderReversed then 
     LoadRowInto(vEarlierDay) else 
      LoadPrevRow; 

     //** Update High 
     if NotYetCoveredTimePeriod then 
     Result.High := Max(Result.High, vEarlierDay.High); 
    end; 

    Result.Success := FoundAllNeededData; 
end; 

end. 

Пример CSV ниже. Обратите внимание, что иногда позиции CSV находятся в обратном порядке в файле (последняя дата сначала).

Date,Open,High,Low,Close,Volume,Adj Close 
11/3/2014,12,12.06,11.75,11.98,19700,11.98 
11/4/2014,12,12,10.62,11.55,39200,11.55 
11/5/2014,11.6,11.85,11.6,11.85,3100,11.85 
11/6/2014,11.85,11.85,11.85,11.85,0,11.85 
11/7/2014,11.5,11.5,10.35,11,35900,11 
11/10/2014,11.12,11.12,11.12,11.12,200,11.12 
11/11/2014,11.5,11.5,11.5,11.5,200,11.5 
11/12/2014,11.75,11.85,11.15,11.45,3500,11.45 
11/13/2014,11.45,11.45,11.45,11.45,0,11.45 
11/14/2014,11.45,11.45,11.45,11.45,0,11.45 
11/17/2014,11.07,11.28,11.07,11.28,1600,11.28 
11/18/2014,11.07,11.74,11.06,11.74,8100,11.74 
11/19/2014,11.1,11.5,11,11.5,11600,11.5 
11/20/2014,11.1,11.5,11.1,11.5,3100,11.5 
11/21/2014,11.49,11.5,11.23,11.25,15100,11.25 
11/24/2014,11.25,11.35,11.25,11.25,900,11.25 
11/25/2014,11.48,11.5,11.25,11.5,355300,11.5 
11/26/2014,11.75,11.75,11.5,11.5,261300,11.5 
11/28/2014,11.75,11.8,11.75,11.8,16300,11.8 
12/1/2014,11.25,11.8,11.02,11.5,23800,11.5 
12/2/2014,11.6,11.6,11.47,11.5,57600,11.5 
12/3/2014,11.57,11.75,11.41,11.69,240700,11.69 
12/4/2014,11.74,11.75,11.49,11.65,41100,11.65 
12/5/2014,11.65,11.85,11.56,11.8,267200,11.8 
12/8/2014,11.8,11.85,11.68,11.8,168700,11.8 
+4

Файловый ввод-вывод - это более или менее последовательный процесс, не ожидайте параллельного усиления для этого. –

+2

Да. Диск ввода-вывода - это, конечно, горло бутылки. Многопоточность предназначена для задач, связанных с ЦП. Прежде чем инвестировать время разработки на оптимизацию, убедитесь, что вы оптимизируете реальное узкое место. –

+0

Вы также, кажется, смешивали несколько разных аспектов в одной функции. Это затрудняет работу с вашим кодом. Вам действительно нужно определить код, чтобы каждая часть имела дело с одной задачей. –

ответ

0

Во-первых, попробуйте взять Intel Threading Building Blocks в качестве менеджера памяти. Это весы довольно хорошо по крайней мере до 16 ядер (у меня была аналогичная проблема здесь в Why multithreaded memory allocate/deallocate intensive application does not scale with number of threads?

В общем, даже с Intel TBB в главном избегать контура выполнения нити динамически выделять/освобождать память. Эти операции масштабирования всегда плохо.

  • Выделяют достаточно памяти (фиксированный) для обработки до начала цикла, и освободить за петлей.
  • Внутри чтения/цикл обработки просто ограничить операции по указателю связанных. Копирование памяти следует только в случае необходимости.

Входные данные могут быть разделены между различными жесткими дисками, даже лучше, если они подключены к различным контроллерам. Входные данные могут отображаться в памяти, если вы знаете размер заранее, т. Е. Перед обработкой в ​​поточном контуре.

Оптимизируйте обработку отдельных потоков как можно больше (профилирование), затем определите части, которые не масштабируются с количеством потоков (количество потоков в качестве параметра). Эти части должны быть переписаны. Операции чтения/вывода могут кэшироваться и/или считывать данные в n * кластерных фрагментах.

Эти довольно общие рекомендации основаны на моем опыте, собранном во время обработки входных данных размера TB на Windows 7 (до 12 ht ядер и до 128 GB RAM) параллельно в потоках.

Смежные вопросы