2016-11-28 1 views
3

У меня есть работа ETL в Spark, которая также подключается к MySQL, чтобы захватить некоторые данные. Исторически сложилось так, что я делал это следующим образом:Spark ETL job выполнить mysql только один раз

hiveContext.read().jdbc(
    dbProperties.getProperty("myDbInfo"), 
    "(SELECT id, name FROM users) r", 
    new Properties()).registerTempTable("tmp_users"); 

Row[] res = hiveContext.sql("SELECT " 
    + " u.name, " 
    + " SUM(s.revenue) AS revenue " 
    + "FROM " 
    + " stats s " 
    + " INNER JOIN tmp_users u " 
    + "  ON u.id = s.user_id 
    + "GROUP BY " 
    + " u.name " 
    + "ORDER BY " 
    + " revenue DESC 
    + "LIMIT 10").collect(); 

String ids = ""; 
// now grab me some info for users that are in tmp_user_stats 
for (i = 0; i < res.length; i++) { 
    s += (!s.equals("") ? "," : "") + res[i](0); 
} 

hiveContext.jdbc(
dbProperties.getProperty("myDbInfo"), 
"(SELECT name, surname, home_address FROM users WHERE id IN ("+ids+")) r", 
new Properties()).registerTempTable("tmp_users_prises"); 

Однако при масштабировании это несколько узлов рабочих, всякий раз, когда я использую tmp_users таблицу, он выполняет запрос и он запускается на выполнение (по крайней мере) один раз в узел , который сводится к тому, что наш администратор db работает с офисами с ножом.

Каков наилучший способ справиться с этим? Могу ли я запустить задание на 3 машинах, ограничив их тремя запросами, а затем записать данные в Hadoop для других узлов, чтобы использовать его или что?

По существу - как предложено в комментариях - я мог бы выполнить запрос за пределами задания ETL, который может подготовить данные со стороны MySQL и импортировать их в Hadoop. Тем не менее, могут быть последующие запросы, которые предлагают решение более в строке с установкой соединения Spark и JDBC.

Я приму решение Sqoop, поскольку оно по крайней мере дает более рациональное решение, хотя я все еще не уверен, что он выполнит эту работу. Если я что-то найду, я снова отредактирую вопрос.

ответ

1

Вы можете кэшировать данные:

val initialDF = hiveContext.read().jdbc(
    dbProperties.getProperty("myDbInfo"), 
    "(SELECT id, name FROM users) r", 
    new Properties()) 
initialDF.cache(); 
initialDF.registerTempTable("tmp_users"); 

После первого чтения, данные будут сохраняться в памяти

Альтернатива (это не больно DBA;)) является использование Sqoop с параметром --num-mappers=3, а затем импортировать файл результата в Spark

Смежные вопросы