2015-07-08 3 views
1

Я очень новичок в Spark, и у меня есть запрос, который выводит данные из двух таблиц Oracle. К таким таблицам необходимо присоединить поле, которое отлично работает с приведенным ниже кодом. Однако мне нужно применять фильтры, как в предложении Oracle where. Например, приведите сотрудников, возраст которых составляет от 25 до 50 лет. Я также должен применять фильтры GroupBy и сортировать окончательные результаты с OrderBy. Дело в том, что единственное действие, которое выполняется правильно, - это извлечение всех данных из таблиц и объединение между ними. Остальные фильтры просто не применяются, и я понятия не имею, почему. Не могли бы вы помочь мне с этим? Я уверен, что у меня что-то отсутствует, потому что не получены ошибки компиляции. Данные загружаются отлично, но предложения «где», похоже, не оказывают никакого влияния на данные, хотя есть сотрудники с возрастом от 25 до 50 лет. Большое спасибо!Фильтры искры никогда не применяются к DataFrame в Java

public static JavaRDD<Row> getResultsFromQuery(String connectionUrl) { 

    JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf() 
      .setAppName("SparkJdbcDs").setMaster("local")); 
    SQLContext sqlContext = new SQLContext(sparkContext); 

    Map<String, String> options = new HashMap<>(); 
    options.put("driver", "oracle.jdbc.OracleDriver"); 
    options.put("url", connectionUrl); 
    options.put("dbtable", "EMPLOYEE"); 

    DataFrameReader dataFrameReader = sqlContext.read().format("jdbc") 
      .options(options); 

    DataFrame dataFrameFirstTable = dataFrameReader.load(); 

    options.put("dbtable", "DEPARTMENT"); 

    dataFrameReader = sqlContext.read().format("jdbc").options(options); 

    DataFrame dataFrameSecondTable = dataFrameReader.load(); 

    //JOIN. IT WORKS JUST FINE!!! 

    DataFrame resultingDataFrame = dataFrameFirstTable.join(dataFrameSecondTable, 
      "DEPARTMENTID"); 


    //FILTERS. THEY DO NOT THROW ERROR, BUT ARE NOT APPLIED. RESULTS ARE ALWAYS THE SAME, WITHOUT FILTERS 
    resultingDataFrame.where(resultingDataFrame.col("AGE").geq(25)); 
    resultingDataFrame.where(resultingDataFrame.col("AGE").leq(50)); 

    JavaRDD<Row> resultFromQuery = resultingDataFrame.toJavaRDD(); 

    //HERE I CONFIRM THAT THE NUMBER OF ROWS GOTTEN IS ALWAYS THE SAME, SO THE FILTERS DO NOT WORK. 
    System.out.println("Number of rows "+resultFromQuery.count()); 

    return resultFromQuery; 

}

ответ

4

where возвращает новый dataframe и не изменяет существующий, так что вам нужно сохранить вывод:

DataFrame greaterThan25 = resultingDataFrame.where(resultingDataFrame.col("AGE").geq(25)); 
DataFrame lessThanGreaterThan = greaterThan25.where(resultingDataFrame.col("AGE").leq(50)); 
JavaRDD<Row> resultFromQuery = lessThanGreaterThan.toJavaRDD(); 

Или вы можете просто приковать его:

DataFrame resultingDataFrame = dataFrameFirstTable.join(dataFrameSecondTable, "DEPARTMENTID") 
    .where(resultingDataFrame.col("AGE").geq(25)) 
    .where(resultingDataFrame.col("AGE").leq(50)); 
+0

Джастин, большое спасибо за ваш ответ! Он работал для диапазонов AGE. Однако, когда я использую .groupBy и пытаюсь сделать то же самое, что и результат в DataFrame, он говорит, что полученные данные должны храниться в объекте GroupedData, который отлично работает, когда я это делаю. Проблема в том, что я не знаю, как итерировать такой объект или как его преобразовать в JavaRDD , что и должен вернуть метод в конце. У вас или у кого-нибудь еще есть идея о том, как достичь этого? Я действительно благодарю вас за ваш интерес и помощь. –

+0

Я присоединяю код, который я использую, чтобы сгруппировать данные: 'GroupedData resultGroupData = lessThanGreaterThan.groupBy (resultDataFrame.col (" DEPARTMENT_NAME ")); // Я применяю команду groupBy ' System.out.println ("Number полученных данных: «+ resultGroupData .count(). count()); // здесь я подтверждаю, что он получает 5 групп, которые представляют собой количество отделов, которые существуют в БД –

+0

. У вас есть доступные методы найденных в документах. https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.GroupedData Вероятно, вы хотите использовать agg. Или вы можете перейти к RDD и использовать THAT groupBy, который возвращает RDD [(K, Iterable)] –

1
people.select("person_id", "first_name").filter(people("person_id") == 2).show 

Это не будет работать, и вы будете получать следующее сообщение об ошибке:

Error: overloaded method value filter with alternatives: (conditionExpr: String)org.apache.spark.sql.DataFrame (condition: org.apache.spark.sql.Column) org.apache.spark.sql.DataFrame cannot be applied to (Boolean)

кажется, что работать с Select пунктами в Спарк dataframe вместе с фильтром, мы не можем передать Boolean.

Эти два запроса используются для выбора одной строки из Spark DataFrame с двумя различными предложениями, где и фильтром.

people.select("person_id", "first_name").filter(people("person_id") === 2).show 

people.select("person_id", "first_name").where(people("person_id") === 2).show 

Используйте один из указанных выше запросов, чтобы выбрать одну строку из Spark DataFrame.

Смежные вопросы