6

Я новичок на Спарк (Моя версия 1.6.0), и теперь я пытаюсь решить эту проблему ниже:Как выполнить операцию «Поиск» на Спарк dataframes дал несколько условий

Предположим, что являются двумя исходными файлами:

  • Первый (A для краткости) - большой, который содержит столбцы с именами A1, B1, C1 и другие 80 столбцов. Есть 230K записей внутри.
  • Второй (коротко) (B для краткости) представляет собой небольшую таблицу поиска, которая содержит столбцы с именами A2, B2, C2 и D2. Есть 250 записей внутри.

Теперь нужно вставить новый столбец в А, учитывая логик ниже:

  • первого поиск A1, B1 и C1-В (соответствующих столбцах A2, B2 и C2), если успешный , верните D2 в качестве значения нового добавленного столбца. Если ничего не найдено ...
  • Затем найдите A1, B1 в B. В случае успеха верните D2. Если ничего не найдено ...
  • Устанавливает значение по умолчанию «NA»

Я уже прочитал в файлах и превращали их в кадры данных. Для первой ситуации я получил результат, когда левое внешнее соединение их соединяло. Но я не могу найти хороший способ на следующем шаге.

В настоящее время я пытаюсь создать новый кадр данных, присоединив A и B, используя менее строгие условия. Однако я не знаю, как обновить текущий кадр данных с другого. Или есть ли еще более интуитивный и эффективный способ решения всей проблемы?

Спасибо за все ответы.

----------------------------- Обновление от 20160309 -------------- ------------------

Наконец-то принят ответ @mlk. Еще большое спасибо @ zero323 за его замечательные комментарии относительно UDF и join, поколение кода вольфрама - это еще одна проблема, с которой мы сталкиваемся сейчас. Но так как нам нужно сделать множество поиска и средние 4 условия для каждого поиска, бывший решение является более подходящим ...

Окончательное решение как-то выглядит, как показано ниже фрагмент кода:

``` 
import sqlContext.implicits._ 
import com.github.marklister.collections.io._ 

case class TableType(A: String, B: String, C: String, D: String) 
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("...")) 
val lkupD = udf { 
    (aStr: String, bStr: String, cStr: String) => 
    tableBroadcast.value.find { 
     case TableType(a, b, c, _) => 
     (a == aStr && b == bStr && c == cStr) || 
     (a == aStr && b == bStr) 
    }.getOrElse(TableType("", "", "", "NA")).D 
} 
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C")) 
``` 

ответ

4

В Б small Я думаю, что лучший способ сделать это - это широковещательная переменная и пользовательская функция.

// However you get the data... 
case class BType(A2: Int, B2: Int, C2 : Int, D2 : String) 
val B = Seq(BType(1,1,1,"B111"), BType(1,1,2, "B112"), BType(2,0,0, "B200")) 

val A = sc.parallelize(Seq((1,1,1, "DATA"), (1,1,2, "DATA"), (2, 0, 0, "DATA"), (2, 0, 1, "NONE"), (3, 0, 0, "NONE"))).toDF("A1", "B1", "C1", "OTHER") 


// Broadcast B so all nodes have a copy of it. 
val Bbradcast = sc.broadcast(B) 

// A user defined function to find the value for D2. This I'm sure could be improved by whacking it into maps. But this is a small example. 
val findD = udf {(a: Int, b : Int, c: Int) => Bbradcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c).getOrElse(Bbradcast.value.find(x => x.A2 == a && x.B2 == b).getOrElse(BType(0,0,0,"NA"))).D2 } 

// Use the UDF in a select 
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show 
+1

Это, вероятно, путь. Я также предложил альтернативное решение с 'joins'. – zero323

+0

Спасибо mlk. Если таблица поиска большая (500K * 50), хорошо ли ее транслировать? –

+0

И еще мой вопрос: предположим, мне нужно выполнить 30 поисков по разным столбцам и написать 50 UDF, повлияет ли производительность? –

2

Просто для справки решение без UDF,:

val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1")) 
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2")) 

// Match A, B and C 
val expr1 = ($"A1" === $"A2_1") && ($"B1" === $"B2_1") && ($"C1" === $"C2_1") 
// Match A and B mismatch C 
val expr2 = ($"A1" === $"A2_2") && ($"B1" === $"B2_2") && ($"C1" !== $"C2_2") 

val toDrop = b1.columns ++ b2.columns 

toDrop.foldLeft(a 
    .join(b1, expr1, "leftouter") 
    .join(b2, expr2, "leftouter") 
    // If there is match on A, B, C then D_1 should be not NULL 
    // otherwise we fall-back to D_2 
    .withColumn("D", coalesce($"D_1", $"D_2")) 
)((df, c) => df.drop(c)) 

Это предполагает, что существует не более одного совпадения в каждой категории (все три колонки, или первые два) или дублирующие строки в выводе являются желательно.

UDF против РЕГИСТРИРУЙТЕСЬ:

Есть несколько факторов, чтобы рассмотреть и нет простого ответа здесь:

Против:

  • широковещательный joins требуют передачи данных в два раза в рабочих узлов. На данный момент таблицы broadcasted не кэшируются (SPARK-3863), и вряд ли это изменится в ближайшее время (разрешение: позже).
  • join операция применяется дважды, даже если имеется полное совпадение.

Pros:

  • joincoalesce и прозрачны для оптимизатора, а UDFs нет.
  • , работающий непосредственно с выражениями SQL, может извлечь выгоду из всех оптимизаций вольфрама, включая генерацию кода, в то время как UDF не может.
Смежные вопросы