2016-08-07 6 views
3

У меня есть некоторые DStream в Спарк Scala, и я хочу, чтобы отсортировать его затем взять верхнюю Н. Проблема заключается в том, что всякий раз, когда я пытаюсь запустить его я получаю NotSerializableException и сообщение об исключении говорит:Сортировка DStream и принимая TopN

Это связано с тем, что объект DStream упоминается в пределах замыкания.

Проблема заключается в том, что я не знаю, как решить эту проблему:

Вот моя попытка:

package com.badrit.realtime 

import java.util.Date 

import com.badrit.drivers.UnlimitedSpaceTimeDriver 
import com.badrit.model.{CellBuilder, DataReader, Trip} 
import com.badrit.utility.Printer 
import org.apache.spark.SparkConf 
import org.apache.spark.rdd.RDD 
import org.apache.spark.streaming.dstream.{DStream, InputDStream} 
import org.apache.spark.streaming.{Duration, Milliseconds, StreamingContext} 

import scala.collection.mutable 

object StreamingDriver { 
    val appName: String = "HotSpotRealTime" 
    val hostName = "localhost" 
    val port = 5050 
    val constrains = UnlimitedSpaceTimeDriver.constrains; 
    var streamingRate = 1; 
    var windowSize = 8; 
    var slidingInterval = 2; 
    val cellBuilder = new CellBuilder(constrains) 
    val inputFilePath = "/home/ahmedelgamal/Downloads/green_tripdata_2015-02.csv" 

    def prepareTestData(sparkStreamCtx: StreamingContext): InputDStream[Trip] = { 

     val sparkCtx = sparkStreamCtx.sparkContext  
     val textFile: RDD[String] = sparkCtx.textFile(inputFilePath) 
     val data: RDD[Trip] = new DataReader().getTrips(textFile) 
     val groupedData = data.filter(_.pickup.date.before(new Date(2015, 1, 2, 0, 0, 0))) 
      .groupBy(trip => trip.pickup.date.getMinutes).sortBy(_._1).map(_._2).collect() 

     printf("Grouped Data Count is " + groupedData.length) 
     var dataQueue: mutable.Queue[RDD[Trip]] = mutable.Queue.empty; 

     groupedData.foreach(trips => dataQueue += sparkCtx.makeRDD(trips.toArray)) 
     printf("\n\nTest Queue size is " + dataQueue.size) 


     groupedData.zipWithIndex.foreach { case (trips: Iterable[Trip], index: Int) => { 
      println("Items List " + index) 


      val passengers: Array[Int] = trips.map(_.passengers).toArray 
      val cnt = passengers.length 
      println("Sum is " + passengers.sum) 
      println("Cnt is " + cnt) 

      val passengersRdd = sparkCtx.parallelize(passengers) 
      println("Mean " + passengersRdd.mean()) 
      println("Stdv" + passengersRdd.stdev()) 

     } 
     } 
     sparkStreamCtx.queueStream(dataQueue, true) 
    } 


    def cellCreator(trip: Trip) = cellBuilder.cellForCarStop(trip.pickup) 

    def main(args: Array[String]) { 
     if (args.length < 1) { 
      streamingRate = 1; 
      windowSize = 3 //2 hours 60 * 60 * 1000L 
      slidingInterval = 2 //0.5 hour 60 * 60 * 1000L 
     } 
     else { 
      streamingRate = args(0).toInt; 
      windowSize = args(1).toInt 
      slidingInterval = args(2).toInt 
     } 

     val sparkConf = new SparkConf().setAppName(appName).setMaster("local[*]") 
     val sparkStreamCtx = new StreamingContext(sparkConf, Milliseconds(streamingRate)) 
     sparkStreamCtx.sparkContext.setLogLevel("ERROR") 
     sparkStreamCtx.checkpoint("/tmp") 

     val data: InputDStream[Trip] = prepareTestData(sparkStreamCtx) 
     val dataWindow = data.window(new Duration(windowSize), new Duration(slidingInterval)) 

     //my main problem lies in the following line 
     val newDataWindow = dataWindow.transform(rdd => sparkStreamCtx.sparkContext.parallelize(rdd.take(10))) 
     newDataWindow.print 

     sparkStreamCtx.start() 
     sparkStreamCtx.awaitTerminationOrTimeout(1000) 

    } 
} 

Я не против каких-либо других способов сортировки DStream и получить его верхний N, а не мой путь.

+1

Почему вы используете 'sparkStreamCtx.sparkContext.parallelize' внутри' transform'? Почему вы просто не выполняете 'transform (rdd => rdd.take (10))'? –

+0

Причина '.take' возвращает массив, а не rdd, а' .transform' должен принимать rdd в качестве ввода и возвращает rdd (в случае, если не используется 'sparkStreamCtx.sparkContext.parallelize', возвращаемый массив - это не rdd). –

ответ

2

Вы можете использовать метод преобразования в объекте DStream, затем отсортировать входной RDD и взять n его элементов в списке, а затем отфильтровать исходный RDD, который будет содержаться в этом списке.

val n = 10 
val topN = result.transform(rdd =>{ 
    val list = rdd.sortBy(_._1).take(n) 
    rdd.filter(list.contains) 
}) 
topN.print 
Смежные вопросы