У меня есть настройки Спарк 2.0 и Кассандру 3.0 на локальной машине (8 ядер, 16GB RAM) для целей тестирования и редактировать spark-defaults.conf
следующим образом:Спарк: PySpark + Кассандры производительность запроса
spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4
Далее я импортировал 1,5 миллиона строки в Кассандре:
test(
tid int,
cid int,
pid int,
ev list<double>,
primary key (tid)
)
test.ev
представляет собой список, содержащий числовые значения, т.е. [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]
Теперь в коде , Чтобы проверить все, что я только что создал SparkSession
, соединенный с Кассандрой и сделать простой SELECT COUNT:
cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()
На данный момент, Спарк выводит count
и занимает около 28 секунд, чтобы закончить Job
, распределенный в 13 Tasks
(в Spark UI
, общий вход для задач это 331.6MB)
Вопросы:
- является то, что ожидаемой перфорацией ormance? Если нет, чего мне не хватает?
- Теория говорит, что количество разделов DataFrame определяет количество задач Spark будет распределять задание. Если я устанавливаю
spark.sql.shuffle.partitions
на 4, почему создается 13 задач? (Также убедились, что количество разделов, призывающихrdd.getNumPartitions()
на моем DataFrame)
Update
Обычной операция Я хотел бы проверить через эти данные:
- запросы больших набор данных, скажем, из 100 000 ~ N строк, сгруппированных по
pid
- Выбрать
ev
, alist<double>
- Выполнение в среднем на каждого члена, предполагая, что к настоящему времени каждый список имеет ту же длину IE
df.groupBy('pid').agg(avg(df['ev'][1]))
В
@ zero323 предложил, я развернула внешнюю машину (2 Гб ОЗУ, 4 ядра, SSD) с Cassandra только для этого теста, и загрузил тот же набор данных. Результатом
df.select().count()
была ожидаемая большая латентность и общая плохая производительность по сравнению с моим предыдущим тестом (заняло около 70 секунд, чтобы закончить
Job
).
Редактировать: Я неправильно понял его предложение. @ zero323 означало, чтобы позволить Cassandra выполнить подсчет вместо искровой SQL, как описано в here
Кроме того, я хотел бы отметить, что я знаю, присущего анти-схеме установки list<double>
вместо широкого ряда для этот тип данных, но мои проблемы в этот момент - это больше времени, затрачиваемого на извлечение большого набора данных, а не фактическое среднее время вычислений.
Если вы хотите выполнить подсчет, то запрос внешнего источника будет более эффективным. В общем, многое зависит от того, что вы делаете. В отношении разделов 'spark.sql.shuffle.partitions' здесь не используется. Исходное количество разделов задается источником данных, и count всегда использует 1 задачу для окончательной агрегации. – zero323
еще раз спасибо @ zero323. Проверьте мое обновление. Кроме того, если я правильно понимаю, вы говорите, что количество разделов задано Cassandra? – TMichel
OK <Я думаю, что я был недостаточно ясен:/Мое мнение заключалось в том, чтобы выполнить запрос непосредственно против Cassandra без использования Spark SQL, если вы выполняете простые действия, такие как подсчет всех строк. Не развертывать отдельный сервер. – zero323