У меня есть приложение Spark Streaming, которое получает несколько сообщений JSON в секунду, каждый из которых имеет идентификатор, который идентифицирует их источник.Как сгруппировать ключ/значения по разделам в Spark?
Используя этот идентификатор в качестве ключа, я могу выполнить MapPartitionsToPair
, создав таким образом JavaPairDStream с RDD пар ключ/значение, одну пару ключей для каждого раздела (так что если я получил 5 сообщений JSON, например, Я получаю RDD с 5 разделами, каждый с идентификатором сообщения в виде ключа, а само сообщение JSON в качестве значения).
Что я хочу сделать сейчас, хочу сгруппировать все значения, имеющие один и тот же ключ в один раздел. Например, если у меня есть 3 раздела с ключом «a» и 2 раздела с ключом «b», я хочу создать новый RDD с 2 разделами вместо 5, каждый раздел содержит все значения, которые имеет один ключ, один для 'a' и один для 'b'.
Как это сделать? Это мой код до сих пор:
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {
ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();
while (stringIterator.hasNext()){
String c=stringIterator.next();
if(c==null){
return null;
}
JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
String key= retMap.getSid();
Tuple2<String,String> b= new Tuple2<String,String>(key,c);
a.add(b);
System.out.print(b._1+"_"+b._2);
// }
//break;
}
return a;
}
});
// создать JavaPairDStream, в котором каждый раздел содержит одну пару ключ/значение.
Я пытался использовать grouByKey()
, но независимо от того, что количество сообщений было, я всегда получил номер раздела 2.
Как мне это сделать? Большое вам спасибо.
Почему вы хотели бы 1 элемент каждого раздела? Какую проблему ты пытаешься решить? – maasg