2016-02-25 1 views
1

Я читаю csv-файл из hdfs, используя Spark. Он входит в объект FSDataInputStream. Я не могу использовать метод textfile(), потому что он разбивает файл csv на фид строки, и я читаю файл csv, у которого есть линейные фиды внутри текстовых полей. Opencsv from sourcefourge обрабатывает линейные каналы внутри ячеек, его хороший проект, но он принимает Reader как входной сигнал. Мне нужно преобразовать его в строку, чтобы передать ее opencsv как StringReader. Итак, файл HDFS -> FSdataINputStream -> String -> StringReader -> список строк в opencsv. Ниже приведен код ...Что происходит, когда вы выполняете манипуляции с данными java в Spark вне RDD

import java.io._ 
import org.apache.spark.sql.SQLContext 
import org.apache.hadoop.fs._ 
import org.apache.hadoop.conf._ 
import com.opencsv._ 
import org.apache.spark.api.java.JavaSparkContext 
import org.apache.spark.sql._ 
import org.apache.spark.sql.types._ 
import java.lang.StringBuilder 

val conf = new Configuration() 
val hdfsCoreSitePath = new Path("core-site.xml") 
val hdfsHDFSSitePath = new Path("hdfs-site.xml") 
conf.addResource(hdfsCoreSitePath) 
conf.addResource(hdfsHDFSSitePath) 
val fileSystem = FileSystem.get(conf) 
val csvPath = new Path("/raw_data/project_name/csv/file_name.csv") 
val csvFile = fileSystem.open(csvPath) 
val fileLen = fileSystem.getFileStatus(csvPath).getLen().toInt 

var b = Array.fill[Byte](2048)(0) 
var j = 1 

val stringBuilder = new StringBuilder() 
var bufferString = "" 

csvFile.seek(0) 
csvFile.read(b) 
var bufferString = new String(b,"UTF-8") 
stringBuilder.append(bufferString) 

while(j != -1) {b = Array.fill[Byte](2048)(0);j=csvFile.read(b);bufferString = new String(b,"UTF-8");stringBuilder.append(bufferString)} 

val stringBuilderClean = new StringBuilder() 
stringBuilderClean = stringBuilder.substring(0,fileLen) 

val reader: Reader = new StringReader(stringBuilderClean.toString()).asInstanceOf[Reader] 
val csv = new CSVReader(reader) 
val javaContext = new JavaSparkContext(sc) 
val sqlContext = new SQLContext(sc) 
val javaRDD = javaContext.parallelize(csv.readAll()) 
//do a bunch of transformations on the RDD 

Это работает, но я сомневаюсь, что оно масштабируемо. Это заставляет меня задаться вопросом, насколько большим ограничением является наличие программы драйвера, которая передает данные во всех данных через один jvm. Мои вопросы к любому очень знакомые с искрой являются:

  1. Что происходит, когда вы делаете манипуляции с данными по всему набору данных, как это, прежде чем он даже потеряется на вход RDD? Он просто рассматривается как любая другая программа и будет меняться как сумасшедший, я думаю?

  2. Как вы могли бы сделать любую искровую программу масштабируемой? Вам всегда нужно извлекать данные непосредственно во входной RDD?

+1

Это один находится где-то между «слишком широким» и «неясно, что вы просите». Можете ли вы уточнить, какова ваша фактическая проблема? –

+0

Хорошо, я отредактировал его! –

ответ

3

Ваш код загружает данные в память, а затем драйвер Spark будет разделять и отправлять каждую часть данных исполнителю, из-за чего он не масштабируется.
Есть два способа решить ваш вопрос.

записи пользовательских InputFormat для поддержки формата файла CSV

import java.io.{InputStreamReader, IOException} 

import com.google.common.base.Charsets 
import com.opencsv.{CSVParser, CSVReader} 
import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs.{Seekable, Path, FileSystem} 
import org.apache.hadoop.io.compress._ 
import org.apache.hadoop.io.{ArrayWritable, Text, LongWritable} 
import org.apache.hadoop.mapred._ 

class CSVInputFormat extends FileInputFormat[LongWritable, ArrayWritable] with JobConfigurable { 
    private var compressionCodecs: CompressionCodecFactory = _ 

    def configure(conf: JobConf) { 
    compressionCodecs = new CompressionCodecFactory(conf) 
    } 

    protected override def isSplitable(fs: FileSystem, file: Path): Boolean = { 
    val codec: CompressionCodec = compressionCodecs.getCodec(file) 
    if (null == codec) { 
     return true 
    } 
    codec.isInstanceOf[SplittableCompressionCodec] 
    } 

    @throws(classOf[IOException]) 
    def getRecordReader(genericSplit: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LongWritable, ArrayWritable] = { 
    reporter.setStatus(genericSplit.toString) 
    val delimiter: String = job.get("textinputformat.record.delimiter") 
    var recordDelimiterBytes: Array[Byte] = null 
    if (null != delimiter) { 
     recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8) 
    } 
    new CsvLineRecordReader(job, genericSplit.asInstanceOf[FileSplit], recordDelimiterBytes) 
    } 
} 

class CsvLineRecordReader(job: Configuration, split: FileSplit, recordDelimiter: Array[Byte]) 
    extends RecordReader[LongWritable, ArrayWritable] { 
    private val compressionCodecs = new CompressionCodecFactory(job) 
    private val maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input. 
    LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE) 
    private var filePosition: Seekable = _ 
    private val file = split.getPath 
    private val codec = compressionCodecs.getCodec(file) 
    private val isCompressedInput = codec != null 
    private val fs = file.getFileSystem(job) 
    private val fileIn = fs.open(file) 

    private var start = split.getStart 
    private var pos: Long = 0L 
    private var end = start + split.getLength 
    private var reader: CSVReader = _ 
    private var decompressor: Decompressor = _ 

    private lazy val CSVSeparator = 
    if (recordDelimiter == null) 
     CSVParser.DEFAULT_SEPARATOR 
    else 
     recordDelimiter(0).asInstanceOf[Char] 

    if (isCompressedInput) { 
    decompressor = CodecPool.getDecompressor(codec) 
    if (codec.isInstanceOf[SplittableCompressionCodec]) { 
     val cIn = (codec.asInstanceOf[SplittableCompressionCodec]) 
     .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK) 
     reader = new CSVReader(new InputStreamReader(cIn), CSVSeparator) 
     start = cIn.getAdjustedStart 
     end = cIn.getAdjustedEnd 
     filePosition = cIn 
    }else { 
     reader = new CSVReader(new InputStreamReader(codec.createInputStream(fileIn, decompressor)), CSVSeparator) 
     filePosition = fileIn 
    } 
    } else { 
    fileIn.seek(start) 
    reader = new CSVReader(new InputStreamReader(fileIn), CSVSeparator) 
    filePosition = fileIn 
    } 

    @throws(classOf[IOException]) 
    private def getFilePosition: Long = { 
    if (isCompressedInput && null != filePosition) { 
     filePosition.getPos 
    }else 
     pos 
    } 

    private def nextLine: Option[Array[String]] = { 
    if (getFilePosition < end){ 
     //readNext automatical split the line to elements 
     reader.readNext() match { 
     case null => None 
     case elems => Some(elems) 
     } 
    } else 
     None 
    } 

    override def next(key: LongWritable, value: ArrayWritable): Boolean = 
    nextLine 
     .exists { elems => 
     key.set(pos) 
     val lineLength = elems.foldRight(0)((a, b) => a.length + 1 + b) 
     pos += lineLength 
     value.set(elems.map(new Text(_))) 
     if (lineLength < maxLineLength) true else false 
     } 

    @throws(classOf[IOException]) 
    def getProgress: Float = 
    if (start == end) 
     0.0f 
    else 
     Math.min(1.0f, (getFilePosition - start)/(end - start).toFloat) 

    override def getPos: Long = pos 

    override def createKey(): LongWritable = new LongWritable 

    override def close(): Unit = { 
    try { 
     if (reader != null) { 
     reader.close 
     } 
    } finally { 
     if (decompressor != null) { 
     CodecPool.returnDecompressor(decompressor) 
     } 
    } 
    } 

    override def createValue(): ArrayWritable = new ArrayWritable(classOf[Text]) 
} 

Простой тестовый пример:

val arrayRdd = sc.hadoopFile("source path", classOf[CSVInputFormat], classOf[LongWritable], classOf[ArrayWritable], 
sc.defaultMinPartitions).map(_._2.get().map(_.toString)) 
arrayRdd.collect().foreach(e => println(e.mkString(","))) 

Другой способ, который я предпочитаю использования spark-csv написаны databrick s, который хорошо поддерживается для формата CSV-файла, вы можете воспользоваться некоторыми практиками на странице github.

Обновлено для spark-csv, используя univocity в parserLib, который может обрабатывать клетки многострочные

val df = sqlContext.read 
.format("com.databricks.spark.csv") 
.option("header", "true") // Use first line of all files as header 
.option("parserLib", "univocity") 
.option("inferSchema", "true") // Automatically infer data types 
.load("source path") 
+0

Я уже пробовал искру-csv и не обрабатывал линии. Он создает новые записи, когда видит линейный канал, а не ожидает LF/CR. Он не учитывает файлы csv, у которых есть строки в одной записи. –

+0

см. Мои обновления выше, я пишу простой пример с использованием spark-csv для обработки csv. – taigetco

1

Что происходит, когда вы делаете манипуляции с данными по всему набору данных, как это, прежде чем он даже потеряется на вход RDD? Он просто рассматривается как любая другая программа и будет меняться как сумасшедший, я думаю?

Вы загружаете весь набор данных в локальную память. Поэтому, если у вас есть память, она работает.

Как вы могли бы сделать любую искровую программу масштабируемой?

Вы выбрали формат данных, который может заряжать искра, или вы меняете свое приложение, чтобы оно могло загружать формат данных в искру непосредственно или немного.

В этом случае вы можете посмотреть на создание настраиваемого InputFormat, который разбивается на нечто иное, чем символы новой строки. Я думаю, вам также хотелось бы посмотреть, как вы пишете свои данные, поэтому он разделен на HDFS на границах записей, а не на новых строках.

Однако я подозреваю, что самый простой ответ заключается в кодировании данных по-разному. JSON Lines или кодировать символы новой строки в CSV-файле во время записи или Avro или ... Все, что лучше подходит для Spark & HDFS.

Смежные вопросы