2016-10-19 2 views
2

Цель Спарк понимания ...загрузка больших объемов данных из MySQL в Спарк

Я загрузка больших объемов данных из MySQL в Искре, и он продолжает умирать :-(

org.apache.spark.SparkException: Job aborted. 
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156) 
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 

Вот мой код

val query = 
    s""" 
    (
     select 
     mod(act.AccountID, ${parts}) part, 
     p.Value name, event.EventTime eventTime, act.AccountID accountID, act.UserGoal goalID,event.ActivityID activityID, id.CountryID countryID, arr.ConsumerID consumerID 
     from DimIdentity as id 
     join FactArrival as arr on arr.IdentityID=id.IdentityID 
     join FactActivityEvent as event on event.ArrivalID=arr.ArrivalID 
     join DimAccount as act on act.AccountID=event.AccountID 
     join DimAccountRoleTypeMatch as role on role.AccountID=act.AccountID 
     join DimDateTime as d on event.DateTimeID=d.DateTimeID 
     join DimProperty as p on p.PropertyID=event.EventTypeID 
     where 
     id.Botness=0 and 
     d.DayOfYear>=${from} and d.DayOfYear<${to} and d.Year=${year} and 
     (role.AccountRoleTypeID=1 or role.AccountRoleTypeID=2) 
) a 
    """.stripMargin 

val events = sqlContext.read.format("jdbc"). 
    option("url", sqlURL). 
    option("driver", "com.mysql.jdbc.Driver"). 
    option("useUnicode", "true"). 
    option("zeroDateTimeBehavior", "round"). 
    option("continueBatchOnError", "true"). 
    option("useSSL", "false"). 
    option("dbtable", query). 
    option("user", sqlUser). 
    option("password", sqlPassword). 
    option("partitionColumn", "part"). 
    option("lowerBound", "0"). 
    option("upperBound", s"${parts - 1}"). 
    option("numPartitions", s"${parts}"). 
    load().as[Activity].toDF 

Обратите внимание, что я использую partitionColumn, LowerBound, UpperBound, numPartitions, как рекомендовано в других ответах

Я пробовал устанавливать разделы от 4 до 512, но он всегда умирает. Чтение того же объема данных из файла или Mongo не вызывает проблем. Это проблема с соединителем MySQL? Есть ли решение?

Обратите внимание, что я нашел один ответ, который предполагает избежать искру, и прочитать запрос в файл на HDFS, а затем загрузить файл

Multiple Partitions in Spark RDD

Является ли это действительно лучший способ?

+1

Извините, не понимал, что должен был это сделать. Я исправлю – user1902291

+0

Если решение работает для вас, вы должны. Это и награда для плаката, и знак для другого пользователя, что это действительное решение. –

ответ

1

Вот ответ, который я получил ...

Для меня ответ, чтобы избежать MySQL-соединения для Спарк :-(Я нашел, что это слишком сложно, чтобы избежать сокрушительных вызвано секционирования. Mysql- подключение требует ручной настройки разделов и не приводит к увеличению скорости. Гораздо проще написать код без искры, который считывает данные в большие текстовые файлы, а также вызов Spark в текстовом файле. Искра очень хороша для большинства данных источники, но не MySQL ... пока по крайней мере, не

+0

Благодарим вас за ответ. Я также сталкиваюсь с той же проблемой, что и вы пытаетесь получить большой объем данных из базы данных mysql (источник), а другая таблица назначения таблицы из улья для сравнения между двумя, но все, что я получил, это 'java.lang.OutOfMemoryError : Верхний предел GC превысил искру. Не могли бы вы дать мне подсказку о том, как писать код без искры, который считывает данные в большие текстовые файлы. – Vignesh

0

Вы можете попробовать увеличить размер выборки без использования динамического разделения для чтения.

sqlContext.read.options(options).jdbc(
url=sqlURL, table=query, columnName="part", 
fetchSize=1000000,connectionProperties=new java.util.Properties())