Цель Спарк понимания ...загрузка больших объемов данных из MySQL в Спарк
Я загрузка больших объемов данных из MySQL в Искре, и он продолжает умирать :-(
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
Вот мой код
val query =
s"""
(
select
mod(act.AccountID, ${parts}) part,
p.Value name, event.EventTime eventTime, act.AccountID accountID, act.UserGoal goalID,event.ActivityID activityID, id.CountryID countryID, arr.ConsumerID consumerID
from DimIdentity as id
join FactArrival as arr on arr.IdentityID=id.IdentityID
join FactActivityEvent as event on event.ArrivalID=arr.ArrivalID
join DimAccount as act on act.AccountID=event.AccountID
join DimAccountRoleTypeMatch as role on role.AccountID=act.AccountID
join DimDateTime as d on event.DateTimeID=d.DateTimeID
join DimProperty as p on p.PropertyID=event.EventTypeID
where
id.Botness=0 and
d.DayOfYear>=${from} and d.DayOfYear<${to} and d.Year=${year} and
(role.AccountRoleTypeID=1 or role.AccountRoleTypeID=2)
) a
""".stripMargin
val events = sqlContext.read.format("jdbc").
option("url", sqlURL).
option("driver", "com.mysql.jdbc.Driver").
option("useUnicode", "true").
option("zeroDateTimeBehavior", "round").
option("continueBatchOnError", "true").
option("useSSL", "false").
option("dbtable", query).
option("user", sqlUser).
option("password", sqlPassword).
option("partitionColumn", "part").
option("lowerBound", "0").
option("upperBound", s"${parts - 1}").
option("numPartitions", s"${parts}").
load().as[Activity].toDF
Обратите внимание, что я использую partitionColumn, LowerBound, UpperBound, numPartitions, как рекомендовано в других ответах
Я пробовал устанавливать разделы от 4 до 512, но он всегда умирает. Чтение того же объема данных из файла или Mongo не вызывает проблем. Это проблема с соединителем MySQL? Есть ли решение?
Обратите внимание, что я нашел один ответ, который предполагает избежать искру, и прочитать запрос в файл на HDFS, а затем загрузить файл
Multiple Partitions in Spark RDD
Является ли это действительно лучший способ?
Извините, не понимал, что должен был это сделать. Я исправлю – user1902291
Если решение работает для вас, вы должны. Это и награда для плаката, и знак для другого пользователя, что это действительное решение. –