Я читаю csv-файл из hdfs, используя Spark. Он входит в объект FSDataInputStream. Я не могу использовать метод textfile(), потому что он разбивает файл csv на фид строки, и я читаю файл csv, у которого есть линейные фиды внутри текстовых полей. Opencsv from sourcefourge обрабатывает линейные каналы внутри ячеек, его хороший проект, но он принимает Reader как входной сигнал. Мне нужно преобразовать его в строку, чтобы передать ее opencsv как StringReader. Итак, файл HDFS -> FSdataINputStream -> String -> StringReader -> список строк в opencsv. Ниже приведен код ...Что происходит, когда вы выполняете манипуляции с данными java в Spark вне RDD
import java.io._
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.fs._
import org.apache.hadoop.conf._
import com.opencsv._
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import java.lang.StringBuilder
val conf = new Configuration()
val hdfsCoreSitePath = new Path("core-site.xml")
val hdfsHDFSSitePath = new Path("hdfs-site.xml")
conf.addResource(hdfsCoreSitePath)
conf.addResource(hdfsHDFSSitePath)
val fileSystem = FileSystem.get(conf)
val csvPath = new Path("/raw_data/project_name/csv/file_name.csv")
val csvFile = fileSystem.open(csvPath)
val fileLen = fileSystem.getFileStatus(csvPath).getLen().toInt
var b = Array.fill[Byte](2048)(0)
var j = 1
val stringBuilder = new StringBuilder()
var bufferString = ""
csvFile.seek(0)
csvFile.read(b)
var bufferString = new String(b,"UTF-8")
stringBuilder.append(bufferString)
while(j != -1) {b = Array.fill[Byte](2048)(0);j=csvFile.read(b);bufferString = new String(b,"UTF-8");stringBuilder.append(bufferString)}
val stringBuilderClean = new StringBuilder()
stringBuilderClean = stringBuilder.substring(0,fileLen)
val reader: Reader = new StringReader(stringBuilderClean.toString()).asInstanceOf[Reader]
val csv = new CSVReader(reader)
val javaContext = new JavaSparkContext(sc)
val sqlContext = new SQLContext(sc)
val javaRDD = javaContext.parallelize(csv.readAll())
//do a bunch of transformations on the RDD
Это работает, но я сомневаюсь, что оно масштабируемо. Это заставляет меня задаться вопросом, насколько большим ограничением является наличие программы драйвера, которая передает данные во всех данных через один jvm. Мои вопросы к любому очень знакомые с искрой являются:
Что происходит, когда вы делаете манипуляции с данными по всему набору данных, как это, прежде чем он даже потеряется на вход RDD? Он просто рассматривается как любая другая программа и будет меняться как сумасшедший, я думаю?
Как вы могли бы сделать любую искровую программу масштабируемой? Вам всегда нужно извлекать данные непосредственно во входной RDD?
Это один находится где-то между «слишком широким» и «неясно, что вы просите». Можете ли вы уточнить, какова ваша фактическая проблема? –
Хорошо, я отредактировал его! –