2015-02-25 2 views
4

У меня есть эта функция в программе драйвера, которая собирает результат из rdds в массив и отправляет его обратно. Однако, несмотря на то, что у RDD (в dstream) есть данные, функция возвращает пустой массив ... Что я делаю неправильно?Соберите результаты из RDD в программе драйвера dstream

def runTopFunction() : Array[(String, Int)] = { 
     val topSearches = some function.... 
     val summary = new ArrayBuffer[(String,Int)]() 
     topSearches.foreachRDD(rdd => { 
      summary = summary.++(rdd.collect()) 
     })  

    return summary.toArray 
} 

ответ

1

Таким образом, хотя foreachRDD будет делать то, что вы хотите сделать, это также не блокирование, которое означает, что он не будет ждать, пока весь поток не будет обработан. Поскольку у вас есть toArray на вашем буфере сразу после звонка на foreachRDD, еще не было элементов, обработанных.

+2

Вместо того, чтобы «не блокировать», вычисление является ленивым и запланировано на более поздний момент. Таким образом, этот ответ неверен в терминологии. – maasg

+1

foreachrdd не ленится так же, как преобразования на DStreams или RDD, это скорее действие, чем трансформация. – Holden

+0

@ user2888475 Тем не менее ничего не произойдет до тех пор, пока 'streamingContext.start()' не будет вызван, и что-то произойдет запланировано на каждый интервал времени потоковой передачи. Действия в Spark Streaming приводят к планированию так же, как в Spark, они приводят к выполнению. т. е. Dstream без действий ничего не сделает. – maasg

1

DStream.forEachRDD - действие по заданному DStream и будет запланировано к исполнению для каждого интервала периодического потока. Это декларативное построение задания, которое будет выполнено позже.

Накопление значений таким образом не поддерживается, поскольку, поскольку Dstream.forEachRDD просто говорит «делать это на каждой итерации», окружающий код накопления выполняется немедленно, в результате получается пустой массив.

В зависимости от того, что происходит с данными summary после его вычисленной, там уже несколько вариантов, как реализовать это:

  • Если необходимо извлечь другим процессом данных, использовать общий поточно- состав. Очередь приоритетов отлично подходит для использования top-k.
  • Если данные будут сохранены (fs, db), вы можете просто записать их в хранилище после применения функции topSearches к dstream.
Смежные вопросы