Позвольте мне вначале сообщить всем вам, что я очень новичок в Spark.Вызов параллельного метода в искровом режиме и использование искрового сеанса в переданном методе
мне нужно обработать огромное количество записей в таблице, и когда он группируется по электронной почте она составляет около 1 million.I необходимо выполнить несколько логических вычислений на основе данных, установленных против индивидуальной электронной почты и обновить базу данных на основе логического расчета
Примерно моя структура кода, как
первоначальной загрузки данных ...
импорт sparkSessio n.implicits._ вар tableData = sparkSession.read.jdbc (, connectionProperties) .select ("электронная почта"). где()
// Кадр данных с записями с группировкой по электронной почте рассчитывать больше, чем один
вар recordsGroupedBy = tableData.groupBy ("электронная почта"). COUNT(). withColumnRenamed ("считать", "RecordCount"). фильтр ("RecordCount> 1") .toDF()
Сейчас идет обработка после группировки по электронной почте с использованием метода processDataAgainstEmail()
recordsGroupedBy.collect(). Еогеасп (х => processDataAgainstEmail (x.getAs ("электронная почта"), sparkSession))
Здесь я вижу Еогеасп не выполняется параллельно .I необходимость вызова метода processDataAgainstEmail (,) в параллели. Но если я пытаюсь распараллеливание, делая
Привет я могу получить список, вызвав
вал = dataFrameWithGroupedByMultipleRecords.select адресов электронной ("электронная почта"). Rdd.map (г => г (0). asInstanceOf [String]). собирать(). ToList
уаг РДД = sc.parallelize (адресов электронной)
rdd.foreach (х => processDataAgainstEmail (x.getAs ("электронная почта"), sparkSession))
Это не поддерживается, поскольку я не могу передать sparkSession при использовании функции параллелизации.
Может ли кто-нибудь помочь мне в этом, так как в processDataAgainstEmail (,) будет выполняться несколько операций, связанных с вставкой и обновлением базы данных, а также должны быть выполнены искровые данные и операции искровой sql.
Чтобы summerize мне нужно вызвать processDataAgainstEmail (параллельно,) с sparksession
В случае не все можно пройти искровой сессию, метод не будет в состоянии выполнить что-либо на базе .Я не уверен, что будет альтернативным способом, поскольку параллелизм в электронной почте должен быть для моего сценария.
На самом деле я хочу работать параллельно с результатом, который является списком адресов электронной почты, используя метод ** processDataAgainstEmail() **, который будет иметь электронную почту и искровую сессию в качестве двух параметров. Возможно ли это? или любым другим способом? – Soumen