2016-04-03 4 views
0

Im пытается использовать данные из темы, установив смещение, но получить ошибку утверждение -AssertionError: Unassigned раздел

from kafka import KafkaConsumer 

consumer = KafkaConsumer('foobar1', 
         bootstrap_servers=['localhost:9092']) 
print 'process started' 
print consumer.partitions_for_topic('foobar1') 
print 'done' 
consumer.seek(0,10) 

for message in consumer: 
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, 
              message.offset, message.key, 
              message.value)) 
print 'process ended' 

Ошибка: -

Traceback (most recent call last): 
    File "/Users/pn/Documents/jobs/ccdn/kafka_consumer_1.py", line 21, in <module> 
    consumer.seek(0,10) 
    File "/Users/pn/.virtualenvs/vpsq/lib/python2.7/site-packages/kafka/consumer/group.py", line 549, in seek 
    assert partition in self._subscription.assigned_partitions(), 'Unassigned partition' 
AssertionError: Unassigned partition 

ответ

1

Вы должны вызвать consumer.assign() со списком разделов TopicPartitions перед вызовом seek. Также обратите внимание, что первым аргументом для поиска также является Тема. См KafkaConsumer API

0

В моем случае с Kafka 0.9 и kafka-python, назначение раздел произошло во время for message in consumer. Итак, искомый отпрыск должен после итерации. Я сбросил смещение моей группы по следующему коду:

import kafka 

ps = [] 
for i in xrange(topic_partition_number): 
    ps.append(kafka.TopicPartition(topic, i)) 

consumer = kafka.KafkaConsumer(topic, bootstrap_servers=address, group_id=group) 
for msg in consumer: 
    print msg 
    consumer.seek_to_beginning(*ps) 
    consumer.commit() 
    break 
Смежные вопросы