2016-04-19 2 views
5

Я получаю (потоковые) данные из внешнего источника (поверх Lightstreamer) в свое приложение C#. Приложение My C# получает данные от слушателя. Данные слушателя хранятся в очереди (ConcurrentQueue). Очередь очищается каждые 0,5 секунды с помощью TryDequeue в DataTable. Затем DataTable будет скопирован в базу данных SQL с помощью SqlBulkCopy. База данных SQL обрабатывает новые данные, полученные из промежуточной таблицы в итоговую таблицу. В настоящее время я получаю около 300 000 рядов в день (может увеличиться в течение следующих недель), и моя цель - оставаться менее 1 секунды с момента получения данных до тех пор, пока они не будут доступны в последней таблице SQL. В настоящее время максимальное количество строк в секунду, которое я должен обрабатывать, составляет около 50 строк.C# самый быстрый способ вставить данные в базу данных SQL

К сожалению, с момента получения все большего количества данных моя логика становится медленнее в производительности (все еще далека до 1 секунды, но я хочу продолжать улучшаться). Основным узким местом (пока) является обработка промежуточных данных (в базе данных SQL) в финальную таблицу. Чтобы повысить производительность, я хотел бы переключить промежуточную таблицу в таблицу с оптимизацией памяти. Заключительная таблица уже является оптимизированной для памяти таблицей, поэтому они будут работать вместе точно.

Мои вопросы:

  1. Есть ли способ использовать SqlBulkCopy (из C#) с памятью оптимизированных таблиц? (насколько я знаю, пока еще нет пути)
  2. Любые предложения по наиболее быстрому способу записи полученных данных из моего приложения C# в таблицу промежуточного хранения памяти?

РЕДАКТИРОВАТЬ (раствором):

После замечаний/ответов и оценок эффективности я решил отказаться от массовой вставки и использовать SQLCommand для передачи IEnumerable со своими данными, как табличное значение параметра в встроенную скомпилированную хранимую процедуру для хранения данных непосредственно в моей финальной таблице, оптимизированной для памяти (также как и копия в «промежуточную» таблицу, которая теперь служит архивом). Производительность значительно увеличилась (даже я еще не рассматривал возможность распараллеливания вставок (будет на более позднем этапе)).

Вот часть кода:

памяти оптимизированный пользовательский тип таблицы (для передачи данных из C# в SQL (хранимой процедуры):

CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
    [CityIndexInstrumentID] [int] NOT NULL, 
    [CityIndexTimeStamp] [bigint] NOT NULL, 
    [BidPrice] [numeric](18, 8) NOT NULL, 
    [AskPrice] [numeric](18, 8) NOT NULL, 
    INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED 
(
    [CityIndexInstrumentID] ASC, 
    [CityIndexTimeStamp] ASC, 
    [BidPrice] ASC, 
    [AskPrice] ASC 
) 
) 
WITH (MEMORY_OPTIMIZED = ON) 

Native скомпилированных хранимых процедур для вставки данные в итоговую таблицу и постановку (который служит в качестве архива в данном случае):

create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging] 
(
    @ProcessingID int, 
    @CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly 
) 
with native_compilation, schemabinding, execute as owner 
as 
begin atomic 
with (transaction isolation level=snapshot, language=N'us_english') 


    -- store prices 

    insert into TimeSeries.CityIndexIntradayLivePrices 
    (
     ObjectID, 
     PerDateTime, 
     BidPrice, 
     AskPrice, 
     ProcessingID 
    ) 
    select Objects.ObjectID, 
    CityIndexTimeStamp, 
    CityIndexIntradayLivePricesStaging.BidPrice, 
    CityIndexIntradayLivePricesStaging.AskPrice, 
    @ProcessingID 
    from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging, 
    Objects.Objects 
    where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID 


    -- store data in staging table 

    insert into Staging.CityIndexIntradayLivePricesStaging 
    (
     ImportProcessingID, 
     CityIndexInstrumentID, 
     CityIndexTimeStamp, 
     BidPrice, 
     AskPrice 
    ) 
    select @ProcessingID, 
    CityIndexInstrumentID, 
    CityIndexTimeStamp, 
    BidPrice, 
    AskPrice 
    from @CityIndexIntradayLivePrices 


end 

IEnumerable заполненную с из очереди:

private static IEnumerable<SqlDataRecord> CreateSqlDataRecords() 
{ 


    // set columns (the sequence is important as the sequence will be accordingly to the sequence of columns in the table-value parameter) 

    SqlMetaData MetaDataCol1; 
    SqlMetaData MetaDataCol2; 
    SqlMetaData MetaDataCol3; 
    SqlMetaData MetaDataCol4; 

    MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int); 
    MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt); 
    MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale 
    MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale 


    // define sql data record with the columns 

    SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 }); 


    // remove each price row from queue and add it to the sql data record 

    LightstreamerAPI.PriceDTO PriceDTO = new LightstreamerAPI.PriceDTO(); 

    while (IntradayQuotesQueue.TryDequeue(out PriceDTO)) 
    { 

     DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id 
     DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // @ is used to avoid problem with/as escape sequence) 
     DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price 
     DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price 

     yield return DataRecord; 

    } 


} 

Обработка данных каждые 0,5 секунды:

public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID) 
{ 


    try 
    { 

     // open new sql connection 

     using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false")) 
     { 


      // open connection 

      TimeSeriesDatabaseSQLConnection.Open(); 


      // endless loop to keep thread alive 

      while(true) 
      { 


       // ensure queue has rows to process (otherwise no need to continue) 

       if(IntradayQuotesQueue.Count > 0) 
       { 


        // define stored procedure for sql command 

        SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection); 


        // set command type to stored procedure 

        InsertCommand.CommandType = CommandType.StoredProcedure; 


        // define sql parameters (table-value parameter gets data from CreateSqlDataRecords()) 

        SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter 
        SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter 


        // set sql db type to structured for table-value paramter (structured = special data type for specifying structured data contained in table-valued parameters) 

        ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured; 


        // execute stored procedure 

        InsertCommand.ExecuteNonQuery(); 


       } 


       // wait 0.5 seconds 

       Thread.Sleep(500); 


      } 

     } 

    } 
    catch (Exception e) 
    { 

     // handle error (standard error messages and update processing) 

     ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e); 

    }; 


} 
+0

Посмотрите на TVP (параметр значения таблицы) - вы можете использовать как обратный DataReader. https://lennilobel.wordpress.com/2009/07/29/sql-server-2008-table-valued-parameters-and-c-custom-iterators-a-match-made-in-heaven/ – Paparazzi

ответ

2

Использование SQL Server 2016 (это еще не RTM, но это уже гораздо лучше, чем 2014, когда речь идет о памяти, оптимизированные таблицы). Затем используйте либо memory-optimized table variable, либо просто взорвите целых native stored procedure вызовов в транзакции, каждая из которых делает одну вставку, в зависимости от того, что быстрее в вашем сценарии (это зависит от ситуации).Несколько вещей, на которые следует обратить внимание:

  • Выполнение нескольких вставок за одну транзакцию имеет жизненно важное значение для сохранения в сетевых кругооборотах. Хотя операции с оперативной памятью выполняются очень быстро, SQL Server по-прежнему необходимо подтвердить каждую операцию.
  • В зависимости от того, как вы создаете данные, вы можете обнаружить, что распараллеливание вставок может ускорить работу (не переусердствуйте, вы быстро попадете в точку насыщения). Не пытайся быть очень умным здесь; плечо async/await и/или Parallel.ForEach.
  • Если вы передаете табличный параметр, самый простой способ сделать это - передать значение DataTable в качестве значения параметра, но это не самый эффективный способ сделать это - это будет передача IEnumerable<SqlDataRecord>. Вы можете использовать метод итератора для генерации значений, поэтому выделяется только постоянный объем памяти.

Вам нужно немного поэкспериментировать, чтобы найти оптимальный способ передачи данных; это сильно зависит от размера ваших данных и того, как вы его получаете.

+0

Я сейчас используя SQL Server 2014, и я смог реализовать свое решение с ним (хотя мне приходилось делать небольшие компромиссы). Но я собираюсь рассмотреть SQL Server 2016 как можно скорее. IEnumerable работает отлично, быстрее, чем DataTable. Использование оптимизированного с учетом памяти параметра таблицы и собственной скомпилированной хранимой процедуры значительно сократило нагрузку на базу данных. – Reboon

0

Загрузите данные из промежуточной таблицы в итоговую таблицу в строке менее 5k, я обычно использую 4k и не вставляю их в транзакцию. Вместо этого при необходимости реализуйте программные транзакции. Оставаясь под 5k вставленными рядами, количество блокировок строк возрастает до блокировки таблицы, которая должна ждать, пока все остальные не выйдут из таблицы.

0

Вы уверены, что ваша логика замедляется, а не фактические транзакции в базе данных? Например, платформа Entity Framework является «чувствительной» из-за отсутствия лучшего термина при попытке вставить тонну строк и становится довольно медленной.

Там есть библиотека третьей стороны, BulkInsert, на Codeplex, которые я использовал, и это очень приятно делать массовую вставку данных: https://efbulkinsert.codeplex.com/

Вы также можете написать свой собственный метод расширения на DbContext, если EF, что делает это тоже может быть основано на подсчете записей. Все, что находится под 5000 строк, используют функцию «Сохранить» (Save), что угодно, что вы можете использовать свою собственную логику объемной вставки.

Смежные вопросы