
Scala & Spark: Dataframe.write._ on Windows

Has anyone managed to write files (CSV in particular) with Spark's DataFrame on Windows?

Many answers on SO are outdated (e.g. this one), since Spark can write CSV natively (via the unified write() method) as of version 2.0. I have also downloaded winutils.exe and added it as suggested here.
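For reference, a minimal sketch of how winutils is typically wired up before the SparkSession is created; the C:/hadoop path is an assumption, and winutils.exe must sit in its bin subfolder:

// Hypothetical path -- adjust to wherever winutils.exe actually lives.
// Hadoop's Windows shims look for %HADOOP_HOME%\bin\winutils.exe, and the
// hadoop.home.dir system property overrides the HADOOP_HOME environment variable.
System.setProperty("hadoop.home.dir", "C:/hadoop")

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("csv-write-test")
  .master("local[*]")
  .getOrCreate()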

Code:

// reading works just fine 
val df = spark.read 
      .option("header", true) 
      .option("inferSchema", true) 
      .csv("file:///C:/tmp/in.csv") 
// writing fails, none of these work 
df.write.csv("file:///C:/tmp/out.csv") 
df.write.csv("C:/tmp/out.csv") 

Error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted. 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:149) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86) 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:487) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194) 
    at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:551) 
    at prost.ebtl.load.DataSourceCSV$.loadFromFilesystem(DataSourceCSV.scala:12) 
    at TestScala$$anonfun$main$2.apply(TestScala.scala:98) 
    at TestScala$$anonfun$main$2.apply(TestScala.scala:80) 
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) 
    at TestScala$.main(TestScala.scala:80) 
    at TestScala.main(TestScala.scala) 
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 13, 192.168.56.1): java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor; 
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method) 
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559) 
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219) 
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209) 
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:305) 
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:294) 
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:326) 
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:393) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787) 
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:132) 
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVRelation.scala:191) 
    at org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance(CSVRelation.scala:169) 
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:247) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1904) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:143) 
    ... 27 more 
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor; 
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method) 
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559) 
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219) 
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209) 
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:305) 
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:294) 
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:326) 
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:393) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787) 
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:132) 
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVRelation.scala:191) 
    at org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance(CSVRelation.scala:169) 
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:247) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

Note: a folder named out.csv does get created, though.

Setup: Hadoop 2.7.3, Spark 2.0.1, IntelliJ IDEA 2016.2, Scala 2.11.8, test cluster on a Win7 workstation
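As a diagnostic aside: an UnsatisfiedLinkError on NativeIO$Windows.createFileWithMode0 usually means the JVM cannot bind the native call because hadoop.dll is missing from the native library path, or it does not match the Hadoop version and JVM bitness in use. A hedged sketch of loading it explicitly at startup (the C:/hadoop/bin path is an assumption, not a confirmed fix for this setup):

// Sketch: explicitly load the native Hadoop library so that
// NativeIO$Windows.createFileWithMode0 can resolve. The path is assumed;
// alternatively, put the folder containing hadoop.dll on the Windows PATH.
// The DLL must match the JVM bitness (64-bit DLL for a 64-bit JVM).
System.load("C:/hadoop/bin/hadoop.dll")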

Answer


I tried this and it works. You need to set the warehouse directory configuration; that is the only thing missing from your code. Also make sure you have write access to the directory you are trying to write to.

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL CSV example")
  .master("local")
  .config("spark.sql.warehouse.dir", "file:///C:/IJava/")
  .getOrCreate()

val df = spark.read
  .option("header", true)
  .option("inferSchema", true)
  .csv("file:///C:/Users/sankar/Downloads/FLinsurancesample.csv")

df.write.csv("file:///C:/Users/sankar/Downloads/out.csv")

Thanks! Woohoo, this is why I love Stack Overflow. Thanks for taking the time to answer. Your answer solves the problem when running with `master("local[2]")`, but not with a `spark://` master URL. I suppose it is Windows-specific. I guess I'll wait until I'm back on a Mac. Thanks! – Boern