0
StreamingContext - fileStream перегружен, чтобы использовать объект конфигурации Hadoop, но он, похоже, не работает.Spark Streaming с объектом конфигурации Hadoop
фрагмент кода из исходного кода Спарк:
def fileStream[K: ClassTag,V: ClassTag,F <: NewInputFormat[K, V]: ClassTag] (directory: String): InputDStream[(K, V)] =
{ new FileInputDStream[K, V, F](this, directory) }
def fileStream[K: ClassTag,V: ClassTag,F <: NewInputFormat[K, V]: ClassTag] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] =
{ new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) }
def fileStream[K: ClassTag,V: ClassTag,F <: NewInputFormat[K, V]: ClassTag] (directory: String,filter: Path => Boolean, newFilesOnly: Boolean, conf: Configuration): InputDStream[(K, V)] =
{ new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf)) }
фрагмент кода: работает отлично
val windowDStream = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), (x: Path) => true, true);
Ошибка компиляции:
val conf = sc.hadoopConfiguration;
val windowDStream = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), (x: Path) => true, true,conf);
ошибка:
overloaded method value fileStream with alternatives: (directory: String,filter: org.apache.hadoop.fs.Path ⇒ Boolean,newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[org.apache.hadoop.io.LongWritable], implicit evidence$10: scala.reflect.ClassTag[org.apache.hadoop.io.Text], implicit evidence$11: scala.reflect.ClassTag[org.apache.hadoop.mapreduce.lib.input.TextInputFormat])org.apache.spark.streaming.dstream.InputDStream[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] <and> (directory: String)(implicit evidence$6: scala.reflect.ClassTag[org.apache.hadoop.io.LongWritable], implicit evidence$7: scala.reflect.ClassTag[org.apache.hadoop.io.Text], implicit evidence$8: scala.reflect.ClassTag[org.apache.hadoop.mapreduce.lib.input.TextInputFormat])org.apache.spark.streaming.dstream.InputDStream[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] cannot be applied to (String, org.apache.hadoop.fs.Path ⇒ Boolean, Boolean, org.apache.hadoop.conf.Configuration)
Да, мы используем Spark 1.2. Есть ли какая-нибудь работа, чтобы передать объект конфигурации hadoop в пользовательский формат ввода? –
Нет, не сливая его в 1.2. Похоже, что это невинное изменение, которое вы могли бы создать для PR. Кодовая страница в 1.2 статична и не считывается из любого conf. –