Я новичок в scala и изучаю, как обрабатывать твиттер-потоки с помощью scala. Я играл с примером кода ниже и пытался изменить его, чтобы сделать некоторые другие вещи.scala twitter streaming: плавление кортежей кортежей
У меня есть кортеж кортежей (может быть кортеж не точное имя типа в потоковой но лестнице ..) суммирует каждый твит, как это: (имя пользователя, (кортеж хэштег), (кортеж пользователей упомянутый в этом чириканье))
И ниже - код, который я использовал для этого.
val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
val ssc = new StreamingContext(sparkConf, Seconds(duration.toInt))
val stream = TwitterUtils.createStream(ssc, None)
// record username, hashtags, and mentioned user
val distilled = stream.map(status => (status.getUser.getName, status.getText.split(" ").filter(_.startsWith("#")), status.getText.split(" ").filter(_.startsWith("@"))))
Что я хочу сделать, это расплавить этот кортеж (тег, пользователь, (упомянутые пользователи)). Например, если исходный кортеж был
(Tom, (#apple, #banana), (@Chris, @Bob))
Я хочу, чтобы результат
((#apple, Tom, (@Chris, @Bob)), (#banana, Tom, (@Chris, @Bob))
Моя цель состоит в том, чтобы запустить reduceByKey на этот результат, используя хэштегом в качестве ключа, чтобы получить
(#apple, (list of users who tweeted this hashtag), (list of users who were mentioned in tweets with this hashtag))
Я не уверен, что «расплав» является правильным термином для использования для этой цели, но просто подумайте об этом как о сходстве с функцией расплава в R. Есть ли способ сделать это с использованием .map {case ...} или .fla tMap {case ...}? Или мне нужно определить функцию для выполнения этой работы?
ADDED уменьшить вопрос:
Как я сказал, что хочу, чтобы уменьшить результат с reduceByKeyAndWindow поэтому я написал следующий код:
// record username, hashtags, and mentioned user
val distilled = stream.map(
status => (status.getUser.getName,
status.getText.split(" ").filter(_.startsWith("#")),
status.getText.split(" ").filter(_.startsWith("@")))
)
val byTags = distilled.flatMap{
case (user, tag, mentioned) => tag.map((_ -> List(1, List(user), mentioned)))
}.reduceByKeyAndWindow({
case (a, b) => List(a._1+b._1, a._2++b._2, a._3++b._3)}, Seconds(60), Seconds(duration.toInt)
)
val sorted = byTags.map(_.flatten).map{
case (tag, count, users, mentioned) => (count, tag, users, mentioned)
}.transform(_.sortByKey(false))
// Print popular hashtags
sorted.foreachRDD(rdd => {
val topList = rdd.take(num.toInt)
println("\n%d Popular tags in last %d seconds:".format(num.toInt, duration.toInt))
topList.foreach{case (count, tag, users, mentioned) => println("%s (%s tweets), authors: %s, mentioned: %s".for$
})
Тем не менее, говорит
missing parameter type for expanded function
[error] The argument types of an anonymous function must be fully known. (SLS 8.5)
[error] Expected type was: ?
[error] }.reduceByKeyAndWindow({
Я попытался удалить скобки и футляры, написав (a: List, b: List) =>, но все они дали мне ошибку или связанные с типами. Каков правильный способ уменьшить его, чтобы пользователи и упомянутые были объединены каждые «длительность» секунд в течение 60 секунд?
Спасибо за помощь! –