Я новичок на Спарк (Моя версия 1.6.0), и теперь я пытаюсь решить эту проблему ниже:Как выполнить операцию «Поиск» на Спарк dataframes дал несколько условий
Предположим, что являются двумя исходными файлами:
- Первый (A для краткости) - большой, который содержит столбцы с именами A1, B1, C1 и другие 80 столбцов. Есть 230K записей внутри.
- Второй (коротко) (B для краткости) представляет собой небольшую таблицу поиска, которая содержит столбцы с именами A2, B2, C2 и D2. Есть 250 записей внутри.
Теперь нужно вставить новый столбец в А, учитывая логик ниже:
- первого поиск A1, B1 и C1-В (соответствующих столбцах A2, B2 и C2), если успешный , верните D2 в качестве значения нового добавленного столбца. Если ничего не найдено ...
- Затем найдите A1, B1 в B. В случае успеха верните D2. Если ничего не найдено ...
- Устанавливает значение по умолчанию «NA»
Я уже прочитал в файлах и превращали их в кадры данных. Для первой ситуации я получил результат, когда левое внешнее соединение их соединяло. Но я не могу найти хороший способ на следующем шаге.
В настоящее время я пытаюсь создать новый кадр данных, присоединив A и B, используя менее строгие условия. Однако я не знаю, как обновить текущий кадр данных с другого. Или есть ли еще более интуитивный и эффективный способ решения всей проблемы?
Спасибо за все ответы.
----------------------------- Обновление от 20160309 -------------- ------------------
Наконец-то принят ответ @mlk. Еще большое спасибо @ zero323 за его замечательные комментарии относительно UDF и join, поколение кода вольфрама - это еще одна проблема, с которой мы сталкиваемся сейчас. Но так как нам нужно сделать множество поиска и средние 4 условия для каждого поиска, бывший решение является более подходящим ...
Окончательное решение как-то выглядит, как показано ниже фрагмент кода:
```
import sqlContext.implicits._
import com.github.marklister.collections.io._
case class TableType(A: String, B: String, C: String, D: String)
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))
val lkupD = udf {
(aStr: String, bStr: String, cStr: String) =>
tableBroadcast.value.find {
case TableType(a, b, c, _) =>
(a == aStr && b == bStr && c == cStr) ||
(a == aStr && b == bStr)
}.getOrElse(TableType("", "", "", "NA")).D
}
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C"))
```
Это, вероятно, путь. Я также предложил альтернативное решение с 'joins'. – zero323
Спасибо mlk. Если таблица поиска большая (500K * 50), хорошо ли ее транслировать? –
И еще мой вопрос: предположим, мне нужно выполнить 30 поисков по разным столбцам и написать 50 UDF, повлияет ли производительность? –