2016-10-17 2 views
0

Я хочу сделать что-то очень просто: проверить, что представляет собой содержимое каждого раздела в первом RDD моего DStream. Это то, что я делаю сейчас:Использование mapPartitionsWithIndex для DStream - Spark Streaming

SparkConf sparkConfiguration= new SparkConf().setAppName("DataAnalysis").setMaster("local[*]"); 
    JavaStreamingContext sparkStrContext=new JavaStreamingContext(sparkConfiguration, Durations.seconds(1)); 
    JavaReceiverInputDStream<String> receiveParkingData=sparkStrContext.socketTextStream("localhost",5554); 


Time time=new Time(1000); 

JavaRDD<String>dataRDD= receiveParkingData.compute(time); 

//I get an error in this RDD 

    JavaRDD<String>indexDataRDD=dataRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { 
     @Override 

     public Iterator<String> call(Integer integer, Iterator<String> stringIterator) throws Exception { 
      return null; 
     } 
    }); 

indexDataRDD.collect(); 

Так что я хочу, чтобы напечатать содержание каждого раздела и его ID. Однако на indexDataRDD я получаю это сообщение в своей IntelliJ IDE: mapPartitionsWithIndex (Function2<Integer, Iterator<String>, Iterator<String>>, boolean) in AbstractJavaRDDLike cannot be applied to (Function2<Integer, Iterator<String>, Iterator<String>>)

Может кто-нибудь помочь мне с этой проблемой? Есть ли другой, более простой способ получить контент в каждом разделе? Я действительно хочу знать конкретный контент каждого раздела. Большое вам спасибо.

+0

В чем причина вызова 'compute (time)'? Вы, как правило, не должны этого делать. Помните, что 'DStream' имеет один« RDD »в своем конвейере для любого заданного интервала пакета. –

ответ

0

Описание программы для mapPartitionsWithIndex для справки.

public class SparkDemo { 
public static void main(String[] args) { 
    SparkConf conf = new SparkConf().setAppName("SparkDemo").setMaster("local"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    List<String> data = Arrays.asList("one","two","three","four","five"); 
    JavaRDD<String> javaRDD = sc.parallelize(data, 2); 
    JavaRDD<String> mapPartitionsWithIndexRDD = javaRDD 
      .mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { 
       @Override 
       public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception { 
        LinkedList<String> linkedList = new LinkedList<String>(); 
        while (iterator.hasNext()){ 
          linkedList.add(Integer.toString(index) + "-" + iterator.next()); 
         } 
        return linkedList.iterator(); 
       } 
      }, false); 
    System.out.println("mapPartitionsWithIndexRDD " + mapPartitionsWithIndexRDD.collect()); 
    sc.stop(); 
    sc.close(); 
    } 
} 
Смежные вопросы