Я пишу параллельный код, чтобы перечислить большой набор файлов CSV, каждый из которых содержит исторические данные запаса (более 6500 символов) и рассчитать, имеет ли каждый запас достигла своего рекордного уровня.многопоточное многопоточное ускорение: чтение через CSV с использованием TMemoryStream
Я реализовал класс Thread Pool и TThread descendant, чтобы разделить список символов поровну между Threads, которые затем выделяются для разделов ядра моей машины i7. Я настраиваю потоки для каждого, у кого есть копия всех необходимых им данных, когда они создаются, прежде чем их нет, поэтому нет необходимости в блокировке, если потоки обрабатываются. Как только все потоки закончатся, я суммирую данные результата каждого потока в основной программе.
В настоящее время я проверил свой код, используя несколько многопоточных менеджеров памяти, упомянутых https://stackoverflow.com/questions/6072269/need-multi-threading-memory-manager/6076407#6076407. Пока что SapMM представляется наиболее эффективным без нарушения доступа.
Проблема в том, что добавление большего количества потоков не пропорционально ускоряет время, необходимое для завершения расчета всех максимумов. Использование 2 "core-d" нитей не сокращает время работы на 1/2, но 3 не полностью разрезается на 1/3, а 4 не обрезается около 1/4.
Количество потоков 1, 2 , 3, 4
Прогнозируемые Форсировочная Время (мм: сс) 6:37, 3:17 1/2, 1/3 2:12 , 1:39 1/4
Фактическое время (мм: сс) 6:37, 4:07 , 3:05, 2:51
Я пришел к тому моменту, когда мне нужна дополнительная информация, чтобы получить полное ускорение этой операции. Мне нужно понять, почему многоуровневое ускорение завершается, а не просто «обманывать края проблемы». Итак, что заставляет этот код прекратить получать пропорциональные прибыли и что мне нужно сделать для достижения этих выгод? Короче говоря, есть ли другой подход к ускорению анализа, который я делаю, например. а не использовать TMemoryStream?
Код, который я использую, приведен ниже.
Я использую Delphi XE4 Enterprise.
В каждой нити I петле через каждый символ, и:
- Используйте TMemoryStream.LoadFromFile, чтобы загрузить исторические данные символа.
- Используйте функцию, которую я написал, чтобы получить наивысший максимум данных символа непосредственно из TMemoryStream.
(1) протестирован и не требует времени (менее 1 секунды общего времени, затрачиваемого на загрузку всего 6500, по одному в память). Процедура я использую в (2) является то, что принимает все время, и указано ниже:
unit uTest;
implementation
uses
SysUtils, Math, Classes;
type
TDayIndexData = record
Date: TDate;
Open, High, Low, Close, AdjClose,
Volume: extended;
end;
type
TTimeUnit = (tuDay, tuWeek, tuMonth, tuYear);
TTimePeriod = record
Length: integer;
TimeUnit: TTimeUnit;
end;
//#NO CHANGE
const
AllDataPeriodStr = 'All Data';
type
TRatePeriod = record
PeriodStr: string;
TimePeriod: TTimePeriod;
end;
type
TFieldType = (ftDate, ftOpen, ftHigh, ftLow, ftClose, ftVolume, ftAdjClose);
const CSV_DELIM_CHARSET = [#0..#31, ',',#127];
type
TShallowEquityNewHighInfoRetrievalResults = record
Success: boolean;
High: extended;
end;
function ShallowEquityNewHighInfoRetrieval(
AStream: TStream;
ARatePeriod: TRatePeriod;
AGetNormalData: boolean = False): TShallowEquityNewHighInfoRetrievalResults;
var
vStreamSize: int64;
function EOF: boolean;
begin
Result := AStream.Position >= vStreamSize;//AStream.Size;
end;
procedure GotoEOF;
begin
AStream.Seek(0, soFromEnd);
end;
//#OPTIMIZE
//var
//vBuffer: FileString;
type
FileChar = AnsiChar;
FileString = AnsiString;
const
ResultCharSize = SizeOf(FileChar);
var
MRReadChar: FileChar;
procedure ReadNextChar;
begin
if not EOF then
AStream.Read(MRReadChar, SizeOf(MRReadChar)) else
raise EInvalidOperation.Create('Unexpected end of file found');
end;
var
vPossDelimChars: boolean;
procedure SkipExistingDelimChars;
begin
//*INTENTION: prevents redundant SkipDelimChars calls, which is destructive
if not vPossDelimChars then Exit;
//not requiring DelimChars
if EOF then Exit;
repeat
ReadNextChar;
until EOF or not (MRReadChar in CSV_DELIM_CHARSET);
//#*NOTE: technically can be true if EOF,
//but if EOF then CurChar is never used 3/13/2014
vPossDelimChars := False;
end;
function SOF: boolean;
begin
Result := AStream.Position = 0;
end;
function NextChars(ACount: integer): FileString;
begin
//#OPTIMIZE: condition
if ResultCharSize = 1 then
begin
SetLength(Result, Min(ACount, vStreamSize{AStream.Size} - AStream.Position));
AStream.Read(Pointer(Result)^, Length(Result));
AStream.Seek(-Length(Result), soFromCurrent);
end else
begin
SetLength(Result, Min(ACount, (vStreamSize{AStream.Size} - AStream.Position) div ResultCharSize));
AStream.Read(Pointer(Result)^, Length(Result) * ResultCharSize);
AStream.Seek(-Length(Result) * ResultCharSize, soFromCurrent);
end;
end;
procedure GotoNextChars(ACount: integer);
begin
//#OPTIMIZE: condition
if ResultCharSize = 1 then
AStream.Seek(ACount, soFromCurrent) else
AStream.Seek(ACount*SizeOf(FileChar), soFromCurrent);
end;
procedure GotoPrevChars(ACount: integer);
begin
//#OPTIMIZE: condition
if ResultCharSize = 1 then
AStream.Seek(-ACount, soFromCurrent) else
AStream.Seek(-ACount*SizeOf(FileChar), soFromCurrent);
end;
procedure GotoPreceedingEOLN(ForItem: boolean = False);
var
vOrigPos: integer;
const
NMinRowChars = 17;//Length('3-13-13,1,1,1,1,1')
begin
//assumes will not hit SOF
//assumes ending CRLF taken care of by other places
vOrigPos := AStream.Position;
vPossDelimChars := True;
while (NextChars(2) <> #13#10) or (AStream.Position = vOrigPos) do
if (Length(NextChars(2)) = 2) and (NextChars(2)[2] = #10) and
(AStream.Position < vOrigPos - SizeOf(FileChar)) then
begin
GotoNextChars(1);
Exit;
end else
if (AStream.Position = vOrigPos) and ForItem then
GotoPrevChars(NMinRowChars) else
GotoPrevChars(1);
end;
var
CurField: string;
CurCol: integer;
procedure InitParsingState;
begin
//Initialize Parsing State
CurCol := -1;
vPossDelimChars := True;
SkipExistingDelimChars;
vStreamSize := AStream.Size;
end;
procedure BacktrackTo(APos: integer; ASafeMode: boolean = False);
begin
if ASafeMode then
AStream.Seek(Pred(APos), soFromBeginning) else
AStream.Seek(APos, soFromBeginning);
ReadNextChar;
vPossDelimChars := False;
CurCol := Ord(High(TFieldType));
end;
procedure ReadQuotedText;
var
vHadPrevQuoteChar: boolean;
begin
vHadPrevQuoteChar := False;
while MRReadChar = '"' do
begin
if vHadPrevQuoteChar then
CurField := CurField + MRReadChar;
ReadNextChar;
while MRReadChar <> '"' do
begin
CurField := CurField + MRReadChar;
ReadNextChar;
end;
if EOF then
break;
ReadNextChar;
vHadPrevQuoteChar := True;
end;
end;
procedure GetNextFieldValue;
begin
if EOF then Exit;
CurCol := (CurCol+1) mod Succ(Ord(High(TFieldType)));
CurField := '';
if MRReadChar = '"' then
ReadQuotedText else
begin
repeat
CurField := CurField + MRReadChar;
if not EOF then
ReadNextChar;
until EOF or (MRReadChar in CSV_DELIM_CHARSET);
if EOF then
if not (MRReadChar in CSV_DELIM_CHARSET) then
CurField := CurField + MRReadChar;
end;
vPossDelimChars := True;
SkipExistingDelimChars;
end;
var
ColFieldTypes: array [Ord(Low(TFieldType))..Ord(High(TFieldType))] of TFieldType;
procedure ResolveCurColFieldType;
var
vField: string;
begin
vField := LowerCase(CurField);
if vField = 'date' then
ColFieldTypes[CurCol] := ftDate else
if vField = 'open' then
ColFieldTypes[CurCol] := ftOpen else
if vField = 'high' then
ColFieldTypes[CurCol] := ftHigh else
if vField = 'low' then
ColFieldTypes[CurCol] := ftLow else
if vField = 'close' then
ColFieldTypes[CurCol] := ftClose else
if vField = 'volume' then
ColFieldTypes[CurCol] := ftVolume else
if Pos('close', vField) > 0 then
ColFieldTypes[CurCol] := ftAdjClose else
raise EInvalidOperation.Create('Unrecognized file format: unrecognized column name found.');
end;
procedure WriteItemAsFieldValue(var AData: TDayIndexData);
begin
case ColFieldTypes[CurCol] of
ftDate:AData.Date := ExStrToDate(CurField);
ftOpen:AData.Open := StrToFloat(CurField);
ftHigh:AData.High := StrToFloat(CurField);
ftLow:AData.Low := StrToFloat(CurField);
ftClose:AData.Close := StrToFloat(CurField);
ftVolume:AData.Volume := StrToFloat(CurField);
ftAdjClose:AData.AdjClose := StrToFloat(CurField);
end;
end;
procedure VerifyFields;
var
iField: TFieldType;
iColumn: integer;
IsUsedFlags: array [Low(TFieldType)..High(TFieldType)] of boolean;
begin
//* Set all to false
for iField := Low(TFieldType) to High(TFieldType) do
IsUsedFlags[iField] := False;
//* set found to true
for iColumn := Low(ColFieldTypes) to High(ColFieldTypes) do
IsUsedFlags[ColFieldTypes[iColumn]] := True;
//* throw error on first one not found
for iField := Low(TFieldType) to High(TFieldType) do
if not IsUsedFlags[iField] then
begin
raise EInvalidOperation.Create('Bad file format: one or more column names are missing!');
break;
end;
end;
procedure LoadHeader;
var
iField: TFieldType;
begin
for iField := Low(TFieldType) to High(TFieldType) do
begin
GetNextFieldValue;
ResolveCurColFieldType;
end;
VerifyFields;
if EOF then
raise EInvalidOperation.Create('Cannot complete shallow Equity New High Info Retrieval: Not enough Data')
end;
procedure LoadRowInto(var ADayData: TDayIndexData);
var
iField: TFieldType;
begin
for iField := Low(TFieldType) to High(TFieldType) do
begin
GetNextFieldValue;
WriteItemAsFieldValue(ADayData);
end;
end;
var
OrderReversed: boolean;
vTopDay,
vBottomDay,
vFirstDay,
vEarlierDay,
vLastDay: TDayIndexData;
vBeginDate: TDate;
vBeforeLastDayPos,
vFirstDayPos,
vAfterFirstDayPos: integer;
function HasUnprocessedDays: boolean;
begin
//** use Position of stream because we don't always have the first day in the
// file, due to optimization
Result := (
((AStream.Position > vFirstDayPos) and not OrderReversed) or
(((AStream.Position < AStream.Size - SizeOf(FileChar)*Length(#13#10)) or
(AStream.Position < AStream.Size - SizeOf(FileChar)*Length(#10)))
and OrderReversed));
end;
function NotYetCoveredTimePeriod: boolean;
begin
Result :=
(ARatePeriod.PeriodStr = AllDataPeriodStr)
or
(
(ARatePeriod.PeriodStr <> AllDataPeriodStr) and
(vEarlierDay.Date >= vBeginDate)
);
end;
function FoundAllNeededData: boolean;
begin
Result := (
(ARatePeriod.PeriodStr <> AllDataPeriodStr) and
(vEarlierDay.Date <= vBeginDate)
) or
(ARatePeriod.PeriodStr = AllDataPeriodStr);
end;
procedure GotoLastDay;
begin
//** Goto End of File
GotoEOF;
//** Goto Just before Last Day
GotoPreceedingEOLN;
if (AStream.Position = AStream.Size - SizeOf(FileChar)*Length(#13#10)) or
(AStream.Position = AStream.Size - SizeOf(FileChar)*Length(#10)) then
GotoPreceedingEOLN;
SkipExistingDelimChars;
end;
procedure DetermineDataOrder;
begin
//#ASSUMPTION: assume end day at BOTTOM of file if latest data less than 2 days ago
//Problem when NDays = 2 ?
if Trunc(Now) - Trunc(vBottomDay.Date) >= 2 then
begin
//** Get Top Day
BacktrackTo(vFirstDayPos, True);
LoadRowInto(vTopDay);
//** Determine what order the data is in
OrderReversed := vBottomDay.Date < vTopDay.Date;
if not OrderReversed then
BacktrackTo(vBeforeLastDayPos, True);
if OrderReversed then
vFirstDay := vBottomDay else
vFirstDay := vTopDay;
if OrderReversed then
vLastDay := vTopDay else
vLastDay := vBottomDay;
end else
begin
OrderReversed := False;
//vLastDay := vTopDay;
vLastDay := vBottomDay;
end;
end;
procedure LoadPrevRow;
var
vBeforeDayPos: integer;
begin
GotoPreceedingEOLN(True);
vBeforeDayPos := AStream.Position;
SkipExistingDelimChars;
LoadRowInto(vEarlierDay);
AStream.Seek(vBeforeDayPos, soFromBeginning);
end;
begin
//* Initialize
Result.Success := False;
AStream.Seek(0, soFromBeginning);
InitParsingState;
//** Load CSV Header
LoadHeader;
vFirstDayPos := AStream.Position;
//** Get Last Day
GotoLastDay;
vBeforeLastDayPos := AStream.Position;
LoadRowInto(vBottomDay);
//** IF Only 1 Data Day:
if vFirstDayPos = vBeforeLastDayPos then
begin
//return results
Result.Success := True;
Result.High := vBottomDay.High;
Exit;
end;
//** Go back to Last Day in File
BacktrackTo(vBeforeLastDayPos);
//** Determine what order the data is in
DetermineDataOrder;
//** Determine Date to scan back to if opted for
if ARatePeriod.PeriodStr <> AllDataPeriodStr then
vBeginDate := MoveDateBack(vLastDay.Date, ARatePeriod.TimePeriod);
//* Initialize Loop Variables
Result.High := vLastDay.High;
vEarlierDay := vLastDay;
while HasUnProcessedDays and NotYetCoveredTimePeriod do
begin
//** Goto Previous Day's Row
if OrderReversed then
LoadRowInto(vEarlierDay) else
LoadPrevRow;
//** Update High
if NotYetCoveredTimePeriod then
Result.High := Max(Result.High, vEarlierDay.High);
end;
Result.Success := FoundAllNeededData;
end;
end.
Пример CSV ниже. Обратите внимание, что иногда позиции CSV находятся в обратном порядке в файле (последняя дата сначала).
Date,Open,High,Low,Close,Volume,Adj Close
11/3/2014,12,12.06,11.75,11.98,19700,11.98
11/4/2014,12,12,10.62,11.55,39200,11.55
11/5/2014,11.6,11.85,11.6,11.85,3100,11.85
11/6/2014,11.85,11.85,11.85,11.85,0,11.85
11/7/2014,11.5,11.5,10.35,11,35900,11
11/10/2014,11.12,11.12,11.12,11.12,200,11.12
11/11/2014,11.5,11.5,11.5,11.5,200,11.5
11/12/2014,11.75,11.85,11.15,11.45,3500,11.45
11/13/2014,11.45,11.45,11.45,11.45,0,11.45
11/14/2014,11.45,11.45,11.45,11.45,0,11.45
11/17/2014,11.07,11.28,11.07,11.28,1600,11.28
11/18/2014,11.07,11.74,11.06,11.74,8100,11.74
11/19/2014,11.1,11.5,11,11.5,11600,11.5
11/20/2014,11.1,11.5,11.1,11.5,3100,11.5
11/21/2014,11.49,11.5,11.23,11.25,15100,11.25
11/24/2014,11.25,11.35,11.25,11.25,900,11.25
11/25/2014,11.48,11.5,11.25,11.5,355300,11.5
11/26/2014,11.75,11.75,11.5,11.5,261300,11.5
11/28/2014,11.75,11.8,11.75,11.8,16300,11.8
12/1/2014,11.25,11.8,11.02,11.5,23800,11.5
12/2/2014,11.6,11.6,11.47,11.5,57600,11.5
12/3/2014,11.57,11.75,11.41,11.69,240700,11.69
12/4/2014,11.74,11.75,11.49,11.65,41100,11.65
12/5/2014,11.65,11.85,11.56,11.8,267200,11.8
12/8/2014,11.8,11.85,11.68,11.8,168700,11.8
Файловый ввод-вывод - это более или менее последовательный процесс, не ожидайте параллельного усиления для этого. –
Да. Диск ввода-вывода - это, конечно, горло бутылки. Многопоточность предназначена для задач, связанных с ЦП. Прежде чем инвестировать время разработки на оптимизацию, убедитесь, что вы оптимизируете реальное узкое место. –
Вы также, кажется, смешивали несколько разных аспектов в одной функции. Это затрудняет работу с вашим кодом. Вам действительно нужно определить код, чтобы каждая часть имела дело с одной задачей. –