Оба номера кажутся относительно высокими, и не совсем ясно, как вы создаете DataFrame
или измеряете время, но в общем случае разница может быть объяснена низким количеством записей по сравнению с количеством разделов.
Значение по умолчанию для spark.sql.shuffle.partitions
- 200, которое в количестве задач вы получаете. При записи 50K накладные расходы на запуск задачи будут выше, чем ускорение, которое вы можете получить от параллельного выполнения. Давайте проиллюстрируем это простым примером. Первый позволяет создать пример данные:
import string
import random
random.seed(323)
def random_string():
n = random.randint(3, 6)
return (''.join(random.choice(string.ascii_uppercase) for _ in range(n)),)
df = (sc
.parallelize([random_string() for _ in range(50000)], 8).toDF(["name"])
.cache())
и измерить время, в зависимости от количества shuffle.partitions
:
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 504 ms per loop
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 451 ms per loop
sqlContext.setConf("spark.sql.shuffle.partitions", "100")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 624 ms per loop
sqlContext.setConf("spark.sql.shuffle.partitions", "200")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 778 ms per loop
sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 1.75 s per loop
Хотя эти значения не сопоставимы с тем, что вы утверждаете, и эти данные были собраны в местном режиме вы можете видеть это относительно четкое изображение. То же самое относится и к РДУ:
from operator import add
%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1).collect()
## 10 loops, best of 3: 414 ms per loop
%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 10).collect()
## 10 loops, best of 3: 439 ms per loop
%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 100).collect()
## 10 loops, best of 3: 1.3 s per loop
%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1000).collect()
## 10 loops, best of 3: 8.41 s per loop
В правильной распределенной среде это будет выше из-за стоимости сети IO.
Для сравнения позволяет проверить, сколько времени потребуется, чтобы выполнить эту задачу локально, без искры
from collections import Counter
data = df.rdd.flatMap(lambda x: x).collect()
%timeit -n 10 Counter(data)
## 10 loops, best of 3: 9.9 ms per loop
Вы должны также взглянуть на местности данных. В зависимости от используемого вами хранилища и конфигурации это может добавить дополнительную задержку для ваших заданий даже с небольшим входом, подобным этому.
Оба номера кажутся чрезмерно высокими. Как вы запускаете этот код и измеряете время? – zero323
Я бежал от ноутбука Jupyter и занимал время работы от SparkUI. Back-end - это Mesos (построенный лучшими парнями, чем я), а мой экземпляр Spark имеет 24 ядра и 99 ГБ оперативной памяти. Я новичок во всем этом, так что все еще изучаю лучший способ времени/теста ... – RichD