2016-04-01 3 views
0

У меня CASSANDRA таблица следующей структурой:Спарк и Cassandra, диапазон запросов на кластеризации ключ

CREATE TABLE таблица ( ключ INT, время временной метки, мера с плавающей точкой, первичный ключ (ключ, время) );

Мне нужно создать задание Spark, который будет считывать данные из предыдущей таблицы, в пределах определенной начальной и конечной метки времени некоторой обработка, и встраивание результатов обратно к Кассандре.

Таким образом, мой соединитель spark-cassandra должен будет выполнить запрос диапазона для кластеризации столбца таблицы cassandra.

Существуют ли какие-либо различия в производительности, если я:

sc.cassandraTable(keyspace,table). 
as(caseClassObject). 
filter(a => a.time.before(startTime) && a.time.after(endTime)..... 

так, что я делаю, загрузив все данные в Спарк и применение фильтрации

ИЛИ если я делаю это:

sc.cassandraTable(keyspace, table). 
where(s"time>$startTime and time<$endTime)...... 

, который фильтрует все данные в Cassandra, а затем загружает меньшие подмножества в Spark.

Селективность запроса диапазона будет около 1% Невозможно включить ключ раздела в запрос.

Какое из этих двух решений является предпочтительным?

ответ

2
sc.cassandraTable(keyspace, table).where(s"time>$startTime and time<$endTime) 

Будет намного быстрее. Вы в основном делаете процент (если вы только вытаскиваете 5% данных 5% от общей работы) полного захвата в первой команде для получения тех же данных.

В первом случае вы

  1. Чтение всех данных от Кассандры.
  2. Сериализация каждого объекта, а затем перемещение его на Spark.
  3. Затем, наконец, фильтровать все.

Во втором случае вы

  1. Чтение только данные, которые вы на самом деле хотите от C *
  2. Сериализация только это крошечное подмножество
  3. Там нет ни одного шага 3

В качестве дополнительного комментария вы также можете поместить свой тип класса case прямо в вызов

sc.cassandraTable[CaseClassObject](keyspace, table) 
+0

Я думаю, 1/20 будет преувеличением. Главным образом, потому что время - это ключ кластеризации, поэтому cassandra всегда будет выполнять полное сканирование таблицы. Но это действительно сэкономит время, затрачиваемое на перенос данных из кассандры в искру, что может быть большим количеством данных. –

+0

Я бы определенно ожидал, что это будет экономить пропорционально количеству прочитанных данных. 1) Количество sstables, которое требуется прочитать, будет намного меньше, поскольку сканирование диапазона не потребует считывания всех sstables, пока не будет выполняться сканирование диапазона. 2) Количество объектов, которые должны пройти через C * Jvm wil, будет намного меньше, опять же это значительная сумма меньше объектов, которые будут выделены и десериализованы. Поэтому требуется меньше сбора мусора и меньше размещения объектов. 3.) Объем трафика между процессами C * и процессами Spark Executor минимизирован. Это не нулевая система копирования. – RussS

+0

Благодарим вас за ответ. Я ожидал такого поведения, но хотел быть уверенным. Мое единственное беспокойство заключалось в том, что cassandra придется выполнять полное сканирование таблицы (потому что я не указываю никаких условий для ключа раздела), но в любом случае второе решение является лучшим способом. –