2015-06-14 2 views
1

У меня есть искровая программа, которая по существу делает это:Неожиданной искры

def foo(a: RDD[...], b: RDD[...]) = { 
    val c = a.map(...) 
    c.persist(StorageLevel.MEMORY_ONLY_SER) 
    var current = b 
    for (_ <- 1 to 10) { 
    val next = some_other_rdd_ops(c, current) 
    next.persist(StorageLevel.MEMORY_ONLY) 
    current.unpersist() 
    current = next 
    } 
    current.saveAsTextFile(...) 
} 

странного поведение, что я вижу, является то, что искровые этапы, соответствующих val c = a.map(...) происходят в 10 раз. Я бы ожидал, что это произойдет только один раз из-за немедленного кэширования на следующей строке, но это не так. Когда я смотрю на вкладке «хранилище» текущей работы, очень немногие из разделов c кэшируются.

Кроме того, 10 экземпляров этого этапа сразу же отображаются как «активные». 10 копий этапа, соответствующего val next = some_other_rdd_ops(c, current), отображаются в ожидании, и они примерно чередуются с исполнением.

Я не понимаю, как получить Spark для кэширования RDD?

Редактировать: здесь есть программа, содержащая программу для воспроизведения: https://gist.github.com/jfkelley/f407c7750a086cdb059c. Он ожидает, что в качестве входных данных будет список краев графика (с весами краев). Например:

a b 1000.0 
a c 1000.0 
b c 1000.0 
d e 1000.0 
d f 1000.0 
e f 1000.0 
g h 1000.0 
h i 1000.0 
g i 1000.0 
d g 400.0 

Линии 31-42 настоящего стандарта соответствуют упрощенной версии выше. Я получаю 10 этапов, соответствующих строке 31, когда я только ожидал 1.

+0

Я думаю, что ваше ожидание является правильным. Может быть, есть что-то подозрительное в коде? Не могли бы вы привести пример, с помощью которого мы можем воспроизвести проблему? Одно из возможных объяснений заключалось бы в том, что, когда вы продолжаете вкладывать вещи в кеш, он выталкивает 'c'. Я не уверен, что это так. –

+0

Догадка Даниэля о том, что кеш выдается, действительна. Кроме того, some_other_rdd_ops - это черный ящик для нас ... так что это может сделать что-то неожиданное. –

+1

Я бы посмотрел больше на 'current.unpersist()' заявление у вас есть. Вы уверены, что c никогда не станет текущим? – marios

ответ

0

Кэширование не уменьшает сцены, оно просто не будет перекомпоновать сцену каждый раз.

В первой итерации в разделе «Размер ввода» вы можете видеть, что данные поступают из Hadoop и что он считывает данные в случайном порядке. В последующих итерациях данные поступают из памяти и больше не вставляются в случайный порядок. Кроме того, время выполнения значительно сокращается.

Новые этапы отображения создаются при каждом перетасовке, например, когда есть изменение в разделении, в вашем случае добавление ключа к RDD.

0

Проблема в том, что вызов cache является ленивым. Ничто не будет кэшироваться до тех пор, пока действие не будет инициировано и RDD не будет оценен. Все вызовы устанавливают флаг в RDD, чтобы указать, что он должен быть кэширован при оценке.

Unpersist только вступает в действие немедленно. Он очищает флаг, указывающий, что RDD следует кэшировать, а также начинает чистку данных из кеша. Поскольку у вас есть только одно действие в конце вашего приложения, это означает, что к тому времени, когда любой из RDD будет оценен, Spark не видит, что любой из них должен быть сохранен!

Я согласен, что это удивительное поведение. То, как некоторые библиотеки Spark (включая реализацию PageRank в GraphX) работают вокруг этого, заключается в явном материализации каждого RDD между вызовами cache и unpersist. Например, в вашем случае вы можете сделать следующее:

def foo(a: RDD[...], b: RDD[...]) = { 
    val c = a.map(...) 
    c.persist(StorageLevel.MEMORY_ONLY_SER) 
    var current = b 
    for (_ <- 1 to 10) { 
    val next = some_other_rdd_ops(c, current) 
    next.persist(StorageLevel.MEMORY_ONLY) 
    next.foreachPartition(x => {}) # materialize before unpersisting 
    current.unpersist() 
    current = next 
    } 
    current.saveAsTextFile(...) 
} 
Смежные вопросы