0

следующее мое кодирование python для производителя kafka, я не уверен, что сообщения могут быть опубликованы в Kafka Broker или нет. Потому что потребительская сторона не получает никаких сообщений. Моя программа python для пользователей работает нормально, а я тестирую ее с помощью команды консоли производителя.Kafka Consumer не получал сообщений от своего продюсера

from __future__ import print_function 

import sys 
from pyspark import SparkContext 
from kafka import KafkaClient, SimpleProducer 

if __name__ == "__main__": 

if len(sys.argv) != 2: 
    print("Usage:spark-submit producer1.py <input file>", file=sys.stderr) 
    exit(-1) 

sc = SparkContext(appName="PythonRegression") 

def sendkafka(messages): 
    ## Set broker port 
    kafka = KafkaClient("localhost:9092") 
    producer = SimpleProducer(kafka, async=True, batch_send_every_n=5, 
batch_send_every_t=10) 
    send_counts = 0 
    for message in messages: 
     try: 
      print(message) 
      ## Set topic name and push messages to the Kafka Broker 
      yield producer.send_messages('test', message.encode('utf-8')) 
     except Exception, e: 
      print("Error: %s" % str(e)) 
     else: 
      send_counts += 1 
    print("The count of prediction results which were sent IN THIS PARTITION 
is %d.\n" % send_counts) 

## Connect and read the file.  
rawData = sc.textFile(sys.argv[1]) 

## Find and skip the first row 
dataHeader = rawData.first() 
data = rawData.filter(lambda x: x != dataHeader) 

## Collect the RDDs. 
sentRDD = data.mapPartitions(sendkafka) 
sentRDD.collect() 

## Stop file connection 
sc.stop() 

Это мой «Потребитель» питон кодирования

from __future__ import print_function 
import sys 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

if len(sys.argv) < 3: 
print ("Program to pulls the messages from kafka brokers.") 
print("Usage: consume.py <zk> <topic>", file=sys.stderr) 

else: 
## Flow 
## Loads settings from system properties, for launching of spark-submit. 
sc = SparkContext(appName="PythonStreamingKafkaWordCount") 

## Create a StreamingContext using an existing SparkContext. 
ssc = StreamingContext(sc, 10) 

## Get everything after the python script name 
zkQuorum, topic = sys.argv[1:] 

## Create an input stream that pulls messages from Kafka Brokers. 
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", 
{topic: 1}) 

## 
lines = kvs.map(lambda x: x[1]) 

## Print the messages pulled from Kakfa Brokers 
lines.pprint() 

## Save the pulled messages as file 
## lines.saveAsTextFiles("OutputA") 

## Start receiving data and processing it 
ssc.start() 

## Allows the current process to wait for the termination of the context 
ssc.awaitTermination() 

ответ

0

Я обычно отлаживать такие вопросы, используя Кафка-консоль-потребитель (часть Apache Кафки) потреблять от темы Вы пробовали производящие к. Если потребитель консоли получает сообщения, вы знаете, что они прибыли в Кафку.

Если вы сначала запустили продюсер, позвольте ему закончить, а затем запустите потребителя, то проблема может заключаться в том, что потребитель начинает с конца журнала и ждет дополнительных сообщений. Либо убедитесь, что вы сначала запускаете пользователя, либо настройте его для автоматического запуска в начале (извините, не знаете, как это сделать с вашим клиентом Python).

+0

Пробовал это раньше с собственным ноутбуком не работает Но я попробовал на другом сервере его работу В любом случае, спасибо –

0

Вы можете проверить количество сообщений в теме, если они увеличиваются с запросами Производят:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ 
--broker-list <Kafka_broker_hostname>:<broker_port> --topic Que1 \ 
--time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}' 

Если количество сообщений увеличивается, то это означает, что производитель работает отлично.

0

Хорошо, я думаю, что что-то не так с моим местным Zookeeper или Kafka, потому что я тестирую его на другом сервере, он отлично работает. Тем не менее, спасибо тем, кто отвечает мне;)

Смежные вопросы