2016-01-13 1 views
0

Это мой запрос, если текущие данные ID присутствует или отсутствует в базе данных Cassandra:Почему моя вставка данных в мою базу данных Cassandra иногда стабильна и иногда медленная?

row = session.execute("SELECT * FROM articles where id = %s", [id]) 

Исправленные сообщений в Кафки, то определить, существует ли или нет это сообщение в базе данных Cassandra, если она не существует, то он должен выполнить операцию вставки, если она существует, ее не следует вставлять в данные.

messages = consumer.get_messages(count=25) 

if len(messages) == 0: 
    print 'IDLE' 
    sleep(1) 
    continue 

for message in messages: 
    try: 
     message = json.loads(message.message.value) 
     data = message['data'] 
     if data: 
      for article in data: 
       source = article['source'] 
       id = article['id'] 
       title = article['title'] 
       thumbnail = article['thumbnail'] 
       #url = article['url'] 
       text = article['text'] 
       print article['created_at'],type(article['created_at']) 
       created_at = parse(article['created_at']) 
       last_crawled = article['last_crawled'] 
       channel = article['channel']#userid 
       category = article['category'] 
       #scheduled_for = created_at.replace(minute=created_at.minute + 5, second=0, microsecond=0) 
       scheduled_for=(datetime.utcnow() + timedelta(minutes=5)).replace(second=0, microsecond=0) 
       row = session.execute("SELECT * FROM articles where id = %s", [id]) 
       if len(list(row))==0: 
       #id parse base62 
        ids = [id[0:2],id[2:9],id[9:16]] 
        idstr='' 
        for argv in ids: 
         num = int(argv) 
         idstr=idstr+encode(num) 
        url='http://weibo.com/%s/%s?type=comment' % (channel,idstr) 
        session.execute("INSERT INTO articles(source, id, title,thumbnail, url, text, created_at, last_crawled,channel,category) VALUES (%s,%s, %s, %s, %s, %s, %s, %s, %s, %s)", (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category)) 
        session.execute("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (%s, %s, %s,%s) USING TTL 86400", (source,'article', scheduled_for, id)) 
        log.info('%s %s %s %s %s %s %s %s %s %s' % (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category)) 

    except Exception, e: 
     log.exception(e) 
     #log.info('error %s %s' % (message['url'],body)) 
     print e 
     continue 

У меня есть один идентификатор, который имеет только одну уникальную строку таблицы, в которой я хочу быть таким. Как только я добавляю разные расписания для уникального идентификатора, моя система выходит из строя. Добавьте это if len(list(row))==0: - это правильная мысль, но после этого моя система очень медленная.

Это мое описание таблицы:

DROP TABLE IF EXISTS schedules; 

CREATE TABLE schedules (
source text, 
type text, 
scheduled_for timestamp, 
id text, 
PRIMARY KEY (source, type, scheduled_for, id) 
); 

Это scheduled_for изменчива. Здесь также конкретный пример:

Hao article 2016-01-12 02:09:00+0800 3930462206848285 
Hao article 2016-01-12 03:09:00+0801 3930462206848285 
Hao article 2016-01-12 04:09:00+0802 3930462206848285 
Hao article 2016-01-12 05:09:00+0803 3930462206848285 

Вот моя статья CQL схема:

CREATE TABLE crawler.articles (
    source text, 
    created_at timestamp, 
    id text, 
    category text, 
    channel text, 
    last_crawled timestamp, 
    text text, 
    thumbnail text, 
    title text, 
    url text, 
    PRIMARY KEY (source, created_at, id) 
) WITH CLUSTERING ORDER BY (created_at DESC, id ASC) 
AND bloom_filter_fp_chance = 0.01 
AND caching = '{"keys":"ALL", "rows_per_partition":"ALL"}' 
AND comment = '' 
AND compaction = {'sstable_size_in_mb': '160', 'enabled': 'true', 'unchecked_tombstone_compaction': 'false', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'} 
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'} 
AND dclocal_read_repair_chance = 0.1 
AND default_time_to_live = 604800 
AND gc_grace_seconds = 864000 
AND max_index_interval = 2048 
AND memtable_flush_period_in_ms = 0 
AND min_index_interval = 128 
AND read_repair_chance = 0.0 
AND speculative_retry = '99.0PERCENTILE'; 

CREATE INDEX articles_id_idx ON crawler.articles (id); 
CREATE INDEX articles_url_idx ON crawler.articles (url); 
+1

Можете ли вы предоставить схему таблиц для статей? Таким образом, мы можем видеть первичные ключи и все детали. –

+0

Я добавил схему статей в сообщение выше. Спасибо за ваш ответ! – peter

ответ

1

Глядя на Схему и как вы используете его, я мог бы предположить, что вторичный индекс поля идентификатора создает проблемы и замедления запросов. Вы можете проверить более подробную информацию о том, почему вторичные индексы плохи во многих местах, просто приходя в Google (это source - хорошее начало, также DataStax documentation page). В основном, когда вы используете вторичный индекс в 5 узловых кластере, вы должны ударить по каждому узлу, чтобы найти элемент, который вы ищете, и при использовании первичного ключа каждый узел знает, какой узел хранит данные.

Вторичные индексы особенно плохи, если вы используете данные с высокой мощностью (производительность падает при добавлении дополнительных элементов), и вы используете идентификатор, который отличается для каждой статьи. Они в порядке, когда вы используете низкую мощность, например, индексируете некоторые данные по дням недели (вы знаете, что будет только 7 дней в неделю, чтобы вы могли предсказать размер индексной таблицы) или категории в вашем случае, если у вас есть конечное число категорий ,

Я бы посоветовал создать еще одну таблицу, article_by_id которая будет обратным индексом к вашей таблице статей. Вы можете использовать Lightweight Transaction и сделать INSERT ... IF NOT EXISTS первым в эту таблицу, и если операция вернет true (это означает, что вставка прошла, так что запись ранее не была), вы можете сделать обычный INSERT в своей таблице articles, и если он вернет false (это означает, что данные не были вставлены, потому что это уже существует), вы можете пропустить INSERT в таблицу articles.

Вот таблица (я бы предложил использовать UUID вместо текста для ID, но я создал таблицу на основе вашей таблицы статьи):

CREATE TABLE article_by_id (
    id text, 
    source text, 
    created_at timestamp, 
    PRIMARY KEY (id) 
) WITH comment = 'Article by id.'; 

Таким образом, вы всегда можете найти все части ключа на основе просто ID. Если ID - ваш входной параметр, выбор из этой таблицы даст вам источник и created_at.

Вот вставка запрос, который возвращает истину или ложь:

INSERT INTO article_by_id(id, source, created_at) VALUES (%s,%s, %s) IF NOT EXISTS; 

И еще подсказка, если вы можете найти ключ на основе некоторых не изменяемых данных в вашей организации, чем вам не нужно второй таблицы.В примере, если source и created_at уникально идентифицирует статью в вашей системе и никогда не меняются, вы можете удалить идентификатор и использовать свою оригинальную таблицу.

+0

Отличный ответ спасибо. как вы думаете, лучший первичный ключ для статей? – peter

+1

Вы можете оставить его как есть в таблице статей (источник PK, а затем сгруппировать по created_at и id на конце, чтобы обеспечить уникальность). Эта таблица отлично подходит для запросов диапазона, и вы распространяете данные из нескольких источников по нескольким разделам, предотвращая длинные строки и горячие данные. Я просто предлагаю иметь дополнительную таблицу с идентификатором, чтобы найти ее проще. В противном случае вы, вероятно, можете вообще не указывать идентификатор, если URL-адрес уникален для каждой статьи, и используйте URL вместо идентификатора. Затем измените ID на URL во всех местах. –

+0

еще один вопрос, может быть, вы можете помочь: если у нас есть структура таблицы ниже, как я могу запросить «source = 'abc» и created_at> =' 2016-01-01 00:00:00 '?? Это проблема сейчас, так как cassandra не позволяет запрашивать неиндексные поля. CREATE TABLE статьи ( идентификатор текста, источник текста, created_at метка времени, категория текста, текст канала, last_crawled метка времени, текст текст, текст эскизов, текст заголовка, URL текст, PRIMARY KEY (id) ) – peter

Смежные вопросы