2016-04-12 3 views
0

У меня есть два кадра данных, которые содержат значения для некоторых людей в двух разных временных меток. Возможные изменения для человека до и после перечислены в коде ниже.Как обнаружить изменения в строках из разных фреймов данных

val before = Seq(
(1, "soccer", "1", "2", "3", "4", ""), 
(2, "soccer", "", "", "", "", ""), 
(3, "soccer", "1", "", "", "", ""), 
(4, "soccer", "1", "", "", "", ""), 
(5, "soccer", "1", "", "", "", ""), 
(6, "soccer", "1", "", "", "", "") 
).toDF("id", "sport", "var1", "var2", "var3", "var4", "var5") 

before.show     //> +---+------+----+----+----+----+----+ 
           //| | id| sport|var1|var2|var3|var4|var5| 
           //| +---+------+----+----+----+----+----+ 
           //| | 1|soccer| 1| 2| 3| 4| | 
           //| | 2|soccer| | | | | | 
           //| | 3|soccer| 1| | | | | 
           //| | 4|soccer| 1| | | | | 
           //| | 5|soccer| 1| | | | | 
           //| | 6|soccer| 1| | | | | 
           //| +---+------+----+----+----+----+----+ 
           //| 

val after = Seq(
(1, "soccer", "1", "2", "3", "4", ""), // Same 
(2, "soccer", "1", "", "", "", ""), // Addition 
(3, "soccer", "1", "1", "", "", ""), // Addition 
(4, "soccer", "", "", "", "", ""), // Remove 
(5, "soccer", "2", "1", "", "", ""), // Slide 
(6, "soccer", "2", "", "", "", "") // Change 
).toDF("id", "sport", "var1", "var2", "var3", "var4", "var5") 

after.show     //> +---+------+----+----+----+----+----+ 
           //| | id| sport|var1|var2|var3|var4|var5| 
           //| +---+------+----+----+----+----+----+ 
           //| | 1|soccer| 1| 2| 3| 4| | 
           //| | 2|soccer| 1| | | | | 
           //| | 3|soccer| 1| 1| | | | 
           //| | 4|soccer| | | | | | 
           //| | 5|soccer| 2| 1| | | | 
           //| | 6|soccer| 2| | | | | 
           //| +---+------+----+----+----+----+----+ 
           //| 

Так что все может оставаться неизменным, может быть добавление или удаление, и, наконец, может произойти смена или слайд.

Мой идеальный выход является то, что противостоит каждую строку до и после того, как кадры данных, и прикрепить этикетку:

outcome.show     //> +---+------+------+ 
           //| | id| sport| diff| 
           //| +---+------+------+ 
           //| | 1|soccer| same| 
           //| | 2|soccer| add| 
           //| | 3|soccer| add| 
           //| | 4|soccer|remove| 
           //| | 5|soccer| slide| 
           //| | 6|soccer|change| 
           //| +---+------+------+ 
           //| 

Этот вопрос связан с this один, но дело было просто посчитать, сколько различий было между двумя рядами ... На этот раз я пытаюсь понять эти различия с более тонким зерном, но я застрял в определении различных возможных вариантов.

EDIT

Поскольку я использую DataFrame, я хотел бы придерживаться этой структуры, а не классов прецедентов. Поэтому я пытаюсь адаптировать то, что было предложено @iboss, используя DataFrame.

У меня есть эта UDF, которая должна сделать всю работу:

val diff = udf { (bef:DataFrame, aft:DataFrame) => { 
    "hello" // return just this string for now 
    } : String 
} 

Это UDF будет делать всю работу, как это было предложено @iboss, чтобы произвести вывод в outcome.show, поэтому возможный результат после совпадение двух строк будет строкой, точнее одной из «тех же», «добавить», «удалить», «слайд» или «изменить».

У меня есть то этот код, чтобы объединить два кадра данных и создать новый столбец:

val mydiff = before.join(after, "id") 
    .withColumn("diff", diff(before, after)) 
    .select("id", "diff") 

Однако, у меня есть сообщение об ошибке при вызове диф, говорящее как это:

type mismatch; found : org.apache.spark.sql.DataFrame required: org.apache.spark.sql.Column 

Что Я не понимаю, почему это не нравится DataFrame и как его решить ...

ответ

0

Я не совсем уверен, что это за те вары, но если бы я был вами, я бы сгруппировал их в кортеж или класс класса, который легче для дальнейшего процесса. Это может выглядеть следующим образом:

val before = Seq(
    (1, "soccer", ("1", "2", "3", "4", "")), 
    (2, "soccer", ("", "", "", "", "")), 
    (3, "soccer", ("1", "", "", "", "")), 
    (4, "soccer", ("1", "", "", "", "")), 
    (5, "soccer", ("1", "", "", "", "")), 
    (6, "soccer", ("1", "", "", "", "")) 
).toDF("id", "sport", "vars") 


val after = Seq(
    (1, "soccer", ("1", "2", "3", "4", "")), 
    (2, "soccer", ("1", "", "", "", "")), 
    (3, "soccer", ("1", "1", "", "", "")), 
    (4, "soccer", ("", "", "", "", "")), 
    (5, "soccer", ("2", "1", "", "", "")), 
    (6, "soccer", ("2", "", "", "", "")) 
).toDF("id", "sport", "vars") 

Тогда вы могли бы использовать определенную пользователем функцию, чтобы вычислить ваш дифференциал

type MyVars = (String, String, String, String, String) 

val diff = udf { (before_vars: MyVars, after_vars: MyVars) => 
    // your implementation of diff function 
} 

before 
    .join(after) 
    .withColumn("diff", diff(before("vars"), after("vars"))) 
    .select("id", "sport", "diff") 

Редактировать

Для UDF, как правило, они делают вывод типа для вы можете не указывать свой тип.Но если вы хотите, чтобы определить его, то вы можете сделать это таким образом

udf { (firstName: String, lastName: String) => s"$firstName $lastName": String } 

или с блоком

udf { (name: String) => { 
    val hello = "hello " 
    "hello, " + name 
}: Int } 

и вы также можете использовать def

def getFullName(firstName: String, lastName: String): String = 
    s"$firstName $lastName" 

udf(getFullName _) 

поскольку использование def является не определяя функцию, а метод и udf требуют funcstion. Поэтому нам нужно преобразовать его с помощью частичной заявки на приложение.

Для более подробной информации, вы можете посмотреть на этой Difference between method and function in Scala

Edit 2

Кажется, я понял ваш вопрос немного. diff udf должен применяться к каждой строке отдельно. Таким образом, вы не можете передать весь DataFrame.

Я предлагаю вам сгруппировать эти вары (в каждой строке) в кортеж только потому, что его легче читать. Но если вы все же хотите использовать оригинальную форму, то вы можете сделать это

val diff = udf { (
    beforeVar1: String, 
    beforeVar2: String, 
    beforeVar3: String, 
    beforeVar4: String, 
    beforeVar5: String, 
    afterVar1: String, 
    afterVar2: String, 
    afterVar3: String, 
    afterVar4: String, 
    afterVar5: String 
) => { 
    "hello" // return just this string for now 
    } : String 
} 

before.join(after, "id") 
    .withColumn("diff", diff(
    before("var1"), 
    before("var2"), 
    before("var3"), 
    before("var4"), 
    before("var5"), 
    after("var1"), 
    after("var2"), 
    after("var3"), 
    after("var4"), 
    after("var5"), 
)) 
    .select("id", "diff") 
+0

этих ВАР приходят из двух таблиц MySQL, но мне нравится подход вы предлагаете ... – user299791

+0

я не понимаю, как вернуть Колонка из UDF еще ... у вас есть намек? – user299791

+0

эта строка '.withColumn (" diff ", diff (before (" vars "), after (" vars ")))' определить имя столбца результата "diff" из функции 'diff' – iboss

Смежные вопросы