2015-12-17 3 views
0

Как я новичок, чтобы Спарк Scala API У меня есть следующая проблема:Apache Спарк Scala API: ReduceByKeyAndWindow в Scala

В моем Java коде, который я сделал преобразование reduceByKeyAndWindow, но теперь я увидел, что есть только reduceByWindow (так как в Scala также нет PairDStream). Тем не менее, у меня есть первые шаги в Scala, которые работают сейчас:

import org.apache.hadoop.conf.Configuration; 
import [...] 

val serverIp = "xxx.xxx.xxx.xxx" 
val receiverInstances = 2 
val batchIntervalSec = 2 
val windowSize1hSek = 60 * 60 
val slideDurationSek = batchIntervalSec 

val streamingCtx = new StreamingContext(sc, Seconds(batchIntervalSec)) 

val hadoopConf = sc.hadoopConfiguration 
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") 
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") 
hadoopConf.set("fs.s3n.awsAccessKeyId", "xxx") 
hadoopConf.set("fs.s3n.awsSecretAccessKey", "xxx") 

// ReceiverInputDStream 
val receiver1 = streamingCtx.socketTextStream(serverIp, 7777) 
val receiver2 = streamingCtx.socketTextStream(serverIp, 7778) 

// DStream 
val inputDStream = receiver1.union(receiver2) 

// h.hh.plug.ts.val 
case class DebsEntry(house: Integer, household: Integer, plug: Integer, ts: Long, value: Float) 

// h.hh.plug.val 
case class DebsEntryWithoutTs(house: Integer, household: Integer, plug: Integer, value: Float) 

// h.hh.plug.1 
case class DebsEntryWithoutTsCount(house: Integer, household: Integer, plug: Integer, count: Long) 

val debsPairDStream = inputDStream.map(s => s.split(",")).map(s => DebsEntry(s(6).toInt, s(5).toInt, s(4).toInt, s(1).toLong, s(2).toFloat)) //.foreachRDD(rdd => rdd.toDF().registerTempTable("test")) 

val debsPairDStreamWithoutDuplicates = debsPairDStream.transform(s => s.distinct()) 

val debsPairDStreamConsumptionGreater0 = debsPairDStreamWithoutDuplicates.filter(s => s.value > 100.0) 

debsPairDStreamConsumptionGreater0.foreachRDD(rdd => rdd.toDF().registerTempTable("test3")) 

val debsPairDStreamConsumptionGreater0withoutTs = debsPairDStreamConsumptionGreater0.map(s => DebsEntryWithoutTs(s.house, s.household, s.plug, s.value)) 

// 5.) Average per Plug 
// 5.1) Create a count-prepared PairDStream (house, household, plug, 1) 
val countPreparedPerPlug1h = debsPairDStreamConsumptionGreater0withoutTs.map(s => DebsEntryWithoutTsCount(s.house, s.household, s.plug, 1)) 

// 5.2) ReduceByKeyAndWindow 
val countPerPlug1h = countPreparedPerPlug1h.reduceByWindow(...???...) 

До этапа 5.1 все работает нормально. В 5.2. Теперь я хочу подвести итоги 1 из countPreparedPerPlug1h, но только в том случае, если другие атрибуты (домашний, домашний, плагин) равны. - Цель состоит в том, чтобы получить количество записей в одной (домашней, домашней, пробковой) комбинации. Может кто-нибудь помочь? Спасибо!

EDIT - ПЕРВЫЙ TRY

Я пытался на шаге 5.2 следующее:

// 5.2) 
val countPerPlug1h = countPreparedPerPlug1h.reduceByKeyAndWindow((a,b) => a+b, Seconds(windowSize1hSek), Seconds(slideDurationSek)) 

Но здесь я получаю следующее сообщение об ошибке:

<console>:69: error: missing parameter type 
    val countPerPlug1h = countPreparedPerPlug1h.reduceByKeyAndWindow((a,b) => a+b, Seconds(windowSize1hSek), Seconds(slideDurationSek)) 
                    ^

Кажется, что я использую reduceByKeyAndWindow неверное преобразование, но где ошибка? Типы суммируемых значений - Int, см. CountPreparedPerPlug1h на шаге 5.1 выше.

ответ

1

У меня есть оно, похоже, похоже на следующий код:

val countPerPlug1h = countPreparedPerPlug1h.reduceByKeyAndWindow({(x, y) => x + y}, {(x, y) => x - y}, Seconds(windowSize1hSek), Seconds(slideDurationSek)) 

Спасибо за ваши подсказки, @Justin Pihony

2

Вы можете использовать reduceByKeyAndWindow еще проще в Scala, чем в вашей версии Java. У вас нет PairDStream, поскольку пары неявно определяются, и вы можете напрямую обращаться к парным методам. Неявное разрешение переходит к PairDStreamFunctions

Например:

val myPairDStream: DStream[KeyType, ValueType] = ... 
myPairDStream.reduceByKeyAndWindow(...) 

, который на самом деле следующие за кадром:

new PairDStreamFunctions(myPairDStream).reduceByKeyAndWindow(...) 

Эта обертка PairDStreamFunctions добавляется к любому DStream, которая состоит из a Tuple2

+0

Спасибо за ответ! Но как я могу использовать преобразование reduceByKeyAndWindow? Моя попытка не сработала: val countPerPlug1h = countPreparedPerPlug1h.reduceByKeyAndWindow ((a, b) => a + b, Seconds (windowSize1hSek), Seconds (slideDurationSek)) –

+0

Каков тип 'countPreparedPerPlug1h' и/или что такое ошибка, которую вы получаете? –

+0

Я добавил свой код и сообщение об ошибке на вопрос. –

Смежные вопросы