2017-02-23 11 views
0

Позвольте мне вначале сообщить всем вам, что я очень новичок в Spark.Вызов параллельного метода в искровом режиме и использование искрового сеанса в переданном методе

мне нужно обработать огромное количество записей в таблице, и когда он группируется по электронной почте она составляет около 1 million.I необходимо выполнить несколько логических вычислений на основе данных, установленных против индивидуальной электронной почты и обновить базу данных на основе логического расчета

Примерно моя структура кода, как

первоначальной загрузки данных ...

импорт sparkSessio n.implicits._ вар tableData = sparkSession.read.jdbc (, connectionProperties) .select ("электронная почта"). где()

// Кадр данных с записями с группировкой по электронной почте рассчитывать больше, чем один

вар recordsGroupedBy = tableData.groupBy ("электронная почта"). COUNT(). withColumnRenamed ("считать", "RecordCount"). фильтр ("RecordCount> 1") .toDF()

Сейчас идет обработка после группировки по электронной почте с использованием метода processDataAgainstEmail()

recordsGroupedBy.collect(). Еогеасп (х => processDataAgainstEmail (x.getAs ("электронная почта"), sparkSession))

Здесь я вижу Еогеасп не выполняется параллельно .I необходимость вызова метода processDataAgainstEmail (,) в параллели. Но если я пытаюсь распараллеливание, делая

Привет я могу получить список, вызвав

вал = dataFrameWithGroupedByMultipleRecords.select адресов электронной ("электронная почта"). Rdd.map (г => г (0). asInstanceOf [String]). собирать(). ToList

уаг РДД = sc.parallelize (адресов электронной)

rdd.foreach (х => processDataAgainstEmail (x.getAs ("электронная почта"), sparkSession))

Это не поддерживается, поскольку я не могу передать sparkSession при использовании функции параллелизации.

Может ли кто-нибудь помочь мне в этом, так как в processDataAgainstEmail (,) будет выполняться несколько операций, связанных с вставкой и обновлением базы данных, а также должны быть выполнены искровые данные и операции искровой sql.

Чтобы summerize мне нужно вызвать processDataAgainstEmail (параллельно,) с sparksession

В случае не все можно пройти искровой сессию, метод не будет в состоянии выполнить что-либо на базе .Я не уверен, что будет альтернативным способом, поскольку параллелизм в электронной почте должен быть для моего сценария.

ответ

0

Метод forEach - это метод, который последовательно действует для каждого элемента списка, поэтому вы действуете на него по одному и передаете его методу processDataAgainstEmail.

После того, как вы получили результирующий список, вы затем вызвать sc.parallelize на распараллеливание создания dataframe из списка записей, созданных/манипулирует в предыдущем шаге. Распараллеливание, как я вижу в pySpark, является свойством создания фрейма данных, а не результатом результата любой операции.

+0

На самом деле я хочу работать параллельно с результатом, который является списком адресов электронной почты, используя метод ** processDataAgainstEmail() **, который будет иметь электронную почту и искровую сессию в качестве двух параметров. Возможно ли это? или любым другим способом? – Soumen

Смежные вопросы