2015-09-10 4 views
0

У меня этот код:Спарк: сплит строки и накапливаются

val rdd = sc.textFile(sample.log") 
val splitRDD = rdd.map(r => StringUtils.splitPreserveAllTokens(r, "\\|")) 
val rdd2 = splitRDD.filter(...).map(row => createRow(row, fieldsMap)) 
sqlContext.createDataFrame(rdd2, structType).save(
    org.apache.phoenix.spark, SaveMode.Overwrite, Map("table" -> table, "zkUrl" -> zkUrl)) 

def createRow(row: Array[String], fieldsMap: ListMap[Int, FieldConfig]): Row = { 
    //add additional index for invalidValues 
    val arrSize = fieldsMap.size + 1 
    val arr = new Array[Any](arrSize) 
    var invalidValues = "" 
    for ((k, v) <- fieldsMap) { 
     val valid = ... 
     var value : Any = null 
     if (valid) { 
     value = row(k) 
     // if (v.code == "SOURCE_NAME") --> 5th column in the row 
     // sourceNameCount = row(k).split(",").size 
     } else { 
     invalidValues += v.code + " : " + row(k) + " | " 
     } 
     arr(k) = value 
    } 
    arr(arrSize - 1) = invalidValues 
    Row.fromSeq(arr.toSeq) 
} 

fieldsMap содержит отображение входных столбцов: (индекс, FieldConfig). Где класс FieldConfig содержит значения «code» и «dataType».

TOPIC -> (0, v.code = "TOPIC", v.dataType = "String") 
GROUP -> (1, v.code = "GROUP") 
SOURCE_NAME1,SOURCE_NAME2,SOURCE_NAME3 -> (4, v.code = "SOURCE_NAME") 

Это sample.log:

TOPIC|GROUP|TIMESTAMP|STATUS|SOURCE_NAME1,SOURCE_NAME2,SOURCE_NAME3| 
SOURCE_TYPE1,SOURCE_TYPE2,SOURCE_TYPE3|SOURCE_COUNT1,SOURCE_COUNT2,SOURCE_COUNT3| 
DEST_NAME1,DEST_NAME2,DEST_NAME3|DEST_TYPE1,DEST_TYPE2,DEST_TYPE3| 
DEST_COUNT1,DEST_COUNT2,DEST_COUNT3| 

Цель состоит в том, чтобы разделить входной (sample.log), основанный на количестве SOURCE_NAME (ов) .. В приведенном выше примере, на выходе будет иметь 3 строки:

TOPIC|GROUP|TIMESTAMP|STATUS|SOURCE_NAME1|SOURCE_TYPE1|SOURCE_COUNT1| 
|DEST_NAME1|DEST_TYPE1|DEST_COUNT1| 

TOPIC|GROUP|TIMESTAMP|STATUS|SOURCE_NAME2|SOURCE_TYPE2|SOURCE_COUNT2| 
DEST_NAME2|DEST_TYPE2|DEST_COUNT2| 

TOPIC|GROUP|TIMESTAMP|STATUS|SOURCE_NAME3|SOURCE_TYPE3|SOURCE_COUNT3| 
|DEST_NAME3|DEST_TYPE3|DEST_COUNT3| 

Это новый код, который я сейчас работаю (по-прежнему с использованием createRow определено выше):

 val rdd2 = splitRDD.filter(...).flatMap(row => { 

     val srcName = row(4).split(",") 
     val srcType = row(5).split(",") 
     val srcCount = row(6).split(",") 

     val destName = row(7).split(",") 
     val destType = row(8).split(",") 
     val destCount = row(9).split(",") 

     var newRDD: ArrayBuffer[Row] = new ArrayBuffer[Row]() 

     //if (srcName != null) { 
     println("\n\nsrcName.size: " + srcName.size + "\n\n") 
     for (i <- 0 to srcName.size - 1) { 
      // missing column: destType can sometimes be null 

      val splittedRow: Array[String] = Row.fromSeq(Seq((row(0), row(1), row(2), row(3), 
      srcName(i), srcType(i), srcCount(i), destName(i), "", destCount(i)))).toSeq.toArray[String] 

      newRDD = newRDD ++ Seq(createRow(splittedRow, fieldsMap)) 
     } 
     //} 

     Seq(Row.fromSeq(Seq(newRDD))) 

    }) 

Поскольку я имею ошибку в преобразовании моего splittedRow в Array [String] (".toSeq.toArray [String]")

error: type arguments [String] do not conform to method toArray's type parameter bounds [B >: Any] 

я решил обновить свой splittedRow до:

 val rowArr: Array[String] = new Array[String](10) 

      for (j <- 0 to 3) { 
      rowArr(j) = row(j) 
      } 
      rowArr(4) = srcName(i) 
      rowArr(5) = row(5).split(",")(i) 
      rowArr(6) = row(6).split(",")(i) 
      rowArr(7) = row(7).split(",")(i) 
      rowArr(8) = row(8).split(",")(i) 
      rowArr(9) = row(9).split(",")(i) 

      val splittedRow = rowArr 

ответ

0

Вы можете использовать операцию flatMap вместо операции map для возврата нескольких строк. Следовательно, ваш createRow будет реорганизован на createRows(row: Array[String], fieldsMap: List[Int, IngestFieldConfig]): Seq[Row].

+0

Hi Till, я обновил свой вопрос, включив createRows. Можете ли вы предложить решение для «row.map», так как я получаю несколько строк? Также для отсутствующих столбцов какой подход вы бы порекомендовали? Спасибо – sophie

+0

@ Софи, я хотел бы помочь вам, но мне нужно будет узнать немного больше о вашем формате ввода и вывода. Результат вашего примера говорит что-то вроде «TOPIC | GROUP | ...», но в первой реализации «createRow» вы создаете несколько пар значений ключа «v.code +»: «+ row (k)». Не могли бы вы привести пример, где вы определяете правильный ввод и вывод? Кроме того, какие столбцы могут отсутствовать? –

+0

Привет, Тилл, я обновил свой вопрос, чтобы включить объяснение параметра «fieldsMap». Моя идея состоит в том, чтобы повторно использовать функцию createRow и предварительно обработать строки, которые я передаю им. Таким образом, если входной файл изначально является 1 строкой, но имеет 3 источника, flatMap будет генерировать 3 строки, а затем цикл createRow 3x. Затем «newRDD» будет передан в createDateFrame. О «недостающем столбце» - он основан на моем новом коде - destType (i). Я не уверен, как поместить условие destType в значение null иногда в генерации массива (Row ..) – sophie

Смежные вопросы