2

Я получаю поток данных, которые я могу полностью сканировать. Все данные попадают в Кафку, а затем отправляются в Кассандру. Теперь потребитель кафки очень медленный, намного медленнее, чем производитель. Я хочу, чтобы они были точно такими же. Что я могу сделать для достижения этого результата или что не так с моим кодом?Почему мой потребитель Кафки намного медленнее, чем мой производитель кафки?

Вот мой Кафка потребительский код в Python:

import logging 
from cassandra.cluster import Cluster 
from kafka.consumer.kafka import KafkaConsumer 
from kafka.consumer.multiprocess import MultiProcessConsumer 
from kafka.client import KafkaClient 
from kafka.producer.simple import SimpleProducer 
import json 
from datetime import datetime, timedelta 
from cassandra import ConsistencyLevel 
from dateutil.parser import parse 
logging.basicConfig(filename='consumer.log', format='[%(asctime)-15s] %(name)s %(levelname)s %(message)s', level=logging.DEBUG) 
class Whitelist(logging.Filter): 
    def __init__(self, *whitelist): 
     self.whitelist = [logging.Filter(name) for name in whitelist] 
    def filter(self, record): 
     return any(f.filter(record) for f in self.whitelist) 
for handler in logging.root.handlers: 
    handler.addFilter(Whitelist('consumer')) 
log = logging.getLogger('consumer') 
try: 
    cluster = Cluster(['localhost']); session = cluster.connect(keyspace) 
    kafka = KafkaClient('localhost') 
    consumer = MultiProcessConsumer(kafka, b'default',kafkatopic,num_procs=16, max_buffer_size=None) 
    article_lookup_stmt = session.prepare("SELECT * FROM articles WHERE id in ?") 
    article_lookup_stmt.consistency_level = ConsistencyLevel.QUORUM 
    article_insert_stmt = session.prepare("INSERT INTO articles(id, thumbnail, title, url, created_at, scheduled_for, source, category, channel,genre) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") 
    article_by_created_at_insert_stmt = session.prepare("INSERT INTO article_by_created_at(source, created_at, article) VALUES (?, ?, ?)") 
    article_by_url_insert_stmt = session.prepare("INSERT INTO article_by_url(url, article) VALUES (?, ?)") 
    schedules_insert_stmt = session.prepare("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (?,?,?,?)") 
    axes_insert_stmt = session.prepare("INSERT INTO axes(article,at,comments,likes,reads,shares) VALUES (?, ?, ?, ?, ?, ?)") 
    while True: 
     messages = consumer.get_messages(count=16) 
     if len(messages) == 0: 
      print 'IDLE' 
      continue 
     for message in messages: 
      try: 
       response = json.loads(message.value) 
       data = json.loads(response['body']) 
       print response['body'] 
       articles = data['articles'] 
       idlist = [r['id'] for r in articles] 
       if len(idlist)>0: 
        article_rows = session.execute(article_lookup_stmt,[idlist]) 
        rows = [r.id for r in article_rows] 
        for article in articles: 
         try: 
          if not article['id'] in rows: 
           article['created_at'] = parse(article['created_at']) 
           scheduled_for=(article['created_at'] + timedelta(minutes=60)).replace(second=0, microsecond=0) 
           session.execute(article_insert_stmt, (article['id'], article['thumbnail'], article['title'], article['url'], article['created_at'], scheduled_for, article['source'], article['category'], article['channel'],article['genre'])) 
           session.execute(article_by_created_at_insert_stmt, (article['source'], article['created_at'], article['id'])) 
           session.execute(article_by_url_insert_stmt, (article['url'], article['id'])) 
           session.execute(schedules_insert_stmt,(article['source'],'article',scheduled_for,article['id'])) 
           log.debug('%s %s' % (article['id'],article['created_at'])) 
          session.execute(axes_insert_stmt,(article['id'],datetime.utcnow(),article['axes']['comments'],article['axes']['likes'],0,article['axes']['shares'])) 
         except Exception as e: 
          print 'error==============:',e 
          continue 
      except Exception as e: 
       print 'error is:',e 
       log.exception(e.message) 
except Exception as e: 
    log.exception(e.message) 

EDIT:

Я также добавил мои результаты профиля и медленная линия кода, кажется,

article_rows = session.execute(article_lookup_stmt,[idlist]) 

Sun Feb 14 16:01:01 2016 consumer.out 

     395793 function calls (394232 primitive calls) in 23.074 seconds 

    Ordered by: internal time 

    ncalls tottime percall cumtime percall filename:lineno(function) 
     141 10.695 0.076 10.695 0.076 {select.select} 
    7564 10.144 0.001 10.144 0.001 {method 'acquire' of 'thread.lock' objects} 
     1 0.542 0.542 23.097 23.097 consumer.py:5(<module>) 
    1510 0.281 0.000 0.281 0.000 {method 'recv' of '_socket.socket' objects} 
     38 0.195 0.005 0.195 0.005 /usr/local/lib/python2.7/json/decoder.py:371(raw_decode) 
     13 0.078 0.006 0.078 0.006 {time.sleep} 
    2423 0.073 0.000 0.137 0.000 /usr/local/lib/python2.7/logging/__init__.py:242(__init__) 
    22112 0.063 0.000 0.095 0.000 /usr/local/lib/python2.7/site-packages/kafka/util.py:73(relative_unpack) 
     3 0.052 0.017 0.162 0.054 /usr/local/lib/python2.7/site-packages/kafka/protocol.py:386(decode_metadata_response) 
2006/2005 0.047 0.000 0.055 0.000 /usr/local/lib/python2.7/site-packages/cassandra/policies.py:350(make_query_plan) 
    1270 0.032 0.000 0.034 0.000 /usr/local/lib/python2.7/threading.py:259(__init__) 
     3 0.024 0.008 0.226 0.075 /usr/local/lib/python2.7/site-packages/kafka/client.py:456(load_metadata_for_topics) 
     33 0.024 0.001 0.031 0.001 /usr/local/lib/python2.7/collections.py:288(namedtuple) 
    15374 0.024 0.000 0.024 0.000 {built-in method new of type object at 0x788ee0} 
     141 0.023 0.000 11.394 0.081 /usr/local/lib/python2.7/site-packages/kafka/client.py:153(_send_broker_aware_request) 
     288 0.020 0.000 0.522 0.002 /usr/local/lib/python2.7/site-packages/kafka/conn.py:84(_read_bytes) 
    2423 0.018 0.000 0.029 0.000 /usr/local/lib/python2.7/logging/__init__.py:1216(findCaller) 
     115 0.018 0.000 11.372 0.099 /usr/local/lib/python2.7/site-packages/kafka/consumer/kafka.py:303(fetch_messages) 
    2423 0.018 0.000 0.059 0.000 /usr/local/lib/python2.7/logging/__init__.py:1303(callHandlers) 
    24548 0.017 0.000 0.017 0.000 {_struct.unpack} 
44228/43959 0.016 0.000 0.016 0.000 {len} 

Спасибо за ваш ответ.

+1

Как уже указано, на ваш вопрос не хватает деталей, необходимых для правильного ответа. Используйте профилировщик, чтобы узнать, какие части вашего скрипта работают медленно, а затем попробуйте переписать эти части, чтобы сделать их быстрее. Подробнее см. Https://docs.python.org/2/library/profile.html. – liori

+0

часть моего скрипта, которая медленна для сообщений в сообщениях. – peter

+1

Ваши проблемы с потребителем 5 запросов cassandra - нет никаких указаний на то, что делает ваш потребитель, но похоже, что 5 синхронных запросов CQL потенциально могут занимать больше времени, чем тривиальный производитель. –

ответ

2

Вы можете попробовать запустить пользователя без сохранения на C *, чтобы вы могли наблюдать, насколько это сильно отличается.
Если окажется, что сохранение в C * является точкой затвора (что я предполагаю, что это так), у вас может быть пул потоков (более 16 потоков, которые ваш потребитель порождает), единственной обязанностью которых является запись на C *.

Таким образом, вы выгрузили бы медленную часть кода, которая оставила бы только тривиальные части в потребительском коде.
Вы можете использовать from multiprocessing import Pool.
Подробнее here.

+0

Спасибо! Что, если я уже делаю это, но все еще медленно? Вы правильно пишете, что Кассандра - это затвор. Но не знаю, почему это так медленно, когда данные увеличиваются. – peter

+1

Если вы используете код, который вы опубликовали, то вы еще этого не делаете :) Но если вы это сделаете, и заметите, что независимо от количества потоков в пуле потоков, вы по-прежнему примерно равны пропускной способности , то вам придется настроить C *. Поскольку мы знаем, что самая заметная особенность Кассандры состоит в том, чтобы всегда иметь возможность принимать практически неограниченное количество данных (по мере увеличения шкалы), это должно быть выполнимым, и вы должны отправить этот вопрос в тег C *. –

+0

Большое спасибо. Извините за многие последующие вопросы, у меня просто еще один :): Если я уже использую многопроцессорный потребитель, почему мне все еще нужно использовать многопоточность? thanks – peter

Смежные вопросы