2016-10-16 2 views
4

У меня над кодом как искровой драйвер, когда я выполняю свою программу, он работает правильно, сохраняя необходимые данные в виде паркетного файла.Spark java Функция карты выполняется дважды

 String indexFile = "index.txt"; 
     JavaRDD<String> indexData = sc.textFile(indexFile).cache(); 
     JavaRDD<String> jsonStringRDD = indexData.map(new Function<String, String>() { 
     @Override 
     public String call(String patientId) throws Exception { 
     return "json array as string" 
     } 
     }); 

//1. Read json string array into a Dataframe (execution 1) 
     DataFrame dataSchemaDF = sqlContext.read().json(jsonStringRDD); 
//2. Save dataframe as parquet file (execution 2) 
     dataSchemaDF.write().parquet("md.parquet"); 

Но я наблюдал мою функцию сопоставителя на РДУ indexData становится выполняется дважды. первым, когда я прочитал jsonStringRdd как DataFrame с использованием SQLContext Во-вторых, когда я пишу dataSchemaDF паркетной файл

Можете ли вы направить меня на этом, как избежать этого повторное выполнение? Есть ли другой лучший способ преобразования строки json в Dataframe?

+0

Где вы видите две карты? RDD оцениваются лениво. Операция 'map' - это преобразование, а не действие, поэтому присвоение' jsonStringRDD' не должно запускаться немедленно. Возможно, пути выполнения для чтения Dataframe и записи паркета требуют сбора RDD. –

+0

У меня есть записи в моей функции mapper, я вижу их дважды в журнале. – blob

ответ

6

Я считаю, что причиной является отсутствие схемы для считывателя JSON. При выполнении:

sqlContext.read().json(jsonStringRDD); 

Спарк должен вывести схему для вновь созданного DataFrame. Для этого он имеет сканирования входной RDD и этот шаг выполняется с нетерпением

Если вы хотите, чтобы избежать этого, вы должны создать StructType, который описывает форму JSON документов:

StructType schema; 
... 

и использовать его при вы создаете DataFrame:

DataFrame dataSchemaDF = sqlContext.read().schema(schema).json(jsonStringRDD); 
Смежные вопросы