У меня есть фрагмент Java, который считывает записи из удаленной базы данных Oracle (по крайней мере, 65 тыс. Записей). По сути, мы пытаемся передать почасовой фильтр в dataframe для извлечения записей на часовом разделе x 24.Spark DataFrame - Last Partition Collect Slow
Исходный код основан на таблице с миллионами записей.
Проблема, с которой мы сталкиваемся, заключается в том, что Spark (на YARN или как кластер SPARK) обрабатывает 22 из 24 разделов менее чем за 3 минуты. Последние 2 раздела занимают более 5 часов.
Есть ли способ ускорить это с помощью DataFrames?
HashMap<String, String> options = new HashMap<>();
sqlContext.setConf("spark.sql.shuffle.partition", "50");
options.put("dbtable", "(select * from "+VIEW_NAME+" where 1=1)");
options.put("driver", "oracle.jdbc.OracleDriver");
options.put("url", JDBC_URL);
options.put("partitionColumn", "hrs");
options.put("lowerBound", "00");
options.put("upperBound", "23");
options.put("numPartitions", "24");
DataFrame dk = sqlContext.load("jdbc", options).cache();
dk.registerTempTable(VIEW_NAME);
dk.printSchema();
DateTime dt = new DateTime(2015, 5, 8, 10, 0, 0);
String s = SQL_DATE_FORMATTER.print(dt);
dt = dt.plusHours(24);
String t = SQL_DATE_FORMATTER.print(dt);
System.out.println("S is " + s + "and t is "+ t);
Stream<Row> rows = dk.filter("DATETIME >= '" + s + "' and DATETIME <= '" + t + "'").collectAsList().parallelStream();
System.out.println("Collected" + rows.count());
Любые обновления на этом? Вы когда-нибудь нашли исправление? – zengr
Нет. Нет обновлений, но я нашел что-то, Он выполняет 00 временный раздел до 23-секундного раздела, а затем делает один раздел M/R на все время (00-23), следовательно, он не работает. – RvK
Как работа, мы должны изменить dt.plusHours (24) .minusSecond (1) – RvK