2016-05-27 5 views
-1

Я преобразовываю свой код Scala в pyspark, как показано ниже, но получил разные значения для окончательного RDD.Несогласованные результаты работы Scala Spark и pyspark

Мой Scala код:

val scalaRDD = rowRDD.map { 
    row: Row => 
    var rowList: ListBuffer[Row] = ListBuffer() 
    rowList.add(row) 
    (row.getString(1) + "_" + row.getString(6), rowList) 
}.reduceByKey{ (list1,list2) => 

    var rowList: ListBuffer[Row] = ListBuffer() 
    for (i <- 0 to list1.length -1) { 
    val row1 = list1.get(i); 

    var foundMatch = false; 

    breakable { 
     for (j <- 0 to list2.length -1) { 
     var row2 = list2.get(j); 
     val result = mergeRow(row1, row2) 
     if (result._1) { 
      list2.set(j, result._2) 
      foundMatch = true; 
      break; 
     } 
     } // for j loop 
    } // breakable for j 

    if(!foundMatch) { 
     rowList.add(row1); 
    } 
    } 

    list2.addAll(rowList); 

    list2 
}.flatMap { t=> t._2 } 

где

def mergeRow(row1:Row, row2:Row):(Boolean, Row)= { 
    var z:Array[String] = new Array[String](row1.length) 
    var hasDiff = false 

    for (k <- 1 to row1.length -2){ 
       // k = 0 : ID, always different 
       // k = 43 : last field, which is not important 

     if (row1.getString(0) < row2.getString(0)) { 
     z(0) = row2.getString(0) 
     z(43) = row2.getString(43) 
     } else { 
     z(0) = row1.getString(0) 
     z(43) = row1.getString(43) 
     } 

     if (Option(row2.getString(k)).getOrElse("").isEmpty && !Option(row1.getString(k)).getOrElse("").isEmpty) { 
      z(k) = row1.getString(k) 
      hasDiff = true 
     } else if (!Option(row1.getString(k)).getOrElse("").isEmpty && !Option(row2.getString(k)).getOrElse("").isEmpty && row1.getString(k) != row2.getString(k)) { 
      return (false, null) 
     } else { 
      z(k) = row2.getString(k) 
     } 
    } // for k loop 

    if (hasDiff) { 
     (true, Row.fromSeq(z)) 
    } else { 
     (true, row2) 
    } 
} 

Затем я попытался преобразовать их в pyspark код, как показано ниже:

pySparkRDD = rowRDD.map (
    lambda row : singleRowList(row) 
).reduceByKey(lambda list1,list2: mergeList(list1,list2)).flatMap(lambda x : x[1]) 

, где у меня есть:

def mergeRow(row1, row2): 
    z=[] 
    hasDiff = False 

    #for (k <- 1 to row1.length -2){ 
    for k in xrange(1, len(row1) - 2): 
       # k = 0 : ID, always different 
       # k = 43 : last field, which is not important 

     if (row1[0] < row2[0]): 
     z[0] = row2[0] 
     z[43] = row2[43] 
     else: 
     z[0] = row1[0] 
     z[43] = row1[43] 


     if not(row2[k]) and row1[k]: 
      z[k] = row1[k].strip() 
      hasDiff = True 
     elif row1[k] and row2[k] and row1[k].strip() != row2[k].strip(): 
      return (False, None) 
     else: 
      z[k] = row2[k].strip() 



    if hasDiff: 
     return (True, Row.fromSeq(z)) 
    else: 
     return (True, row2) 

и

def singleRowList(row): 
    myList=[] 
    myList.append(row) 

    return (row[1] + "_" + row[6], myList) 

и

def mergeList(list1, list2): 
    rowList = [] 
    for i in xrange(0, len(list1)-1): 
    row1 = list1[i] 
    foundMatch = False 
    for j in xrange(0, len(list2)-1): 
     row2 = list2[j] 
     resultBool, resultRow = mergeRow(row1, row2) 
     if resultBool: 
      list2[j] = resultRow 
      foundMatch = True 
      break 

    if foundMatch == False: 
     rowList.append(row1) 

    list2.extend(rowList) 

    return list2 

BTV, rowRDD преобразуется из кадра данных. то есть rowRDD = myDF.rdd

Однако у меня есть разные значения для scalaRDD и pySparkRDD. Я много раз проверял коды, но не мог понять, что я пропустил. У кого-нибудь есть идеи? Благодаря!

ответ

2

Рассмотрим это:

scala> (1 to 5).length 
res1: Int = 5 

и это:

>>> len(xrange(1, 5)) 
4 
+0

В Scala, вы должны использовать "до" воспроизвести "диапазон" Питона: (от 1 до 5) -> 1 , 2, 3, 4, 5 (1 до 5) -> 1, 2, 3, 4 – FLab

Смежные вопросы