2016-12-09 1 views
-2

Я получаю ниже исключения, если я соединяюсь между двумя кадрами данных в искро (ver 1.5, scala 2.10).Исключение сбоя DataFrame соединения, если ключевой столбец содержит период (".") В конце

Exception in thread "main" org.apache.spark.sql.AnalysisException: syntax error in attribute name: col1.; 
    at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.e$1(unresolved.scala:99) 
    at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.parseAttributeName(unresolved.scala:118) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveQuoted(LogicalPlan.scala:182) 
    at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:158) 
    at org.apache.spark.sql.DataFrame.col(DataFrame.scala:653) 
    at com.nielsen.buy.integration.commons.Demo$.main(Demo.scala:62) 
    at com.nielsen.buy.integration.commons.Demo.main(Demo.scala) 

Код работает нормально, если столбец в dataframe не содержит никакого периода. Пожалуйста, помогите мне.

Вы можете найти код, который я использую.

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.sql.SQLContext 
import com.google.gson.Gson 
import org.apache.spark.sql.types.StructType 
import org.apache.spark.sql.types.StructField 
import org.apache.spark.sql.types.StringType 
import org.apache.spark.sql.Row 

object Demo 
{ 
lazy val sc: SparkContext = { 
    val conf = new SparkConf().setMaster("local") 
     .setAppName("demooo") 
     .set("spark.driver.allowMultipleContexts", "true") 
    new SparkContext(conf) 
    } 
    sc.setLogLevel("ERROR") 

    lazy val sqlcontext=new SQLContext(sc) 

val data=List(Row("a","b"),Row("v","b")) 
     val dataRdd=sc.parallelize(data) 
     val schema = new StructType(Array(StructField("col.1",StringType,true),StructField("col2",StringType,true))) 
     val df1=sqlcontext.createDataFrame(dataRdd, schema) 

     val data2=List(Row("a","b"),Row("v","b")) 
     val dataRdd2=sc.parallelize(data2) 
     val schema2 = new StructType(Array(StructField("col3",StringType,true),StructField("col4",StringType,true))) 
     val df2=sqlcontext.createDataFrame(dataRdd2, schema2) 
     val val1="col.1" 
     val df3= df1.join(df2,df1.col(val1).equalTo(df2.col("col3")),"outer").show 
} 

ответ

1

В общем, период используется для доступа к членам поля структуры. Исключительная версия, используемая вами (1.5), относительно устарела. Несколько таких проблем были исправлены в более поздних версиях, поэтому, если вы обновите его, это может решить проблему.

При этом вы можете просто использовать withColumnRenamed для переименования столбца в то, что не имеет периода до объединения. Таким образом, вы в основном, делают что-то вроде этого:

val dfTmp = df1.withColumnRenamed(val1, "JOIN_COL") 
val df3= dfTmp.join(df2,dfTmp.col("JOIN_COL").equalTo(df2.col("col3")),"outer").withColumnRenamed("JOIN_COL", val1) 
df3.show 

кстати показать возвращает блок, так что вы, вероятно, имел в виду DF3 быть равна выражению без него и не df3.show отдельно.

+0

спасибо .. ранее я использовал для замены точки во всех именах col и создавал новый dataframe. в свою очередь, требовал преобразования данных в rdd, что было ненужным действием. то я наконец понял после вашего ответа, что только ключевой столбец должен избавиться от точки. благодаря –

Смежные вопросы