2016-04-28 2 views
8

Я искровое приложение с несколькими точками, где я хотел бы сохранить текущее состояние. Обычно это происходит после большого шага или кэширования состояния, которое я хотел бы использовать несколько раз. Похоже, что когда я вызываю кеш на свою фреймворк второй раз, новая копия кэшируется в память. В моем приложении это приводит к проблемам с памятью при масштабировании. Несмотря на то, что в моих текущих тестах данный блок данных составляет максимум около 100 МБ, совокупный размер промежуточных результатов растет за пределы выделенной памяти у исполнителя. Ниже приведен небольшой пример, демонстрирующий это поведение.Un-persisting все dataframes в (py) spark

cache_test.py:

from pyspark import SparkContext, HiveContext 

spark_context = SparkContext(appName='cache_test') 
hive_context = HiveContext(spark_context) 

df = (hive_context.read 
     .format('com.databricks.spark.csv') 
     .load('simple_data.csv') 
    ) 
df.cache() 
df.show() 

df = df.withColumn('C1+C2', df['C1'] + df['C2']) 
df.cache() 
df.show() 

spark_context.stop() 

simple_data.csv:

1,2,3 
4,5,6 
7,8,9 

Глядя на пользовательском интерфейсе приложения, есть копия оригинального dataframe, в ADITION на один с новой колонки , Я могу удалить исходную копию, вызвав df.unpersist() перед строкой withColumn. Является ли это рекомендуемым способом для удаления промежуточного результата кэширования (т. Е. Разблокировать вызов перед каждым cache()).

Также возможно очистить все кешированные объекты. В моем приложении есть естественные точки останова, где я могу просто очистить всю память и перейти к следующему файлу. Я хотел бы сделать это, не создавая новое искровое приложение для каждого входного файла.

Спасибо заранее!

ответ

11

Спарк 2.x

Вы можете использовать Catalog.clearCache:

from pyspark.sql import SparkSession 

spark = SparkSession.builder.getOrCreate 
... 
spark.catalog.clearCache() 

Спарк 1.x

Вы можете использовать SQLContext.clearCache метод, который

Удаляет все кэшированных таблиц из кэша в памяти.

from pyspark.sql import SQLContext 
from pyspark import SparkContext 

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate()) 
... 
sqlContext.clearCache() 
+1

Это хорошее решение сейчас, поскольку это позволяет мне, чтобы очистить весь кэш на разумные точки останова. Я включу это, но я беспокоюсь о том, когда я расширяюсь и начинаю работать с более крупными наборами данных, старые кеши начинают начинать расти из-под контроля. Если я хочу очистить старые кэши, когда я ухожу, рекомендуется создать новую переменную (или временные переменные) и явно отключить старые объекты. Что-то вроде: 'df.cache()'; 'df_new = df.withColumn ('C1 + C2', df ['C1'] + df ['C2'])'; 'df_new.cache()'; 'Df.unpersist()'. Это кажется немного громоздким, если это единственный способ ... – bjack3

+0

Обычно нет необходимости явно очищать кеш. При необходимости очищается автоматически. – zero323

+0

Я беспокоюсь, что я делаю что-то не так. В моем полном приложении мои задания в конечном итоге выйдут из строя из-за ошибок из-за памяти.Каждая отдельная копия кадра данных достаточно мала (до 100 МБ), но кэши, похоже, живут вечно; даже после записи вывода в файл и перехода к следующим шагам. Я посмотрю, смогу ли я создать небольшой рабочий пример, чтобы показать это в действии. – bjack3

1

Мы используем это довольно часто

for (id, rdd) in sc._jsc.getPersistentRDDs().items(): 
    rdd.unpersist() 
0

может индивидуально unpersist всех ДХ-х:

firstDF.unpersist() 
secondDF.unpersist() 
Смежные вопросы