2015-03-18 5 views
0

StreamingContext - fileStream перегружен, чтобы использовать объект конфигурации Hadoop, но он, похоже, не работает.Spark Streaming с объектом конфигурации Hadoop

фрагмент кода из исходного кода Спарк:

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

def fileStream[K: ClassTag,V: ClassTag,F <: NewInputFormat[K, V]: ClassTag] (directory: String): InputDStream[(K, V)] = 
{ new FileInputDStream[K, V, F](this, directory) } 

def fileStream[K: ClassTag,V: ClassTag,F <: NewInputFormat[K, V]: ClassTag] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = 
{ new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) } 

def fileStream[K: ClassTag,V: ClassTag,F <: NewInputFormat[K, V]: ClassTag] (directory: String,filter: Path => Boolean, newFilesOnly: Boolean, conf: Configuration): InputDStream[(K, V)] = 
{ new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf)) } 

фрагмент кода: работает отлично

val windowDStream = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), (x: Path) => true, true); 

Ошибка компиляции:

val conf = sc.hadoopConfiguration; 
    val windowDStream = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), (x: Path) => true, true,conf); 

ошибка:

overloaded method value fileStream with alternatives: (directory: String,filter: org.apache.hadoop.fs.Path ⇒ Boolean,newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[org.apache.hadoop.io.LongWritable], implicit evidence$10: scala.reflect.ClassTag[org.apache.hadoop.io.Text], implicit evidence$11: scala.reflect.ClassTag[org.apache.hadoop.mapreduce.lib.input.TextInputFormat])org.apache.spark.streaming.dstream.InputDStream[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] <and> (directory: String)(implicit evidence$6: scala.reflect.ClassTag[org.apache.hadoop.io.LongWritable], implicit evidence$7: scala.reflect.ClassTag[org.apache.hadoop.io.Text], implicit evidence$8: scala.reflect.ClassTag[org.apache.hadoop.mapreduce.lib.input.TextInputFormat])org.apache.spark.streaming.dstream.InputDStream[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] cannot be applied to (String, org.apache.hadoop.fs.Path ⇒ Boolean, Boolean, org.apache.hadoop.conf.Configuration) 

ответ

0

Я предполагаю, что вы используете Спарк 1.2 или старше. Если вы перейдете от основной к ветви 1.2, вы увидите, что эта перегрузка не существует. Фактически, FileInputDStream сам не принимал это как аргумент конструктора до 1.3 либо

+0

Да, мы используем Spark 1.2. Есть ли какая-нибудь работа, чтобы передать объект конфигурации hadoop в пользовательский формат ввода? –

+0

Нет, не сливая его в 1.2. Похоже, что это невинное изменение, которое вы могли бы создать для PR. Кодовая страница в 1.2 статична и не считывается из любого conf. –