2014-01-14 5 views
0

Я пытаюсь использовать symmetricds для настройки mysql до postgres Синхронизация с преобразованиями. У меня очень низкая производительность при начальной загрузке с 100% загрузкой процессора на postgres. Когда я смотрю на журнал postgres, я обнаружил, что он использовал INSERT. Это нормально для нормальной работы, но не для инициализации, потому что у меня есть миллионы записей. Я основал PostgresBulkDatabaseWriter в исходном коде, который использует COPY вместо INSERT, и это похоже на хорошее решение (COPY sql-запрос работает очень хорошо для меня), но я не нашел, как я могу его использовать.Массовая начальная загрузка для postgresql в SymmetricDS

Так мои вопросы:

Как лучше сделать начальную загрузку с symmetricds для миллионов записей?

Как включить PostgresBulkDatabaseWriter для начальной (обратной начальной) нагрузки?

Благодаря

UPD:

Источник таблицы mysql:

CREATE TABLE `companies` (
    `id` int(11) NOT NULL AUTO_INCREMENT, 
    `cid` int(11) NOT NULL, 
    `universalName` text NOT NULL, 
    `name` text NOT NULL, 
    `country` text NOT NULL, 
    `city` text NOT NULL, 
    `street` text NOT NULL, 
    `phone` text NOT NULL, 
    `foundedYear` text NOT NULL, 
    `employeeCountRange` text NOT NULL, 
    `specialties` text NOT NULL, 
    `websiteUrl` text NOT NULL, 
    `twitterId` text NOT NULL, 
    `check` tinyint(4) NOT NULL DEFAULT '0', 
    `date` datetime NOT NULL, 
    PRIMARY KEY (`id`) 
); 

CREATE TABLE `search_results` (
    `id` int(11) NOT NULL AUTO_INCREMENT, 
    `cid` int(11) NOT NULL, 
    `title` text NOT NULL, 
    `description` text NOT NULL, 
    `link` text NOT NULL, 
    `raw` text NOT NULL, 
    `date` datetime NOT NULL, 
    PRIMARY KEY (`id`) 
); 

Основные таблицы postgres:

CREATE TABLE res_country (
    id integer NOT NULL, 
    create_uid integer, 
    create_date timestamp without time zone, 
    write_date timestamp without time zone, 
    write_uid integer, 
    address_format text, 
    currency_id integer, 
    code character varying(2), 
    name character varying(64) NOT NULL 
); 

INSERT INTO res_country VALUES (1, 1, '2013-11-16 06:53:31.030363', '2013-11-16 06:53:31.030363', 1, '%(street)s 
%(street2)s 
%(city)s %(state_code)s %(zip)s 
%(country_name)s', 1, 'AD', 'Andorra, Principality of'); 

CREATE TABLE res_partner (
    id integer NOT NULL, 
    name character varying(128) NOT NULL, 
    lang character varying(64), 
    company_id integer, 
    create_uid integer, 
    create_date timestamp without time zone, 
    write_date timestamp without time zone, 
    write_uid integer, 
    comment text, 
    ean13 character varying(13), 
    color integer, 
    image bytea, 
    use_parent_address boolean, 
    active boolean, 
    street character varying(128), 
    supplier boolean, 
    city character varying(128), 
    user_id integer, 
    zip character varying(24), 
    title integer, 
    function character varying(128), 
    country_id integer, 
    parent_id integer, 
    employee boolean, 
    type character varying, 
    email character varying(240), 
    vat character varying(32), 
    website character varying(64), 
    fax character varying(64), 
    street2 character varying(128), 
    phone character varying(64), 
    credit_limit double precision, 
    date date, 
    tz character varying(64), 
    customer boolean, 
    image_medium bytea, 
    mobile character varying(64), 
    ref character varying(64), 
    image_small bytea, 
    birthdate character varying(64), 
    is_company boolean, 
    state_id integer, 
    notification_email_send character varying NOT NULL, 
    opt_out boolean, 
    signup_type character varying, 
    signup_expiration timestamp without time zone, 
    signup_token character varying, 
    last_reconciliation_date timestamp without time zone, 
    debit_limit double precision, 
    display_name character varying, 
    vat_subjected boolean, 
    section_id integer 
); 


CREATE TABLE my_res_partner_companies (
    id INT8 NOT NULL PRIMARY KEY, 
    cid INT8 NOT NULL, 
    universalName VARCHAR NOT NULL, 
    employeeCountRange VARCHAR NOT NULL, 
    specialties VARCHAR NOT NULL, 
    twitterId VARCHAR NOT NULL, 
    "check" INT4 NOT NULL DEFAULT '0', 
    date TIMESTAMP NOT NULL 
); 

CREATE TABLE my_res_partner_search_result (
    id INT8 NOT NULL PRIMARY KEY, 
    link VARCHAR NOT NULL, 
    raw VARCHAR NOT NULL, 
    date TIMESTAMP NOT NULL 
); 

Источник свойства:

engine.name=source-001 

# The class name for the JDBC Driver 
db.driver=com.mysql.jdbc.Driver 

# The JDBC URL used to connect to the database 
db.url=jdbc:mysql://localhost:3307/data?tinyInt1isBit=false 

# The user to login as who can create and update tables 
db.user=root 

# The password for the user to login as 
db.password= 

# The HTTP URL of the root node to contact for registration 
registration.url=http://localhost:8080/sync/core-000 
#auto.reload.reverse=true 

# Do not change these for running the demo 
group.id=source 
external.id=001 

# This is how often the routing job will be run in milliseconds 
job.routing.period.time.ms=5000 
# This is how often the push job will be run. 
job.push.period.time.ms=10000 
# This is how often the pull job will be run. 
job.pull.period.time.ms=10000 

Основные свойства:

engine.name=core-000 

# The class name for the JDBC Driver 
db.driver=org.postgresql.Driver 

# The JDBC URL used to connect to the database 
db.url=jdbc:postgresql://localhost:5432/data2?stringtype=unspecified 

# The user to login as who can create and update tables 
db.user=admin 

# The password for the user to login as 
db.password=admin 

registration.url= 
sync.url=http://localhost:8080/sync/core-000 
auto.reload.reverse=true 
datareload.batch.insert.transactional=true 

# Do not change these for running the demo 
group.id=core 
external.id=000 

# Don't muddy the waters with purge logging 
job.purge.period.time.ms=7200000 

# This is how often the routing job will be run in milliseconds 
job.routing.period.time.ms=5000 
# This is how often the push job will be run. 
job.push.period.time.ms=10000 
# This is how often the pull job will be run. 
job.pull.period.time.ms=10000 

Главная symmetric конфигурации:

-- Nodes 
insert into sym_node_group (node_group_id, description) 
values ('core', 'Core Storage'); 
insert into sym_node_group (node_group_id, description) 
values ('source', 'Source Storage'); 

insert into sym_node_group_link (source_node_group_id, target_node_group_id, data_event_action) 
values ('source', 'core', 'P'); 
insert into sym_node_group_link (source_node_group_id, target_node_group_id, data_event_action) 
values ('core', 'source', 'W'); 

insert into sym_node (node_id, node_group_id, external_id, sync_enabled) 
values ('000', 'core', '000', 1); 
insert into sym_node_security (node_id,node_password,registration_enabled,registration_time,initial_load_enabled,initial_load_time,initial_load_id,initial_load_create_by,rev_initial_load_enabled,rev_initial_load_time,rev_initial_load_id,rev_initial_load_create_by,created_at_node_id) 
values ('000','changeme',1,current_timestamp,1,current_timestamp,null,null,0,null,null,null,'000'); 
insert into sym_node_identity values ('000'); 

-- Channels 
insert into sym_channel 
(channel_id, processing_order, max_batch_size, enabled, description) 
values('source__acc', 1, 100000, 1, 'accounting synchronisation'); 

-- Triggers 
insert into sym_trigger 
(trigger_id,source_table_name,channel_id,last_update_time,create_time) 
values('source__companies','companies','source__acc',current_timestamp,current_timestamp); 

insert into sym_trigger 
(trigger_id,source_table_name,channel_id,last_update_time,create_time) 
values('source__search_results','search_results','source__acc',current_timestamp,current_timestamp); 

-- Routers 
insert into sym_router 
(router_id,source_node_group_id,target_node_group_id,router_type,create_time,last_update_time) 
values('source_2_core', 'source', 'core', 'default',current_timestamp, current_timestamp); 

-- Trigger Router Links 
insert into sym_trigger_router 
(trigger_id,router_id,initial_load_order, INITIAL_LOAD_BATCH_COUNT,last_update_time,create_time) 
values('source__companies','source_2_core', 100, 0, current_timestamp, current_timestamp); 

insert into sym_trigger_router 
(trigger_id,router_id,initial_load_order, INITIAL_LOAD_BATCH_COUNT,last_update_time,create_time) 
values('source__search_results','source_2_core', 200, 0, current_timestamp, current_timestamp); 

Основные преобразования:

-- Transform 
insert into SYM_TRANSFORM_TABLE 
    (transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, target_table_name, delete_action, column_policy) 
values 
    ('source__companies__main', 'source', 'core', 'LOAD', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED'); 
-- ('source__companies__main', 'source', 'core', 'EXTRACT', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED'); 

insert into SYM_TRANSFORM_COLUMN 
    (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION) 
values 
    ('source__companies__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'), 
    ('source__companies__main', '*', 'name', 'name', 0, 'copy', NULL), 
    ('source__companies__main', '*', 'country', 'country_id', 0, 'const', '1'), 
    ('source__companies__main', '*', 'city', 'city', 0, 'copy', NULL), 
    ('source__companies__main', '*', 'street', 'street', 0, 'copy', NULL), 
    ('source__companies__main', '*', 'phone', 'phone', 0, 'copy', NULL), 
    ('source__companies__main', '*', 'websiteUrl', 'website', 0, 'copy', NULL), 
    ('source__companies__main', '*', NULL, 'notification_email_send', 0, 'const', '0'), 
    ('source__companies__main', '*', NULL, 'is_company', 0, 'const', '1'); 

insert into SYM_TRANSFORM_TABLE 
    (transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, target_table_name, delete_action, column_policy) 
values 
    ('source__companies__residue', 'source', 'core', 'LOAD', 'companies', 'my_res_partner_companies', 'DEL_ROW', 'SPECIFIED'); 
    -- ('source__companies__residue', 'source', 'core', 'EXTRACT', 'companies', 'my_res_partner_companies', 'DEL_ROW', 'SPECIFIED'); 

insert into SYM_TRANSFORM_COLUMN 
    (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION) 
values 
    ('source__companies__residue', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'), 
    ('source__companies__residue', '*', 'cid', 'cid', 0, 'copy', NULL), 
    ('source__companies__residue', '*', 'universalName', 'universalName', 0, 'copy', NULL), 
    ('source__companies__residue', '*', 'employeeCountRange', 'employeeCountRange', 0, 'copy', NULL), 
    ('source__companies__residue', '*', 'specialties', 'specialties', 0, 'copy', NULL), 
    ('source__companies__residue', '*', 'twitterId', 'twitterId', 0, 'copy', NULL), 
    ('source__companies__residue', '*', 'check', 'check', 0, 'copy', NULL), 
    ('source__companies__residue', '*', 'date', 'date', 0, 'copy', NULL); 


insert into SYM_TRANSFORM_TABLE 
    (transform_id, source_node_group_id, target_node_group_id, transform_point, 
    source_table_name, target_table_name, delete_action, column_policy) 
values 
    ('source__search_results__main', 'source', 'core', 'LOAD', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED'); 
    -- ('source__search_results__main', 'source', 'core', 'EXTRACT', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED'); 

insert into SYM_TRANSFORM_COLUMN 
    (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION) 
values 
    ('source__search_results__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 2000000;'), 
    ('source__search_results__main', '*', 'cid', 'parent_id', 0, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'), 
    ('source__search_results__main', '*', 'title', 'name', 0, 'copy', NULL), 
    ('source__search_results__main', '*', 'description', 'comment', 0, 'copy', NULL), 
    ('source__search_results__main', '*', NULL, 'use_parent_address', 0, 'const', '1'), 
    ('source__search_results__main', '*', NULL, 'notification_email_send', 0, 'const', '0'), 
    ('source__search_results__main', '*', NULL, 'is_company', 0, 'const', '0'); 

insert into SYM_TRANSFORM_TABLE 
    (transform_id, source_node_group_id, target_node_group_id, transform_point, 
    source_table_name, target_table_name, delete_action, column_policy) 
values 
    ('source__search_results__residue', 'source', 'core', 'LOAD', 'search_results', 'my_res_partner_search_result', 'DEL_ROW', 'SPECIFIED'); 
    -- ('source__search_results__residue', 'source', 'core', 'EXTRACT', 'search_results', 'my_res_partner_search_result', 'DEL_ROW', 'SPECIFIED'); 

insert into SYM_TRANSFORM_COLUMN 
    (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION) 
values 
    ('source__search_results__residue', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 2000000;'), 
    ('source__search_results__residue', '*', 'link', 'link', 0, 'copy', NULL), 
    ('source__search_results__residue', '*', 'raw', 'raw', 0, 'copy', NULL), 
    ('source__search_results__residue', '*', 'date', 'date', 0, 'copy', NULL); 

упрощенный преобразования:

-- Transform 
insert into SYM_TRANSFORM_TABLE 
    (transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, target_table_name, delete_action, column_policy) 
values 
    ('source__companies__main', 'source', 'core', 'LOAD', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED'); 
-- ('source__companies__main', 'source', 'core', 'EXTRACT', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED'); 

insert into SYM_TRANSFORM_COLUMN 
    (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION) 
values 
    ('source__companies__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'), 
    ('source__companies__main', '*', 'name', 'name', 0, 'copy', NULL), 
    ('source__companies__main', '*', 'country', 'country_id', 0, 'const', '1'), 
    ('source__companies__main', '*', 'city', 'city', 0, 'copy', NULL), 
    ('source__companies__main', '*', 'street', 'street', 0, 'copy', NULL), 
    ('source__companies__main', '*', 'phone', 'phone', 0, 'copy', NULL), 
    ('source__companies__main', '*', 'websiteUrl', 'website', 0, 'copy', NULL), 
    ('source__companies__main', '*', NULL, 'notification_email_send', 0, 'const', '0'), 
    ('source__companies__main', '*', NULL, 'is_company', 0, 'const', '1'); 


insert into SYM_TRANSFORM_TABLE 
    (transform_id, source_node_group_id, target_node_group_id, transform_point, 
    source_table_name, target_table_name, delete_action, column_policy) 
values 
    ('source__search_results__main', 'source', 'core', 'LOAD', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED'); 
    -- ('source__search_results__main', 'source', 'core', 'EXTRACT', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED'); 

insert into SYM_TRANSFORM_COLUMN 
    (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION) 
values 
    ('source__search_results__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 2000000;'), 
    ('source__search_results__main', '*', 'cid', 'parent_id', 0, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'), 
    ('source__search_results__main', '*', 'title', 'name', 0, 'copy', NULL), 
    ('source__search_results__main', '*', 'description', 'comment', 0, 'copy', NULL), 
    ('source__search_results__main', '*', NULL, 'use_parent_address', 0, 'const', '1'), 
    ('source__search_results__main', '*', NULL, 'notification_email_send', 0, 'const', '0'), 
    ('source__search_results__main', '*', NULL, 'is_company', 0, 'const', '0'); 

установка Ядра:

update sym_channel set DATA_LOADER_TYPE = 'postgres_bulk' where channel_id = 'reload'; 

Посмотрите, как symmetric вставки с COPY по одному записи с основными преобразованиями (LOAD и ЭКСТРАКТИВНЫМ) и упрощенными преобразованиями (LOAD и ЭКСТРАКТИВНЫМ).

+1

От [их веб-сайт] (http://www.symmetricds.org/download): «SymmetricDS Pro - это репликация корпоративного класса, построенная на основе открытых источников SymmetricDS и других проверенных компонентов OSS. Повысьте свой опыт и производительность с помощью веб-интерфейса что упрощает настройку, мониторинг и устранение неполадок. Ядро расширено благодаря специальным функциям, включая быстрые, *** загружаемые загрузчики данных *** и поддержку синхронизации с устройствами Android ». Итак, похоже, что функция, которую вы используете, недоступна в версии OSS ... – fvu

+0

Только что основано http://www.symmetricds.org/doc/3.5/html-single/user-guide.html#ap02 -postgresql, вероятно, объемная загрузка существует в стандартной версии. – tbicr

+0

@fvu О, я люблю полуоткрытую приманку. В самом деле. –

ответ

6

PostgresBulkDataLoaderFactory ваш ответ.

Если вы просто хотите использовать объемный писатель для начальных нагрузок и перезагрузок, я предлагаю вам настроить канал перезагрузки только для использования объемного писателя.

На вашей таблице каналов (по умолчанию sym_channel) обновите столбец data_loader_type канала перезагрузки до 'postgres_bulk'.

users guide вкратце объясняет, как реализовать DatabaseWriters.

+0

Действительно 'update sym_channel set DATA_LOADER_TYPE = 'postgres_bulk', где channel_id = 'reload';' на узле postgres включить его. Но, похоже, он делает 'COPY' для каждой записи отдельно, а не массовым. Наверное, проблемы с моими преобразованиями. Моя загрузка процессора: 35% по postgres, 75% - симметрично. – tbicr

+0

Postgres массовая работа невероятно быстро без преобразований. С преобразованиями выглядят как вставки записей один за другим с 'COPY' для' EXTRACT' и 'LOAD' transform_point. Есть ли способ проверить мою теорию? Если это попытка, могу ли я настроить преобразования для массового вставки? – tbicr

+0

Можете ли вы рассказать, какие типы преобразований вы используете? Вы запрашиваете БД? –

0

Ключом к хорошей вставке больших наборов данных в postgresql является выполнение всего этого за несколько транзакций. По умолчанию postgresql использует автоматическую фиксацию для каждой вставки, поэтому каждая сама вставляет ее в транзакцию. Просто оберните все вставки с начала; совершить; может обеспечить гораздо более высокую пропускную способность.

Причины быстрого копирования копии поэтому в два раза. 1: одна большая транзакция и 2: отсутствие синтаксического разбора отдельных операторов вставки. Разница в затратах минимальна, но транзакционные накладные расходы, относительно говоря, огромны для отдельных заявлений.

Есть ли способ сделать это сделать это:

begin; 
insert ...; 
insert ...; 
insert ...; 
commit; 
+0

Операции с использованием SymmetricDS (по умолчанию 5000 записей на транзакцию), как я понял. Однако 'COPY' будет работать быстрее даже' INSERT' в транзакции. – tbicr

Смежные вопросы