2016-06-27 2 views
3

У меня есть очень простой искровой DataFrame, и при запуске DataFrame GroupBy, производительность страшна - около 8х медленнее, чем (в моей голове), что эквивалентно РДД reduceByKey ...Почему мой Spark DataFrame намного медленнее, чем RDD?

Мой кэшируются DF всего две колонки, клиента и имя только 50k строк:

== Physical Plan == 
InMemoryColumnarTableScan [customer#2454,name#2456], InMemoryRelation [customer#2454,name#2456], true, 10000, StorageLevel(true, true, false, true, 1), Scan ParquetRelation[customer#2454,name#2456] InputPaths: hdfs://nameservice1/tmp/v2_selected_parquet/test_parquet2, None 

Когда я запускаю следующие два фрагмента, я бы ожидать схожую производительность, а не рдд версии для запуска в 10s и версии DF в 85S ...

rawtempDF2.rdd.map(lambda x: (x['name'], 1)).reduceByKey(lambda x,y: x+y).collect() 

rawtempDF2.groupby('name').count().collect() 

Я не хватает s что-то действительно фундаментальное здесь? FWIW, версия RDD работает 54 этапа, а версия DF - 227:/

Редактировать: Я использую Spark 1.6.1 и Python 3.4.2. Edit2: Кроме того, исходный паркет был разделен на клиента/день/имя - в настоящее время 27 клиентов, 1 день, c. 45 имен.

+0

Оба номера кажутся чрезмерно высокими. Как вы запускаете этот код и измеряете время? – zero323

+0

Я бежал от ноутбука Jupyter и занимал время работы от SparkUI. Back-end - это Mesos (построенный лучшими парнями, чем я), а мой экземпляр Spark имеет 24 ядра и 99 ГБ оперативной памяти. Я новичок во всем этом, так что все еще изучаю лучший способ времени/теста ... – RichD

ответ

5

Оба номера кажутся относительно высокими, и не совсем ясно, как вы создаете DataFrame или измеряете время, но в общем случае разница может быть объяснена низким количеством записей по сравнению с количеством разделов.

Значение по умолчанию для spark.sql.shuffle.partitions - 200, которое в количестве задач вы получаете. При записи 50K накладные расходы на запуск задачи будут выше, чем ускорение, которое вы можете получить от параллельного выполнения. Давайте проиллюстрируем это простым примером. Первый позволяет создать пример данные:

import string 
import random 

random.seed(323) 

def random_string(): 
    n = random.randint(3, 6) 
    return (''.join(random.choice(string.ascii_uppercase) for _ in range(n)),) 

df = (sc 
    .parallelize([random_string() for _ in range(50000)], 8).toDF(["name"]) 
    .cache()) 

и измерить время, в зависимости от количества shuffle.partitions:

sqlContext.setConf("spark.sql.shuffle.partitions", "1") 
%timeit -n 10 df.groupby('name').count().collect() 
## 10 loops, best of 3: 504 ms per loop 

sqlContext.setConf("spark.sql.shuffle.partitions", "1") 
%timeit -n 10 df.groupby('name').count().collect() 
## 10 loops, best of 3: 451 ms per loop 

sqlContext.setConf("spark.sql.shuffle.partitions", "100") 
%timeit -n 10 df.groupby('name').count().collect() 
## 10 loops, best of 3: 624 ms per loop 

sqlContext.setConf("spark.sql.shuffle.partitions", "200") 
%timeit -n 10 df.groupby('name').count().collect() 
## 10 loops, best of 3: 778 ms per loop 

sqlContext.setConf("spark.sql.shuffle.partitions", "1000") 
%timeit -n 10 df.groupby('name').count().collect() 
## 10 loops, best of 3: 1.75 s per loop 

Хотя эти значения не сопоставимы с тем, что вы утверждаете, и эти данные были собраны в местном режиме вы можете видеть это относительно четкое изображение. То же самое относится и к РДУ:

from operator import add 

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1).collect() 
## 10 loops, best of 3: 414 ms per loop 

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 10).collect() 
## 10 loops, best of 3: 439 ms per loop 

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 100).collect() 
## 10 loops, best of 3: 1.3 s per loop 

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1000).collect() 
## 10 loops, best of 3: 8.41 s per loop 

В правильной распределенной среде это будет выше из-за стоимости сети IO.

Для сравнения позволяет проверить, сколько времени потребуется, чтобы выполнить эту задачу локально, без искры

from collections import Counter 

data = df.rdd.flatMap(lambda x: x).collect() 

%timeit -n 10 Counter(data) 
## 10 loops, best of 3: 9.9 ms per loop 

Вы должны также взглянуть на местности данных. В зависимости от используемого вами хранилища и конфигурации это может добавить дополнительную задержку для ваших заданий даже с небольшим входом, подобным этому.

+0

Отличная демонстрация того, где все происходит неправильно. 100 пусков в случайном порядке выполняли 31 сек на петлю (с искровым запуском на 6-узловой кластере мезос) - это определенно не так, но я буду делать больше тестов с использованием более крупных наборов данных/разных разделов, прежде чем я буду жаловаться на инфраструктурных парней;) – RichD

+0

Для справки: Время переходов/avg: 1/1.53s, 10/2.6s, 100/31.3s, 200/65s – RichD

+0

Ну, я вполне уверен, что это все, что вам нужно, чтобы объяснить пропорции, но абсолютные значения кажутся способными выкл. – zero323

Смежные вопросы