2016-03-24 2 views
0

Я использую искрообразование, чтобы непрерывно считывать данные с kafka и выполнять некоторую статистику. Я потоплю каждую секунду.Как найти сумму всех значений в RDD за DStream?

Итак, у меня есть одна секунда (dstreams). Каждый RDD внутри этого dstream содержит JSON.

Вот как у меня dstream:

kafkaStream = KafkaUtils.createDirectStream(stream, ['livedata'], {"metadata.broker.list": 'localhost:9092'}) 
raw = kafkaStream.map(lambda kafkaS: kafkaS[1]) 
clean = raw.map(lambda xs:json.loads(xs)) 

ОДИН ИЗ РДУ в моей чистой dstream выглядит следующим образом:

{u'epochseconds': 1458841451, u'protocol': 6, u'source_ip': u'192.168.1.124', \ 
u'destination_ip': u'149.154.167.120', u'datetime': u'2016-03-24 17:44:11', \ 
u'length': 1589, u'partitionkey': u'partitionkey', u'packetcount': 10,\ 
u'source_port': 43375, u'destination_port': 443} 

И я, как 30-150 таких РДУ в каждый DStream.

Теперь, что я пытаюсь сделать, это получить общую сумму «lengths» или сказать «packetcounts» в каждом DStream. То есть,

rdd1.length + rdd2.length + ... + LastRDDInTheOneSecondBatch.length 

Что я пробовал:

add=clean.map(lambda xs: (xs['length'],1)).reduceByKey(lambda a, b: a+b) 

Что я получил:

Частота вместо суммы.

(17, 6) 
(6, 24) 

Что мне делать, чтобы иметь общую сумму вместо частоты ключей?

ответ

1

Это потому, что вы используете значение «длина» в качестве ключа, попробуйте следующее:

add=clean.map(lambda xs: ('Lenght',xs['length'])).reduceByKey(lambda a, b: a+b) 

Вы должны установить тот же ключ для всех пар (ключ, значение). значением может быть полевая длина или другое поле для агрегирования ...

+0

Работает, спасибо! Еще один дополнительный вопрос: я хочу добавить еще 2 параметра из чистого в add, скажем ('partitionkey', 'timestamp'), вместе с параметром length только что вычисленным. Как мне это сделать? – HackCode

Смежные вопросы