2016-03-10 6 views
3

В Hadoop объединение/объединение больших эквидистанционных наборов данных может быть выполнено без перестановки и уменьшения фазы, просто используя соединение на стороне карты с CompositeInputFormat.Слияние равнораспределенных фреймов данных в Spark

Пытаясь выяснить, чтобы сделать это в Спарк:

val x = sc.parallelize(Seq(("D", 1), ("C", 2), ("B", 3), ("A", 4))).toDF("k", "v") 
    .repartition(col("k")).cache() 
val y = sc.parallelize(Seq(("F", 5), ("E", 6), ("D", 7), ("C", 8))).toDF("k", "v") 
    .repartition(col("k")).cache() 

val xy = x.join(y, x.col("k") === y.col("k"), "outer") 

x.show() y.show() xy.show() 

+---+---+ +---+---+ +----+----+----+----+ 
| k| v| | k| v| | k| v| k| v| 
+---+---+ +---+---+ +----+----+----+----+ 
| A| 6| | C| 12| | A| 4|null|null| 
| B| 5| | D| 11| | B| 3|null|null| 
| C| 4| | E| 10| | C| 2| C| 8| 
| D| 3| | F| 9| | D| 1| D| 7| 
| E| 2| | G| 8| |null|null| E| 6| 
| F| 1| | H| 7| |null|null| F| 5| 
+---+---+ +---+---+ +----+----+----+----+ 

До сих пор так хорошо. Но когда я проверяю план выполнения, я вижу «ненужные» виды:

xy.explain 

== Physical Plan == 
SortMergeOuterJoin [k#1283], [k#1297], FullOuter, None 
:- Sort [k#1283 ASC], false, 0 
: +- InMemoryColumnarTableScan [k#1283,v#1284], InMemoryRelation [k#1283,v#1284], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1283,200), None, None 
+- Sort [k#1297 ASC], false, 0 
    +- InMemoryColumnarTableScan [k#1297,v#1298], InMemoryRelation [k#1297,v#1298], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1297,200), None, None 

Можно ли избежать разного рода здесь?

Редактировать

Для справки, Hadoop была эта "особенность" доступна с 2007 года: https://issues.apache.org/jira/browse/HADOOP-2085

Update

Как Lezzar отметил переделу() не является достаточным для добиться равнораспределенного упорядоченного состояния. Я думаю, что теперь он должен следовать sortWithinPartitions() Так что следует сделать трюк:

val x = sc.parallelize(Seq(("F", 1), ("E", 2), ("D", 3), ("C", 4), ("B", 5), ("A", 6))).toDF("k", "v") 
    .repartition(col("k")).sortWithinPartitions(col("k")).cache() 
val y = sc.parallelize(Seq(("H", 7), ("G", 8), ("F", 9), ("E",10), ("D",11), ("C",12))).toDF("k", "v") 
    .repartition(col("k")).sortWithinPartitions(col("k")).cache() 

xy.explain()

== Physical Plan == 
SortMergeOuterJoin [k#1055], [k#1069], FullOuter, None 
:- InMemoryColumnarTableScan [k#1055,v#1056], InMemoryRelation [k#1055,v#1056], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1055 ASC], false, 0, None 
+- InMemoryColumnarTableScan [k#1069,v#1070], InMemoryRelation [k#1069,v#1070], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1069 ASC], false, 0, None 

Нет сортировочных больше!

ответ

0

Похожая на присоединение к карте в Hadoop, Spark имеет широковещательное соединение, которое передает данные таблицы всем рабочим точно так же, как то, что распределяет кеш в hasoop mapreduce. Пожалуйста, обратитесь к источнику искра. Spark автоматически позаботится об этом, в отличие от улья. Поэтому нет необходимости беспокоиться об этом.

Для этого вам нужно будет понять несколько параметров.

-> spark.sql.autoBroadcastJoinThreshold, размер, при котором, искровое окно автоматически транслирует таблицу.

Вы можете попробовать под кодом, чтобы понять, как присоединиться к широковещанию, а также вы можете обратиться к источнику искры. Документация для BroadCast присоединяется или google для более подробной информации.

Пример кода попробовать:

val sqlContext = new HiveContext(sc); 
1) sqlContext.sql("CREATE TABLE IF NOT EXISTS tab3 (key INT, value STRING)") 

2) sqlContext.sql("INSERT INTO tab4 select 1,\"srini\" from sr23"); 
(I have created other table to just insert a record into table. As hive only support insert into select, i have used this trick to have some data.) You can skip this step as well, as you just want to see the physical plan. 

------ You can also use any Hive table that is already created instead.. I am just trying to simulate the hive table thats it. --- 

3) val srini_df1 = sqlContext.sql("ANALYZE TABLE tab4 COMPUTE STATISTICS NOSCAN"); 

4) val df2 = sc.parallelize(Seq((5,"F"), (6,"E"), (7,"sri"), (1,"test"))).toDF("key", "value") 

5) val join_df = sqlContext.sql("SELECT * FROM tab5").join(df2,"key"); 

6) join_df.explain 
16/03/15 22:40:09 INFO storage.MemoryStore: ensureFreeSpace(530360) called with curMem=238151, maxMem=555755765 
16/03/15 22:40:09 INFO storage.MemoryStore: Block broadcast_23 stored as values in memory (estimated size 517.9 KB, free 529.3 MB) 
16/03/15 22:40:09 INFO storage.MemoryStore: ensureFreeSpace(42660) called with curMem=768511, maxMem=555755765 
16/03/15 22:40:09 INFO storage.MemoryStore: Block broadcast_23_piece0 stored as bytes in memory (estimated size 41.7 KB, free 529.2 MB) 
16/03/15 22:40:09 INFO storage.BlockManagerInfo: Added broadcast_23_piece0 in memory on localhost:63721 (size: 41.7 KB, free: 529.9 MB) 
16/03/15 22:40:09 INFO spark.SparkContext: Created broadcast 23 from explain at <console>:28 
== Physical Plan == 
Project [key#374,value#375,value#370] 
BroadcastHashJoin [key#374], [key#369], BuildLeft 
    HiveTableScan [key#374,value#375], (MetastoreRelation default, tab5, None) 
    Project [_1#367 AS key#369,_2#368 AS value#370] 
    Scan PhysicalRDD[_1#367,_2#368] 
+0

Спасибо, что посмотрели на него, но мой вопрос касается двух БОЛЬШИЕ наборы. Широковещательное соединение просто не имеет никакого смысла, если только одна из присоединенных частей не является достаточно маленькой. – yurgis

0

Почему вы говорите ненужного рода? Объединение слияния требует сортировки данных. И в IMHO нет лучшей стратегии, чем объединение слияния, чтобы выполнить полное внешнее соединение, за исключением случаев, когда один из ваших информационных кадров достаточно мал, чтобы транслироваться.

+0

Когда вы объединяете два отсортированных набора, сортировать их не нужно. Это в значительной степени O (n). Более того, для равнораспределенных множеств слияние может выполняться локально для каждого разбиения. – yurgis

+0

Да, но когда вы отсортировали данные? вы только переделали его с помощью ключа соединения, но вы никогда не делали никакого этапа сортировки. Поэтому нет никакой возможности, чтобы искра могла знать, что ваши данные отсортированы. –

+0

Я понимаю, что перераспределение сортирует данные в пределах раздела, попробуйте команду x.show() – yurgis

Смежные вопросы