Использование Спарк SQL, я два dataframes, они созданы из одного, таких как:DataFrame фильтрация на основе второго Dataframe
df = sqlContext.createDataFrame(...);
df1 = df.filter("value = 'abc'"); //[path, value]
df2 = df.filter("value = 'qwe'"); //[path, value]
Я хочу, чтобы фильтровать DF1, если часть его «пути» является любой путь в df2. Итак, если df1 имеет строку с контуром 'a/b/c/d/e', я бы выяснил, является ли в df2 строка, путь которой равен 'a/b/c'. В SQL она должна быть как
SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)
где ОДС определяется пользователем функция, которая сокращает исходный путь от df1. Наивное решение состоит в том, чтобы использовать JOIN, а затем фильтровать результат, но он медленный, так как df1 и df2 имеют более чем 10 мил строк.
Я также попытался следующий код, но во-первых, я должен был создать переменную широковещательный из df2
static Broadcast<DataFrame> bdf;
bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext
sqlContext.createDataFrame(df1.javaRDD().filter(
new Function<Row, Boolean>(){
@Override
public Boolean call(Row row) throws Exception {
String foo = shortenPath(row.getString(0));
return bdf.value().filter("path = '"+foo+"'").count()>0;
}
}
), myClass.class)
проблема у меня в том, что Спарк застрял, когда возвращение было оценено/при фильтрации из df2 была выполнена ,
Я хотел бы знать, как работать с двумя файлами данных для этого. Я действительно хочу избежать ПРИСОЕДИНЕНИЯ. Есть идеи?
EDIT >>
В моем оригинальном коде df1 имеет псевдоним 'первый' и df2 'второй'. Это соединение не является декартивным, и оно также не использует широковещательную передачу.
df1 = df1.as("first");
df2 = df2.as("second");
df1.join(df2, df1.col("first.path").
lt(df2.col("second.path"))
, "left_outer").
filter("isPrefix(first.path, second.path)").
na().drop("any");
isPrefix является UDF
UDF2 isPrefix = new UDF2<String, String, Boolean>() {
@Override
public Boolean call(String p, String s) throws Exception {
//return true if (p.length()+4==s.length()) and s.contains(p)
}};
shortenPath - это сокращение двух последних символов в пути
UDF1 shortenPath = new UDF1<String, String>() {
@Override
public String call(String s) throws Exception {
String[] foo = s.split("/");
String result = "";
for (int i = 0; i < foo.length-2; i++) {
result += foo[i];
if(i<foo.length-3) result+="/";
}
return result;
}
};
Пример записей. Путь уникален.
a/a/a/b/c abc
a/a/a qwe
a/b/c/d/e abc
a/b/c qwe
a/b/b/k foo
a/b/f/a bar
...
Так DF1 consits из
a/a/a/b/c abc
a/b/c/d/e abc
...
и DF2 consits из
Связано с: [Как мы можем подключиться к двум фреймворкам Spark SQL с использованием критерия «LIKE» SQL-esque?] (Http://stackoverflow.com/a/33190103/1560062) – zero323
Вопрос был отредактирован. Btw UNION имеет смысл и для меня. Но Spark не поддерживает вложенный запрос типа «SELECT path FROM blabla WHERE value LIKE« abc »И parent (path) IN (SELECT path FROM blabla WHERE значение LIKE 'qwe')". Он также не поддерживается с помощью DataFrame api. –
Вы пробовали [Фильтр] (http://www.tutorialspoint.com/design_pattern/filter_pattern.htm)? Конечно, вам нужно будет адаптировать данный пример, но я думаю, что это может быть ответ –