Я очень новичок в Spark, и у меня есть запрос, который выводит данные из двух таблиц Oracle. К таким таблицам необходимо присоединить поле, которое отлично работает с приведенным ниже кодом. Однако мне нужно применять фильтры, как в предложении Oracle where. Например, приведите сотрудников, возраст которых составляет от 25 до 50 лет. Я также должен применять фильтры GroupBy и сортировать окончательные результаты с OrderBy. Дело в том, что единственное действие, которое выполняется правильно, - это извлечение всех данных из таблиц и объединение между ними. Остальные фильтры просто не применяются, и я понятия не имею, почему. Не могли бы вы помочь мне с этим? Я уверен, что у меня что-то отсутствует, потому что не получены ошибки компиляции. Данные загружаются отлично, но предложения «где», похоже, не оказывают никакого влияния на данные, хотя есть сотрудники с возрастом от 25 до 50 лет. Большое спасибо!Фильтры искры никогда не применяются к DataFrame в Java
public static JavaRDD<Row> getResultsFromQuery(String connectionUrl) {
JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf()
.setAppName("SparkJdbcDs").setMaster("local"));
SQLContext sqlContext = new SQLContext(sparkContext);
Map<String, String> options = new HashMap<>();
options.put("driver", "oracle.jdbc.OracleDriver");
options.put("url", connectionUrl);
options.put("dbtable", "EMPLOYEE");
DataFrameReader dataFrameReader = sqlContext.read().format("jdbc")
.options(options);
DataFrame dataFrameFirstTable = dataFrameReader.load();
options.put("dbtable", "DEPARTMENT");
dataFrameReader = sqlContext.read().format("jdbc").options(options);
DataFrame dataFrameSecondTable = dataFrameReader.load();
//JOIN. IT WORKS JUST FINE!!!
DataFrame resultingDataFrame = dataFrameFirstTable.join(dataFrameSecondTable,
"DEPARTMENTID");
//FILTERS. THEY DO NOT THROW ERROR, BUT ARE NOT APPLIED. RESULTS ARE ALWAYS THE SAME, WITHOUT FILTERS
resultingDataFrame.where(resultingDataFrame.col("AGE").geq(25));
resultingDataFrame.where(resultingDataFrame.col("AGE").leq(50));
JavaRDD<Row> resultFromQuery = resultingDataFrame.toJavaRDD();
//HERE I CONFIRM THAT THE NUMBER OF ROWS GOTTEN IS ALWAYS THE SAME, SO THE FILTERS DO NOT WORK.
System.out.println("Number of rows "+resultFromQuery.count());
return resultFromQuery;
}
Джастин, большое спасибо за ваш ответ! Он работал для диапазонов AGE. Однако, когда я использую .groupBy и пытаюсь сделать то же самое, что и результат в DataFrame, он говорит, что полученные данные должны храниться в объекте GroupedData, который отлично работает, когда я это делаю. Проблема в том, что я не знаю, как итерировать такой объект или как его преобразовать в JavaRDD, что и должен вернуть метод в конце. У вас или у кого-нибудь еще есть идея о том, как достичь этого? Я действительно благодарю вас за ваш интерес и помощь. –
Я присоединяю код, который я использую, чтобы сгруппировать данные: 'GroupedData resultGroupData = lessThanGreaterThan.groupBy (resultDataFrame.col (" DEPARTMENT_NAME ")); // Я применяю команду groupBy ' System.out.println ("Number полученных данных: «+ resultGroupData .count(). count()); // здесь я подтверждаю, что он получает 5 групп, которые представляют собой количество отделов, которые существуют в БД –
. У вас есть доступные методы найденных в документах. https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.GroupedData Вероятно, вы хотите использовать agg. Или вы можете перейти к RDD и использовать THAT groupBy, который возвращает RDD [(K, Iterable)] –