2017-01-19 3 views
1

Я хочу эффективно искать множество идентификаторов. То, что у меня есть это dataframe, который выглядит, как этот dataframedf_source но пару миллионов записей, распределяемой на 10 рабочих:Efficent Поиск в Dataframe в Apache Spark

+-------+----------------+ 
| URI|  Links_lists| 
+-------+----------------+ 
| URI_1|[URI_8,URI_9,...| 
| URI_2|[URI_6,URI_7,...| 
| URI_3|[URI_4,URI_1,...| 
| URI_4|[URI_1,URI_5,...| 
| URI_5|[URI_3,URI_2,...| 
+-------+----------------+ 

Мой первый шаг должен был бы сделать RDD из df_source:

rdd_source = df_source.rdd 

из rdd_source Я хочу создать RDD, который содержит только URI с идентификаторами. Я делаю это так:

rdd_index = rdd_source.map(lambda x: x[0]).zipWithUniqueId() 

теперь я также .flatMap()rdd_source в к RDD, который содержит все отношения. До сих пор содержался только в столбце Links_list.

rdd_relations = rdd_source.flatMap(lamda x: x) 

теперь трансформировать как rdd_index и rdd_relations обратно в dataframes, потому что я хочу сделать соединения и я думаю (я мог бы быть неправильно на этом) присоединяется на dataframes быстрее.

schema_index = StructType([ 
    StructField("URI", StringType(), True), 
    StructField("ID", IntegerType(), True)) 

df_index = sqlContext.createDataFrame(rdd_index, schema=schema_index) 

и

schema_relation = StructType([ 
    StructField("URI", StringType(), True), 
    StructField("LINK", StringType(), True)) 

df_relations = sqlContext.createDataFrame(rdd_relations, schema=schema_relation) 

В результате dataframes должны выглядеть эти два:

df_index: 
+-------+-------+ 
| URI|  ID| 
+-------+-------+ 
| URI_1|  1| 
| URI_2|  2| 
| URI_3|  3| 
| URI_4|  4| 
| URI_5|  5| 
+-------+-------+ 

df_relations: 
+-------+-------+ 
| URI| LINK| 
+-------+-------+ 
| URI_1| URI_5| 
| URI_1| URI_8| 
| URI_1| URI_9| 
| URI_2| URI_3| 
| URI_2| URI_4| 
+-------+-------+ 

теперь заменить длинные строки URI, в df_relations я буду делать присоединяется на df_index, то первое присоединение:

df_relations =\ 
df_relations.join(df_index, df_relations.URI == df_index.URI,'inner')\ 
      .select(col(ID).alias(URI_ID),col('LINK')) 

Это должно дать мне dataframe глядя, как это:

df_relations: 
+-------+-------+ 
| URI_ID| LINK| 
+-------+-------+ 
|  1| URI_5| 
|  1| URI_8| 
|  1| URI_9| 
|  2| URI_3| 
|  2| URI_4| 
+-------+-------+ 

И второй присоединиться:

df_relations =\ 
df_relations.join(df_index, df_relations.LINK == df_index.URI,'inner')\ 
      .select(col(URI_ID),col('ID').alias(LINK_ID)) 

это должно привести в конечном dataframe тот, мне нужно. Глядя, как этот

df_relations: 
+-------+-------+ 
| URI_ID|LINK_ID| 
+-------+-------+ 
|  1|  5| 
|  1|  8| 
|  1|  9| 
|  2|  3| 
|  2|  4| 
+-------+-------+ 

где все идентификаторы URI заменяются идентификаторами из df_index.

Это эффективный способ поиска идентификаторов для всех URI на обоих столбцах в таблице отношений или есть ли более эффективный способ сделать это?

Я использую Apache Спарк 2.1.0 с Python 3.5

ответ

1

Вам не нужно использовать RDD для операций, которые вы описали. Использование RDD может быть очень дорогостоящим. Во-вторых вам не нужно делать два соединения, вы можете сделать только один:

import pyspark.sql.functions as f 
# add a unique id for each URI 
withID = df_source.withColumn("URI_ID", f.monotonically_increasing_id()) 
# create a single line from each element in the array 
exploded = withID.select("URI_ID", f.explode("Links_lists").alias("LINK") 
linkID = withID.withColumnRenamed("URI_ID", "LINK_ID").drop("Links_lists") 
joined= exploded.join(linkID, on=exploded.LINK==linkID.URI).drop("URI").drop("LINK") 

Наконец, если LinkId (который в основном df_source с колонкой замененного) является относительно небольшим (т.е. может быть полностью содержаться в одном работника) вы можете транслировать его.перед подключением следует добавить следующее:

linkID = f.broadcast(linkID) 
+0

Это выглядит очень полезно, я попробую завтра! – Thagor

Смежные вопросы