3

Использование Спарк SQL, я два dataframes, они созданы из одного, таких как:DataFrame фильтрация на основе второго Dataframe

df = sqlContext.createDataFrame(...); 
df1 = df.filter("value = 'abc'"); //[path, value] 
df2 = df.filter("value = 'qwe'"); //[path, value] 

Я хочу, чтобы фильтровать DF1, если часть его «пути» является любой путь в df2. Итак, если df1 имеет строку с контуром 'a/b/c/d/e', я бы выяснил, является ли в df2 строка, путь которой равен 'a/b/c'. В SQL она должна быть как

SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2) 

где ОДС определяется пользователем функция, которая сокращает исходный путь от df1. Наивное решение состоит в том, чтобы использовать JOIN, а затем фильтровать результат, но он медленный, так как df1 и df2 имеют более чем 10 мил строк.

Я также попытался следующий код, но во-первых, я должен был создать переменную широковещательный из df2

static Broadcast<DataFrame> bdf; 
bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext 

sqlContext.createDataFrame(df1.javaRDD().filter(
     new Function<Row, Boolean>(){ 
      @Override 
      public Boolean call(Row row) throws Exception { 
       String foo = shortenPath(row.getString(0)); 
       return bdf.value().filter("path = '"+foo+"'").count()>0; 
      } 
      } 
    ), myClass.class) 

проблема у меня в том, что Спарк застрял, когда возвращение было оценено/при фильтрации из df2 была выполнена ,

Я хотел бы знать, как работать с двумя файлами данных для этого. Я действительно хочу избежать ПРИСОЕДИНЕНИЯ. Есть идеи?


EDIT >>

В моем оригинальном коде df1 имеет псевдоним 'первый' и df2 'второй'. Это соединение не является декартивным, и оно также не использует широковещательную передачу.

df1 = df1.as("first"); 
df2 = df2.as("second"); 

    df1.join(df2, df1.col("first.path"). 
           lt(df2.col("second.path")) 
             , "left_outer"). 
        filter("isPrefix(first.path, second.path)"). 
        na().drop("any"); 

isPrefix является UDF

UDF2 isPrefix = new UDF2<String, String, Boolean>() { 
     @Override 
     public Boolean call(String p, String s) throws Exception { 
      //return true if (p.length()+4==s.length()) and s.contains(p) 
     }}; 

shortenPath - это сокращение двух последних символов в пути

UDF1 shortenPath = new UDF1<String, String>() { 
     @Override 
     public String call(String s) throws Exception { 
      String[] foo = s.split("/"); 
      String result = ""; 
      for (int i = 0; i < foo.length-2; i++) { 
       result += foo[i]; 
       if(i<foo.length-3) result+="/"; 
      } 
      return result; 
     } 
    }; 

Пример записей. Путь уникален.

a/a/a/b/c abc 
a/a/a  qwe 
a/b/c/d/e abc 
a/b/c  qwe 
a/b/b/k foo 
a/b/f/a bar 
... 

Так DF1 consits из

a/a/a/b/c abc 
a/b/c/d/e abc 
... 

и DF2 consits из

+1

Связано с: [Как мы можем подключиться к двум фреймворкам Spark SQL с использованием критерия «LIKE» SQL-esque?] (Http://stackoverflow.com/a/33190103/1560062) – zero323

+0

Вопрос был отредактирован. Btw UNION имеет смысл и для меня. Но Spark не поддерживает вложенный запрос типа «SELECT path FROM blabla WHERE value LIKE« abc »И parent (path) IN (SELECT path FROM blabla WHERE значение LIKE 'qwe')". Он также не поддерживается с помощью DataFrame api. –

+0

Вы пробовали [Фильтр] (http://www.tutorialspoint.com/design_pattern/filter_pattern.htm)? Конечно, вам нужно будет адаптировать данный пример, но я думаю, что это может быть ответ –

ответ

1

Там, по крайней мере несколько проблем с кодом:

  • вы не можете выполнить действие или преобразования внутри другого действия или преобразования. Это означает, что фильтрация широковещательных сообщений DataFrame просто не может работать, и вы должны получить исключение.
  • join вы используете как декартово произведение, за которым следует фильтр. Так как Spark использует Hashing для объединений, то только рационализированные объединения могут эффективно выполняться без декартовых. Он немного связан с Why using a UDF in a SQL query leads to cartesian product?
  • если оба DataFrames относительно большие и имеют аналогичный размер, то широковещательная передача вряд ли будет полезна.См. Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark
  • не важно, когда дело доходит до производительности, но isPrefix кажется неправильным. В частности, похоже, что он может соответствовать как префиксу, так и суффиксу
  • col("first.path").lt(col("second.path")) Состояние выглядит неправильно. Я предполагаю, что вы хотите a/a/a/b/c от df1 матч a/a/a от df2. Если это так, то должно быть gt не lt.

Вероятно, самое лучшее, что вы можете сделать что-то похожее на это:

import org.apache.spark.sql.functions.{col, regexp_extract} 

val df = sc.parallelize(Seq(
    ("a/a/a/b/c", "abc"), ("a/a/a","qwe"), 
    ("a/b/c/d/e", "abc"), ("a/b/c", "qwe"), 
    ("a/b/b/k", "foo"), ("a/b/f/a", "bar") 
)).toDF("path", "value") 

val df1 = df 
    .where(col("value") === "abc")  
    .withColumn("path_short", regexp_extract(col("path"), "^(.*)(/.){2}$", 1)) 
    .as("df1") 

val df2 = df.where(col("value") === "qwe").as("df2") 
val joined = df1.join(df2, col("df1.path_short") === col("df2.path")) 

Вы можете попробовать транслировать одну из таблиц, как это (Спарк> = 1.5.0 только):

import org.apache.spark.sql.functions.broadcast 

df1.join(broadcast(df2), col("df1.path_short") === col("df2.path")) 

и увеличить пределы автоматического вещания, но, как я уже упоминал выше, он, скорее всего, будет менее эффективным, чем обычный HashJoin.

+0

Я также сталкиваюсь с декартовой проблемой продукта, проверяя неравномерность соединения dataframe, есть ли какое-либо решение или обходное решение для этого? – Shankar

1

В качестве возможного способа реализации IN с подзапроса, то LEFT SEMI JOIN может быть использован:

JavaSparkContext javaSparkContext = new JavaSparkContext("local", "testApp"); 
    SQLContext sqlContext = new SQLContext(javaSparkContext); 
    StructType schema = DataTypes.createStructType(new StructField[]{ 
      DataTypes.createStructField("path", DataTypes.StringType, false), 
      DataTypes.createStructField("value", DataTypes.StringType, false) 
    }); 
    // Prepare First DataFrame 
    List<Row> dataForFirstDF = new ArrayList<>(); 
    dataForFirstDF.add(RowFactory.create("a/a/a/b/c", "abc")); 
    dataForFirstDF.add(RowFactory.create("a/b/c/d/e", "abc")); 
    dataForFirstDF.add(RowFactory.create("x/y/z", "xyz")); 
    DataFrame df1 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForFirstDF), schema); 
    // 
    df1.show(); 
    // 
    // +---------+-----+ 
    // |  path|value| 
    // +---------+-----+ 
    // |a/a/a/b/c| abc| 
    // |a/b/c/d/e| abc| 
    // | x/y/z| xyz| 
    // +---------+-----+ 

    // Prepare Second DataFrame 
    List<Row> dataForSecondDF = new ArrayList<>(); 
    dataForSecondDF.add(RowFactory.create("a/a/a", "qwe")); 
    dataForSecondDF.add(RowFactory.create("a/b/c", "qwe")); 
    DataFrame df2 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForSecondDF), schema); 

    // Use left semi join to filter out df1 based on path in df2 
    Column pathContains = functions.column("firstDF.path").contains(functions.column("secondDF.path")); 
    DataFrame result = df1.as("firstDF").join(df2.as("secondDF"), pathContains, "leftsemi"); 

    // 
    result.show(); 
    // 
    // +---------+-----+ 
    // |  path|value| 
    // +---------+-----+ 
    // |a/a/a/b/c| abc| 
    // |a/b/c/d/e| abc| 
    // +---------+-----+ 

Физический план такого запроса будет выглядеть следующим образом:

== Physical Plan == 
Limit 21 
ConvertToSafe 
    LeftSemiJoinBNL Some(Contains(path#0, path#2)) 
    ConvertToUnsafe 
    Scan PhysicalRDD[path#0,value#1] 
    TungstenProject [path#2] 
    Scan PhysicalRDD[path#2,value#3] 

Он будет использовать LeftSemiJoinBNL для фактической операции соединения, которая должна транслировать значения внутренне. Более подробно см. Фактическую реализацию в искры - LeftSemiJoinBNL.scala

P.S. Я не совсем понял необходимость удаления двух последних символов, но если это необходимо - это можно сделать, например, @ zero323 (используя regexp_extract).

+1

Ницца. Я полностью забыл о полусоединении. Но проблема с 'contains' (или даже' startsWith' и 'endsWith') заключается в том, что она не может быть оптимизирована. Поэтому, если вы можете уменьшить проблему до проверки равенства, это будет столь же значительное повышение производительности ('LeftSemiJoinHash' и' LeftSemiJoinBNL'). – zero323

Смежные вопросы