2016-09-19 2 views
4

У меня есть настройки Спарк 2.0 и Кассандру 3.0 на локальной машине (8 ядер, 16GB RAM) для целей тестирования и редактировать spark-defaults.conf следующим образом:Спарк: PySpark + Кассандры производительность запроса

spark.python.worker.memory 1g 
spark.executor.cores 4 
spark.executor.instances 4 
spark.sql.shuffle.partitions 4 

Далее я импортировал 1,5 миллиона строки в Кассандре:

test(
    tid int, 
    cid int, 
    pid int, 
    ev list<double>, 
    primary key (tid) 
) 

test.ev представляет собой список, содержащий числовые значения, т.е. [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]

Теперь в коде , Чтобы проверить все, что я только что создал SparkSession, соединенный с Кассандрой и сделать простой SELECT COUNT:

cassandra = spark.read.format("org.apache.spark.sql.cassandra") 
df = cassandra.load(keyspace="testks",table="test") 
df.select().count() 

На данный момент, Спарк выводит count и занимает около 28 секунд, чтобы закончить Job, распределенный в 13 TasksSpark UI, общий вход для задач это 331.6MB)

Вопросы:

  • является то, что ожидаемой перфорацией ormance? Если нет, чего мне не хватает?
  • Теория говорит, что количество разделов DataFrame определяет количество задач Spark будет распределять задание. Если я устанавливаю spark.sql.shuffle.partitions на 4, почему создается 13 задач? (Также убедились, что количество разделов, призывающих rdd.getNumPartitions() на моем DataFrame)

Update

Обычной операция Я хотел бы проверить через эти данные:

  • запросы больших набор данных, скажем, из 100 000 ~ N строк, сгруппированных по pid
  • Выбрать ev, a list<double>
  • Выполнение в среднем на каждого члена, предполагая, что к настоящему времени каждый список имеет ту же длину IE df.groupBy('pid').agg(avg(df['ev'][1]))

В @ zero323 предложил, я развернула внешнюю машину (2 Гб ОЗУ, 4 ядра, SSD) с Cassandra только для этого теста, и загрузил тот же набор данных. Результатом df.select().count() была ожидаемая большая латентность и общая плохая производительность по сравнению с моим предыдущим тестом (заняло около 70 секунд, чтобы закончить Job).

Редактировать: Я неправильно понял его предложение. @ zero323 означало, чтобы позволить Cassandra выполнить подсчет вместо искровой SQL, как описано в here

Кроме того, я хотел бы отметить, что я знаю, присущего анти-схеме установки list<double> вместо широкого ряда для этот тип данных, но мои проблемы в этот момент - это больше времени, затрачиваемого на извлечение большого набора данных, а не фактическое среднее время вычислений.

+0

Если вы хотите выполнить подсчет, то запрос внешнего источника будет более эффективным. В общем, многое зависит от того, что вы делаете. В отношении разделов 'spark.sql.shuffle.partitions' здесь не используется. Исходное количество разделов задается источником данных, и count всегда использует 1 задачу для окончательной агрегации. – zero323

+0

еще раз спасибо @ zero323. Проверьте мое обновление. Кроме того, если я правильно понимаю, вы говорите, что количество разделов задано Cassandra? – TMichel

+0

OK <Я думаю, что я был недостаточно ясен:/Мое мнение заключалось в том, чтобы выполнить запрос непосредственно против Cassandra без использования Spark SQL, если вы выполняете простые действия, такие как подсчет всех строк. Не развертывать отдельный сервер. – zero323

ответ

3

Это ожидаемая производительность? Если нет, чего мне не хватает?

Это выглядит медленно, но это не совсем неожиданно. В целом count выражается как

SELECT 1 FROM table 

с последующим суммированием боковой Спарк. Таким образом, хотя он оптимизирован, он все еще довольно неэффективен, потому что вы извлекаете N длинных целых чисел из внешнего источника, чтобы суммировать их локально.

Как объяснено в the docs Кассандра с поддержкой RDD (не Datasets) обеспечивает оптимизированный метод cassandraCount, который выполняет подсчет сборок на стороне сервера.

Теория говорит, что число разбиений DataFrame определяет количество задач Спарк будет распределять работу. Если я устанавливаю spark.sql.shuffle.partitions к (...), почему создание (...) Задачи?

Потому что spark.sql.shuffle.partitions здесь не используется. Это свойство используется для определения количества разделов для перетасовки (когда данные агрегируются некоторым набором ключей), а не для создания Dataset или глобальных агрегатов, таких как count(*) (которые всегда используют 1 раздел для окончательной агрегации).

Если вы заинтересованы в контроле количества начальных разделов, вы должны смотреть на spark.cassandra.input.split.size_in_mb, которая определяет:

Приблизительный объем данных, быть извлечена в раздел Спарк. Минимальное количество получившегося Spark, перегородка составляет 1 + 2 * SparkContext.defaultParallelism

Как вы можете видеть еще один фактор, здесь spark.default.parallelism, но это не совсем тонкая настройка так в зависимости от этого, в общем, не является оптимальным выбором.

+1

Это было действительно, действительно показательно. Спасибо. – TMichel

Смежные вопросы