2015-11-01 3 views
0

Я новичок в scala и изучаю, как обрабатывать твиттер-потоки с помощью scala. Я играл с примером кода ниже и пытался изменить его, чтобы сделать некоторые другие вещи.scala twitter streaming: плавление кортежей кортежей

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala#L60

У меня есть кортеж кортежей (может быть кортеж не точное имя типа в потоковой но лестнице ..) суммирует каждый твит, как это: (имя пользователя, (кортеж хэштег), (кортеж пользователей упомянутый в этом чириканье))

И ниже - код, который я использовал для этого.

val sparkConf = new SparkConf().setAppName("TwitterPopularTags") 
val ssc = new StreamingContext(sparkConf, Seconds(duration.toInt)) 
val stream = TwitterUtils.createStream(ssc, None) 

// record username, hashtags, and mentioned user 
val distilled = stream.map(status => (status.getUser.getName, status.getText.split(" ").filter(_.startsWith("#")), status.getText.split(" ").filter(_.startsWith("@")))) 

Что я хочу сделать, это расплавить этот кортеж (тег, пользователь, (упомянутые пользователи)). Например, если исходный кортеж был

(Tom, (#apple, #banana), (@Chris, @Bob)) 

Я хочу, чтобы результат

((#apple, Tom, (@Chris, @Bob)), (#banana, Tom, (@Chris, @Bob)) 

Моя цель состоит в том, чтобы запустить reduceByKey на этот результат, используя хэштегом в качестве ключа, чтобы получить

(#apple, (list of users who tweeted this hashtag), (list of users who were mentioned in tweets with this hashtag)) 

Я не уверен, что «расплав» является правильным термином для использования для этой цели, но просто подумайте об этом как о сходстве с функцией расплава в R. Есть ли способ сделать это с использованием .map {case ...} или .fla tMap {case ...}? Или мне нужно определить функцию для выполнения этой работы?


ADDED уменьшить вопрос:

Как я сказал, что хочу, чтобы уменьшить результат с reduceByKeyAndWindow поэтому я написал следующий код:

// record username, hashtags, and mentioned user 
val distilled = stream.map(
    status => (status.getUser.getName, 
    status.getText.split(" ").filter(_.startsWith("#")), 
    status.getText.split(" ").filter(_.startsWith("@"))) 
) 

val byTags = distilled.flatMap{ 
    case (user, tag, mentioned) => tag.map((_ -> List(1, List(user), mentioned))) 
}.reduceByKeyAndWindow({ 
    case (a, b) => List(a._1+b._1, a._2++b._2, a._3++b._3)}, Seconds(60), Seconds(duration.toInt) 
) 

val sorted = byTags.map(_.flatten).map{ 
    case (tag, count, users, mentioned) => (count, tag, users, mentioned) 
}.transform(_.sortByKey(false)) 

// Print popular hashtags 
sorted.foreachRDD(rdd => { 
    val topList = rdd.take(num.toInt) 
    println("\n%d Popular tags in last %d seconds:".format(num.toInt, duration.toInt)) 
    topList.foreach{case (count, tag, users, mentioned) => println("%s (%s tweets), authors: %s, mentioned: %s".for$ 
}) 

Тем не менее, говорит

missing parameter type for expanded function 
[error] The argument types of an anonymous function must be fully known. (SLS 8.5) 
[error] Expected type was: ? 
[error]  }.reduceByKeyAndWindow({ 

Я попытался удалить скобки и футляры, написав (a: List, b: List) =>, но все они дали мне ошибку или связанные с типами. Каков правильный способ уменьшить его, чтобы пользователи и упомянутые были объединены каждые «длительность» секунд в течение 60 секунд?

ответ

0
hashTags.flatMap{ case (user, tags, mentions) => tags.map((_, user,mentions))} 

Самой беда, что в вашем вопросе является нерациональным термином tuple.

В python tuple означает неизменный тип, который может иметь любой размер.

В scala TupleN означает неизменяемый тип с параметрами типа N, содержащий ровно N членов соответствующих типов. Таким образом, Tuple2 - это не то же самое, что и Tuple3.

В Скале, которая полна неизменных типов, любой неизменная коллекция как List, Vector или Stream можно рассматривать как аналог Питона tuple. Но наиболее точными являются, вероятно, подтип immutable.IndexedSeq, например.Vector

Так методы, подобные String.splitAt, никогда не могли вернуть Tuple в смысле scala, просто потому, что во время компиляции количество элементов не могло быть известно.

При этом конкретном случае результат будет просто [Array][5]. И такое предположение я использовал в своем ответе.

Но в случае, если вы действительно коллекция (т.е. RDD) типа (String, (String, String), (String, String)) вы можете использовать эту почти эквивалентную часть кода

hashTags.flatMap { 
    case (user, (tag1, tag2), mentions) => Seq(tag1, tag2).map((_, user, mentions)) 
} 
+0

Спасибо за помощь! –

Смежные вопросы