Я получаю (потоковые) данные из внешнего источника (поверх Lightstreamer) в свое приложение C#. Приложение My C# получает данные от слушателя. Данные слушателя хранятся в очереди (ConcurrentQueue). Очередь очищается каждые 0,5 секунды с помощью TryDequeue в DataTable. Затем DataTable будет скопирован в базу данных SQL с помощью SqlBulkCopy. База данных SQL обрабатывает новые данные, полученные из промежуточной таблицы в итоговую таблицу. В настоящее время я получаю около 300 000 рядов в день (может увеличиться в течение следующих недель), и моя цель - оставаться менее 1 секунды с момента получения данных до тех пор, пока они не будут доступны в последней таблице SQL. В настоящее время максимальное количество строк в секунду, которое я должен обрабатывать, составляет около 50 строк.C# самый быстрый способ вставить данные в базу данных SQL
К сожалению, с момента получения все большего количества данных моя логика становится медленнее в производительности (все еще далека до 1 секунды, но я хочу продолжать улучшаться). Основным узким местом (пока) является обработка промежуточных данных (в базе данных SQL) в финальную таблицу. Чтобы повысить производительность, я хотел бы переключить промежуточную таблицу в таблицу с оптимизацией памяти. Заключительная таблица уже является оптимизированной для памяти таблицей, поэтому они будут работать вместе точно.
Мои вопросы:
- Есть ли способ использовать SqlBulkCopy (из C#) с памятью оптимизированных таблиц? (насколько я знаю, пока еще нет пути)
- Любые предложения по наиболее быстрому способу записи полученных данных из моего приложения C# в таблицу промежуточного хранения памяти?
РЕДАКТИРОВАТЬ (раствором):
После замечаний/ответов и оценок эффективности я решил отказаться от массовой вставки и использовать SQLCommand для передачи IEnumerable со своими данными, как табличное значение параметра в встроенную скомпилированную хранимую процедуру для хранения данных непосредственно в моей финальной таблице, оптимизированной для памяти (также как и копия в «промежуточную» таблицу, которая теперь служит архивом). Производительность значительно увеличилась (даже я еще не рассматривал возможность распараллеливания вставок (будет на более позднем этапе)).
Вот часть кода:
памяти оптимизированный пользовательский тип таблицы (для передачи данных из C# в SQL (хранимой процедуры):
CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
[CityIndexInstrumentID] [int] NOT NULL,
[CityIndexTimeStamp] [bigint] NOT NULL,
[BidPrice] [numeric](18, 8) NOT NULL,
[AskPrice] [numeric](18, 8) NOT NULL,
INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED
(
[CityIndexInstrumentID] ASC,
[CityIndexTimeStamp] ASC,
[BidPrice] ASC,
[AskPrice] ASC
)
)
WITH (MEMORY_OPTIMIZED = ON)
Native скомпилированных хранимых процедур для вставки данные в итоговую таблицу и постановку (который служит в качестве архива в данном случае):
create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging]
(
@ProcessingID int,
@CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly
)
with native_compilation, schemabinding, execute as owner
as
begin atomic
with (transaction isolation level=snapshot, language=N'us_english')
-- store prices
insert into TimeSeries.CityIndexIntradayLivePrices
(
ObjectID,
PerDateTime,
BidPrice,
AskPrice,
ProcessingID
)
select Objects.ObjectID,
CityIndexTimeStamp,
CityIndexIntradayLivePricesStaging.BidPrice,
CityIndexIntradayLivePricesStaging.AskPrice,
@ProcessingID
from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging,
Objects.Objects
where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID
-- store data in staging table
insert into Staging.CityIndexIntradayLivePricesStaging
(
ImportProcessingID,
CityIndexInstrumentID,
CityIndexTimeStamp,
BidPrice,
AskPrice
)
select @ProcessingID,
CityIndexInstrumentID,
CityIndexTimeStamp,
BidPrice,
AskPrice
from @CityIndexIntradayLivePrices
end
IEnumerable заполненную с из очереди:
private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()
{
// set columns (the sequence is important as the sequence will be accordingly to the sequence of columns in the table-value parameter)
SqlMetaData MetaDataCol1;
SqlMetaData MetaDataCol2;
SqlMetaData MetaDataCol3;
SqlMetaData MetaDataCol4;
MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
// define sql data record with the columns
SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 });
// remove each price row from queue and add it to the sql data record
LightstreamerAPI.PriceDTO PriceDTO = new LightstreamerAPI.PriceDTO();
while (IntradayQuotesQueue.TryDequeue(out PriceDTO))
{
DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id
DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // @ is used to avoid problem with/as escape sequence)
DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price
DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price
yield return DataRecord;
}
}
Обработка данных каждые 0,5 секунды:
public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID)
{
try
{
// open new sql connection
using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false"))
{
// open connection
TimeSeriesDatabaseSQLConnection.Open();
// endless loop to keep thread alive
while(true)
{
// ensure queue has rows to process (otherwise no need to continue)
if(IntradayQuotesQueue.Count > 0)
{
// define stored procedure for sql command
SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection);
// set command type to stored procedure
InsertCommand.CommandType = CommandType.StoredProcedure;
// define sql parameters (table-value parameter gets data from CreateSqlDataRecords())
SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter
// set sql db type to structured for table-value paramter (structured = special data type for specifying structured data contained in table-valued parameters)
ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;
// execute stored procedure
InsertCommand.ExecuteNonQuery();
}
// wait 0.5 seconds
Thread.Sleep(500);
}
}
}
catch (Exception e)
{
// handle error (standard error messages and update processing)
ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);
};
}
Посмотрите на TVP (параметр значения таблицы) - вы можете использовать как обратный DataReader. https://lennilobel.wordpress.com/2009/07/29/sql-server-2008-table-valued-parameters-and-c-custom-iterators-a-match-made-in-heaven/ – Paparazzi