2015-10-20 1 views
8

У меня есть следующий код:Спарк теряя Println() на стандартный вывод

val blueCount = sc.accumulator[Long](0) 
val output = input.map { data => 
    for (value <- data.getValues()) { 
    if (record.getEnum() == DataEnum.BLUE) { 
     blueCount += 1 
     println("Enum = BLUE : " + value.toString() 
    } 
    } 
    data 
}.persist(StorageLevel.MEMORY_ONLY_SER) 

output.saveAsTextFile("myOutput") 

Тогда blueCount не равна нулю, но у меня нет Println() выход! Я что-то пропустил? Благодаря!

ответ

3

Я был в состоянии работать вокруг, сделав UtilityFunction:

object PrintUtiltity { 
    def print(data:String) = { 
     println(data) 
    } 
} 
+5

Почему он работает? – angelcervera

+0

Поскольку Spark считает, что он вызывает функцию Utility вместо вызова функции печати. Спарк, по-видимому, не (и не мог) проверить каждую строку в своей функции полезности. – Edamame

+1

Что вы делаете, это создать экземпляр объекта в вашей программе драйверов. Я бы не стал рассчитывать на это поведение без четкой модели того, что происходит. Ожидайте, что поведение изменится непредсказуемо с любыми изменениями в вашей программе или тем, как вы вызываете объект PrintUtility. Если вы хотите собирать журналы, используйте стандартные методы для этого, не изобретайте случайные механизмы, которые вы не понимаете. Ваше объяснение, почему это работает, опасно неправильно - нет запрета делать то, что вы сделали; нет проверки кода, чтобы убедиться, что вы не обманываете: все поведение следует за дизайном системы – David

13

Это концептуальный вопрос ...

Представьте у вас есть большой кластер, состоящий из многих рабочих скажут n рабочих и те работники хранят переборку RDD или DataFrame, представьте Вы начинаете map задачу по что данные, и внутри этого map у вас есть print заявление, прежде всего:

  • Где эти данные распечатываются?
  • Какой узел имеет приоритет и какой раздел?
  • Если все узлы работают параллельно, кто будет печататься первым?
  • Как будет создана очередь печати?

Это слишком много вопросов, таким образом, конструкторы/Сопровождающие из apache-spark решил логически отказаться от любой поддержки print операторов внутри любой map-reduce операции (это включает accumulators и даже broadcast переменные).

Это также имеет смысл, потому что Spark - это язык , рассчитанный на для очень больших наборов данных. Хотя печать может быть полезна для тестирования и отладки, вы не захотите печатать каждую строку DataFrame или RDD, потому что они созданы, чтобы иметь миллионы или миллиарды строк! Итак, зачем решать эти сложные вопросы, когда вы даже не хотите печатать в первую очередь?

Чтобы доказать это, вы можете запустить этот SCALA код, например:

// Let's create a simple RDD 
val rdd = sc.parallelize(1 to 10000) 

def printStuff(x:Int):Int = { 
    println(x) 
    x + 1 
} 

// It doesn't print anything! because of a logic design limitation! 
rdd.map(printStuff) 

// But you can print the RDD by doing the following: 
rdd.take(10).foreach(println) 
+6

Я считаю Println работает просто отлично: он просто переходит к stdout/stderr на компьютере, на котором работает исполнитель искры. Поэтому, если у вас нет возможности захватить то, что находится в этих журналах, вы никогда не увидите его. Если вы используете пряжу, есть команда, чтобы распечатать все это для вас. – David

+0

Хотя аргументация действительна, Spark не выполняет какой-либо статический анализ для удаления кода. Результат просто не переходит к драйверу «STDOUT», как объясняется @David –

Смежные вопросы