1

У меня есть таблица Кассандры с около 500+ миллионов записей (в 6 узлов), теперь я пытаюсь вставить данные с помощью искрового-Кассандры разъем в Amazon EMRCassandra записи дает очень медленно Perfomance с помощью Спарк

Таблица Структура

CREATE TABLE dmp.dmp_user_profiles_latest (
     pid text PRIMARY KEY, 
     xnid int, 
     day_count map<text, int>, 
     first_seen map<text, timestamp>, 
     last_seen map<text, timestamp>, 
     usage_count map<text, int>, 
     city text, 
     country text, 
     lid set<text>, 

    )WITH bloom_filter_fp_chance = 0.01 
    AND caching = '{"keys":"NONE", "rows_per_partition":"ALL"}' 
    AND comment = '' 
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32'} 
    AND compression = {'chunk_length_kb': '256', 'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'} 
    AND dclocal_read_repair_chance = 0.1 
    AND default_time_to_live = 0 
    AND gc_grace_seconds = 172800 
    AND max_index_interval = 2048 
    AND memtable_flush_period_in_ms = 0 
    AND min_index_interval = 128 
    AND read_repair_chance = 0.1 
    AND speculative_retry = '99.0PERCENTILE'; 
CREATE INDEX dmp_user_profiles_latest_app_day_count_idx ON dmp.dmp_user_profiles_latest (day_count); 
CREATE INDEX dmp_user_profiles_latest_country_idx ON dmp.dmp_user_profiles_latest (country); 

Следующего мои искровой представить варианты

--class com.mobi.vserv.driver.Query5kPids1 
--conf spark.dynamicAllocation.enabled=true 
--conf spark.yarn.executor.memoryOverhead=1024  
--conf spark.yarn.driver.memoryOverhead=1024 
--executor-memory 1g 
--executor-cores 2 
--driver-memory 4g 

Но в журналах я видел запись в Кассандре занимает около 4-5 минут для загрузки 2 лакха (200000) не записывает (в то время как общее время выполнения 6+ минут)

Я добавил следующее в Спарк конф также

conf.set("spark.cassandra.output.batch.size.rows", "auto"); 
conf.set("spark.cassandra.output.concurrent.writes", "500"); 
conf.set("spark.cassandra.output.batch.size.bytes", "100000"); 
conf.set("spark.cassandra.output.throughput_mb_per_sec","1"); 

Но до сих пор нет никакого улучшения производительности, также увеличивая не жил в Amazon EMR Безразлично» t помочь.

Обратите внимание: в моей таблице Cassandra мы не использовали столбец разделения/кластеризации, поэтому это может послужить причиной такой медленной производительности.

Обратите внимание скорости сети 30 MB PS первичный ключом является буквенно-цифровым значение, например - a9be3eb4-751f-48ee-b593-b3f89e18622d

Cassandra.yaml

cluster_name: 'dmp Cluster' 
num_tokens: 100 
hinted_handoff_enabled: true 
max_hint_window_in_ms: 10800000 # 3 hours 
hinted_handoff_throttle_in_kb: 1024 
max_hints_delivery_threads: 2 
batchlog_replay_throttle_in_kb: 1024 
authenticator: AllowAllAuthenticator 
authorizer: AllowAllAuthorizer 
permissions_validity_in_ms: 2000 
partitioner: org.apache.cassandra.dht.Murmur3Partitioner 
data_file_directories: 
    - /data/cassandra/data 
disk_failure_policy: stop 
commit_failure_policy: stop 

key_cache_size_in_mb: 

key_cache_save_period: 14400 
row_cache_size_in_mb: 0 
row_cache_save_period: 0 
counter_cache_size_in_mb: 
counter_cache_save_period: 7200 
saved_caches_directory: /data/cassandra/saved_caches 
commitlog_sync: periodic 
commitlog_sync_period_in_ms: 10000 
seed_provider: 
- class_name: org.apache.cassandra.locator.SimpleSeedProvider 
    parameters: 
- seeds: "10.142.76.97,10.182.19.301" 

concurrent_reads: 256 
concurrent_writes: 128 
concurrent_counter_writes: 32 

memtable_allocation_type: heap_buffers 
memtable_flush_writers: 8 
index_summary_capacity_in_mb: 
index_summary_resize_interval_in_minutes: 60 
trickle_fsync: false 
trickle_fsync_interval_in_kb: 10240 
storage_port: 7000 
ssl_storage_port: 7001 
listen_address: 10.142.76.97 
start_rpc: true 
rpc_address: 10.23.244.172 
rpc_port: 9160 
rpc_keepalive: true 
rpc_server_type: sync 
thrift_framed_transport_size_in_mb: 15 
incremental_backups: false 
snapshot_before_compaction: false 
auto_snapshot: true 
tombstone_warn_threshold: 1000 
tombstone_failure_threshold: 100000 
column_index_size_in_kb: 64 
batch_size_warn_threshold_in_kb: 5 
concurrent_compactors: 4 
compaction_throughput_mb_per_sec: 64 
sstable_preemptive_open_interval_in_mb: 50 
read_request_timeout_in_ms: 500000 

range_request_timeout_in_ms: 1000000 

write_request_timeout_in_ms: 200000 

counter_write_request_timeout_in_ms: 500000 

cas_contention_timeout_in_ms: 100000 

endpoint_snitch: Ec2Snitch 

dynamic_snitch_update_interval_in_ms: 100 

dynamic_snitch_reset_interval_in_ms: 600000 

dynamic_snitch_badness_threshold: 0.1 

request_scheduler: org.apache.cassandra.scheduler.NoScheduler 

server_encryption_options: 
    internode_encryption: none 
    keystore: conf/.keystore 
    keystore_password: cassandra 
    truststore: conf/.truststore 
    truststore_password: cassandra 

client_encryption_options: 
    enabled: false 
    keystore: conf/.keystore 
    keystore_password: cassandra 

internode_compression: all 

inter_dc_tcp_nodelay: false 
+0

у нас есть структура вашей базы данных? – Whitefret

+0

Пожалуйста, проверьте сейчас. –

+0

У вас есть доступ к узлу? Чтобы увидеть, как ваша база данных разбросана по кластеру? Возможно, все ваши записи идут на один и тот же узел (при этом количество узлов бесполезно) – Whitefret

ответ

1

Как говорил в комментариях, кажется, ваша проблема исходит из вашего индекса на day_count.

Как видно из этого page, индекс не будет эффективным, если вы должны обновлять их все время, и это происходит, когда вы вставляете другое значение в day_count (что возможно каждый раз).

Вы должны переработать свою базу данных, но так как это ваша производственная среда, вы не можете просто DROP INDEX IF EXISTS keyspace.index_name, если этот показатель необходим, но вы можете создать вторичную базу данных, используя day_count в качестве первичного ключа, или использовать day_count как индекс заказа.

+1

Кстати, ** ЕСЛИ ** проблема исходит из вторичного индекса (мы не имеем абсолютной уверенности, если не измерить скорость вставки без этих индексов, ** меру не угадать **). Одним из решений может быть падение индекса для вставки всех данных в Cassandra, а затем воссоздание индекса. Это означало бы, что приложение не может запросить по индексу до тех пор, пока вставка не будет завершена. – doanduyhai

+0

Привет, как обновление, мы не используем этот Индекс для запросов, так что нормально, если я его отброшу?Также вы сказали, что это неэффективно, если его обновление каждый раз, и это происходит, когда мы вставляем другое значение, в моем случае мы каждый раз обновляем значения карты (индекс), а в некоторых случаях добавляем новое значение на карте. –

+0

Пожалуйста, предложите. Поскольку мне нужно удалить индекс. –

Смежные вопросы