2016-06-19 2 views
3

У меня есть приложение Spark Streaming, которое получает несколько сообщений JSON в секунду, каждый из которых имеет идентификатор, который идентифицирует их источник.Как сгруппировать ключ/значения по разделам в Spark?

Используя этот идентификатор в качестве ключа, я могу выполнить MapPartitionsToPair, создав таким образом JavaPairDStream с RDD пар ключ/значение, одну пару ключей для каждого раздела (так что если я получил 5 сообщений JSON, например, Я получаю RDD с 5 разделами, каждый с идентификатором сообщения в виде ключа, а само сообщение JSON в качестве значения).

Что я хочу сделать сейчас, хочу сгруппировать все значения, имеющие один и тот же ключ в один раздел. Например, если у меня есть 3 раздела с ключом «a» и 2 раздела с ключом «b», я хочу создать новый RDD с 2 разделами вместо 5, каждый раздел содержит все значения, которые имеет один ключ, один для 'a' и один для 'b'.

Как это сделать? Это мой код до сих пор:

JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]), 
      StorageLevels.MEMORY_AND_DISK_SER); 

JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() { 
     @Override 
     public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception { 

      ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>(); 

      while (stringIterator.hasNext()){ 
       String c=stringIterator.next(); 
       if(c==null){ 
        return null; 

       } 

       JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class); 
       String key= retMap.getSid(); 
       Tuple2<String,String> b= new Tuple2<String,String>(key,c); 
       a.add(b); 

       System.out.print(b._1+"_"+b._2); 
       // } 
       //break; 
      } 


      return a; 
     } 
    }); 

// создать JavaPairDStream, в котором каждый раздел содержит одну пару ключ/значение.

Я пытался использовать grouByKey(), но независимо от того, что количество сообщений было, я всегда получил номер раздела 2.

Как мне это сделать? Большое вам спасибо.

+0

Почему вы хотели бы 1 элемент каждого раздела? Какую проблему ты пытаешься решить? – maasg

ответ

4

Вы можете использовать

groupByKey(Integer numPartitions) 

и установите numPartitions равно числу различных ключей у вас есть.

Но .. вам нужно будет знать сколько различных ключей у вас есть фронт. У вас есть эта информация? Возможно нет. Итак, вам понадобится сделать дополнительную (/ избыточную) работу. Например. использование

countByKey 

как первый шаг. Это быстрее, чем groupByKey - так, по крайней мере, вы не были удвоение общее время обработки.

Обновление ОП задал вопрос о том, почему они получают по 2 раздела по умолчанию.

по умолчанию groupByKey использует defaultPartitioner() методы

groupByKey(defaultPartitioner(self)) 
  • , который выбирает Partitioner из родительского раздела с наибольшей мощностью.

- или он будет использовать spark.default.parallelism

+0

Спасибо, это определенно решает мою проблему. Однако, только один вопрос: знаете ли вы, почему 'groupByKey()' возвращает 2 раздела по умолчанию? Независимо от того, сколько входов я посылаю за пакетный интервал или выходы, которые у меня есть, кажется, что groupByKey не зависит от всего этого. Он просто возвращает 2, когда я делаю 'getNumPartitions' –

Смежные вопросы