2016-08-03 2 views
0

Я в настоящее время изучаю Spark. Я столкнулся со следующей задачей: получить RDD, разбить его на основе определенных критериев и затем записать несколько файлов в разные папки в ведро S3.Spark RDD foreachPartition to S3

Все в порядке, пока мы не придем к загрузке на часть S3. Я прочитал все вопросы, относящиеся к этой проблеме на SO, и обнаружил, что могу использовать AmazonS3Client или метод saveToTextFile для RDD. Есть две проблемы, я лицо:

  1. Если я иду с AmazonS3Client я получаю java.io.NotSerializableException, так как код передается от водителя Спарк работнику он должен быть сериализованная и, видимо, AmazonS3Client не поддерживает что.

  2. Если я иду с saveToTextFile, я сталкиваюсь с аналогичной проблемой. Когда я иду в цикл foreachPartition, мне нужно получить Iterable[T] (в данном случае p), поэтому, если я хочу использовать saveToTextFile, мне нужно создать RDD Iterable, следовательно, parallelize. Проблема в том, что SparkContext sc также (по праву) не сериализуется.

rdd.foreachPartition { p => sc.parallelize(p.toSeq).saveAsTextFile(s"s3n://") }

Любая помощь будет принята с благодарностью.

ответ

2

Нет необходимости в этом. Вы можете просто использовать saveAsTextFile с РДОМ:

rdd.saveAsTextFile(s"s3n://dir/to/aux/file") 

saveAsTextFile напишет S3 в папке с большим количеством частей файла (как много частей, как перегородки). Затем вы можете объединиться в один файл, если хотите:

def mergeToS3(srcPath: String, dstPath: String, sc: SparkContext): Unit = { 
    val hadoopConfig = sc.hadoopConfiguration 
    val fs = FileSystem.get(new URI(srcPath), hadoopConfig) 
    FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(dstPath), true, hadoopConfig, null) 
    } 

    mergeToS3("s3n://dir/to/aux/file", "s3n://dir/to/singleFile",sc) 
Смежные вопросы