2015-09-14 3 views
2

Я хочу подсчитать отличное значение некоторых типов идентификаторов, представленных как RDD.Spark Streaming - DStream не имеет отчетливых()

В случае без потокового устройства это довольно просто. Скажем, IDs - это идентификатор удостоверений личности, считываемых из плоского файла.

print ("number of unique IDs %d" % (IDs.distinct().count())) 

Но я не могу сделать то же самое в потоковом футляре. Скажем, у нас есть streamIDs - DStream идентификаторов, считанных из сети.

print ("number of unique IDs from stream %d" % (streamIDs.distinct().count())) 

дает мне эту ошибку

AttributeError: 'TransformedDStream' object has no attribute 'distinct' 

Что я делаю неправильно? Как распечатать количество отдельных идентификаторов, появившихся во время этой партии?

ответ

8

С RDD у вас есть единственный результат, но с DStreams у вас есть ряд результатов с результатом на микро-пакет. Таким образом, вы не можете напечатать число уникальных идентификаторов раз, но вместо этого вы должны зарегистрировать действия, чтобы напечатать уникальные идентификаторы для каждого микро партии, которая является РДД, на котором вы можете использовать отчетливый:

streamIDs.foreachRDD(rdd => println(rdd.distinct().count())) 

помните, вы можете используйте window для создания трансформированного dstream с большими партиями:

streamIDs.window(Duration(1000)).foreachRDD(rdd => println(rdd.distinct().count())) 
Смежные вопросы