2015-07-08 1 views
5

Я хочу написать тест для моего искрового потокового приложения, которое потребляет источник потока.Использование текстового файла в качестве источника потока Spark для целей тестирования

http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/ предлагает использовать ManualClock, но на данный момент для чтения файла и проверки выходов будет достаточно для меня.

Так что я хотел бы использовать:

JavaStreamingContext streamingContext = ... 
JavaDStream<String> stream = streamingContext.textFileStream(dataDirectory); 
stream.print(); 
streamingContext.awaitTermination(); 
streamingContext.start(); 

К сожалению, ничего не печатает.

Я пробовал:

  • DataDirectory = "HDFS: // узел: порт/абсолютный/путь/на/HDFS /"
  • DataDirectory = «Файл: // C: \\ \\ абсолют путь \\ на \\ \\ окна»
  • добавляющих текстовый файл в каталоге перед программой начинается
  • добавления текстового файла в каталоге пОКА запустить программу

Ничего не работает.

Любое предложение читать из текстового файла?

Спасибо,

Martin

ответ

0

Я так глуп, я перевернутые звонки начать() и awaitTermination()

Если вы хотите сделать то же самое, вы должны прочитать из HDFS, и добавьте файл WHILE, когда программа запускается.

+1

Здравствуйте @Martin, можете ли вы, если возможно, поделиться общим кодом? Благодарю. – user4342532

8

Порядок начала и ожидания действительно обратный.

В дополнение к этому, самый простой способ передачи данных в приложение Spark Streaming для тестирования - это QueueDStream. Это измененная очередь RDD произвольных данных. Это означает, что вы можете создавать данные программно или загружать их с диска в RDD и передавать их в код Spark Streaming.

Например. чтобы избежать проблем с синхронизацией с файловым компьютером, вы можете попробовать следующее:

val rdd = sparkContext.textFile(...) 
val rddQueue: Queue[RDD[String]] = Queue() 
rddQueue += rdd 
val dstream = streamingContext.queueStream(rddQueue) 
doMyStuffWithDstream(dstream) 
streamingContext.start() 
streamingContext.awaitTermination() 
Смежные вопросы