У меня этот код:Спарк: сплит строки и накапливаются
val rdd = sc.textFile(sample.log")
val splitRDD = rdd.map(r => StringUtils.splitPreserveAllTokens(r, "\\|"))
val rdd2 = splitRDD.filter(...).map(row => createRow(row, fieldsMap))
sqlContext.createDataFrame(rdd2, structType).save(
org.apache.phoenix.spark, SaveMode.Overwrite, Map("table" -> table, "zkUrl" -> zkUrl))
def createRow(row: Array[String], fieldsMap: ListMap[Int, FieldConfig]): Row = {
//add additional index for invalidValues
val arrSize = fieldsMap.size + 1
val arr = new Array[Any](arrSize)
var invalidValues = ""
for ((k, v) <- fieldsMap) {
val valid = ...
var value : Any = null
if (valid) {
value = row(k)
// if (v.code == "SOURCE_NAME") --> 5th column in the row
// sourceNameCount = row(k).split(",").size
} else {
invalidValues += v.code + " : " + row(k) + " | "
}
arr(k) = value
}
arr(arrSize - 1) = invalidValues
Row.fromSeq(arr.toSeq)
}
fieldsMap содержит отображение входных столбцов: (индекс, FieldConfig). Где класс FieldConfig содержит значения «code» и «dataType».
TOPIC -> (0, v.code = "TOPIC", v.dataType = "String")
GROUP -> (1, v.code = "GROUP")
SOURCE_NAME1,SOURCE_NAME2,SOURCE_NAME3 -> (4, v.code = "SOURCE_NAME")
Это sample.log:
TOPIC|GROUP|TIMESTAMP|STATUS|SOURCE_NAME1,SOURCE_NAME2,SOURCE_NAME3|
SOURCE_TYPE1,SOURCE_TYPE2,SOURCE_TYPE3|SOURCE_COUNT1,SOURCE_COUNT2,SOURCE_COUNT3|
DEST_NAME1,DEST_NAME2,DEST_NAME3|DEST_TYPE1,DEST_TYPE2,DEST_TYPE3|
DEST_COUNT1,DEST_COUNT2,DEST_COUNT3|
Цель состоит в том, чтобы разделить входной (sample.log), основанный на количестве SOURCE_NAME (ов) .. В приведенном выше примере, на выходе будет иметь 3 строки:
TOPIC|GROUP|TIMESTAMP|STATUS|SOURCE_NAME1|SOURCE_TYPE1|SOURCE_COUNT1|
|DEST_NAME1|DEST_TYPE1|DEST_COUNT1|
TOPIC|GROUP|TIMESTAMP|STATUS|SOURCE_NAME2|SOURCE_TYPE2|SOURCE_COUNT2|
DEST_NAME2|DEST_TYPE2|DEST_COUNT2|
TOPIC|GROUP|TIMESTAMP|STATUS|SOURCE_NAME3|SOURCE_TYPE3|SOURCE_COUNT3|
|DEST_NAME3|DEST_TYPE3|DEST_COUNT3|
Это новый код, который я сейчас работаю (по-прежнему с использованием createRow определено выше):
val rdd2 = splitRDD.filter(...).flatMap(row => {
val srcName = row(4).split(",")
val srcType = row(5).split(",")
val srcCount = row(6).split(",")
val destName = row(7).split(",")
val destType = row(8).split(",")
val destCount = row(9).split(",")
var newRDD: ArrayBuffer[Row] = new ArrayBuffer[Row]()
//if (srcName != null) {
println("\n\nsrcName.size: " + srcName.size + "\n\n")
for (i <- 0 to srcName.size - 1) {
// missing column: destType can sometimes be null
val splittedRow: Array[String] = Row.fromSeq(Seq((row(0), row(1), row(2), row(3),
srcName(i), srcType(i), srcCount(i), destName(i), "", destCount(i)))).toSeq.toArray[String]
newRDD = newRDD ++ Seq(createRow(splittedRow, fieldsMap))
}
//}
Seq(Row.fromSeq(Seq(newRDD)))
})
Поскольку я имею ошибку в преобразовании моего splittedRow в Array [String] (".toSeq.toArray [String]")
error: type arguments [String] do not conform to method toArray's type parameter bounds [B >: Any]
я решил обновить свой splittedRow до:
val rowArr: Array[String] = new Array[String](10)
for (j <- 0 to 3) {
rowArr(j) = row(j)
}
rowArr(4) = srcName(i)
rowArr(5) = row(5).split(",")(i)
rowArr(6) = row(6).split(",")(i)
rowArr(7) = row(7).split(",")(i)
rowArr(8) = row(8).split(",")(i)
rowArr(9) = row(9).split(",")(i)
val splittedRow = rowArr
Hi Till, я обновил свой вопрос, включив createRows. Можете ли вы предложить решение для «row.map», так как я получаю несколько строк? Также для отсутствующих столбцов какой подход вы бы порекомендовали? Спасибо – sophie
@ Софи, я хотел бы помочь вам, но мне нужно будет узнать немного больше о вашем формате ввода и вывода. Результат вашего примера говорит что-то вроде «TOPIC | GROUP | ...», но в первой реализации «createRow» вы создаете несколько пар значений ключа «v.code +»: «+ row (k)». Не могли бы вы привести пример, где вы определяете правильный ввод и вывод? Кроме того, какие столбцы могут отсутствовать? –
Привет, Тилл, я обновил свой вопрос, чтобы включить объяснение параметра «fieldsMap». Моя идея состоит в том, чтобы повторно использовать функцию createRow и предварительно обработать строки, которые я передаю им. Таким образом, если входной файл изначально является 1 строкой, но имеет 3 источника, flatMap будет генерировать 3 строки, а затем цикл createRow 3x. Затем «newRDD» будет передан в createDateFrame. О «недостающем столбце» - он основан на моем новом коде - destType (i). Я не уверен, как поместить условие destType в значение null иногда в генерации массива (Row ..) – sophie