2015-03-11 2 views
4

Я пытаюсь оптимизировать свою искру, избегая перетасовки как можно больше.Spark Cassandra Connector keyBy и перетасовка

Я использую cassandraTable для создания RDD.

имена Столбца семьи являются динамическими, таким образом, она определяется следующим образом:

CREATE TABLE "Profile" (
    key text, 
    column1 text, 
    value blob, 
    PRIMARY KEY (key, column1) 
) WITH COMPACT STORAGE AND 
    bloom_filter_fp_chance=0.010000 AND 
    caching='ALL' AND 
    ... 

результатов этого определения элементов CassandraRow RDD в следующем формате:

CassandraRow <key, column1, value> 
  • ключа - RowKey
  • column1 - значение column1 - это имя динамического столбца
  • значение - значение динамического столбца

Так что, если у меня есть RK = «Профиль1», с колоннами имя = «Джордж» и возраст = '34' , в результате РДД будет:

CassandraRow<key=profile1, column1=name, value=George> 
CassandraRow<key=profile1, column1=age, value=34> 

Тогда мне нужно сгруппировать элементы, которые разделяют один и тот же ключ вместе, чтобы получить PairRdd:

PairRdd<String, Iterable<CassandraRow>> 

Важно сказать, что все элементы мне нужно сгруппировать в том же узле Cassandra (один и тот же ключ строки), поэтому я ожидаю, что соединитель сохранит локальный y данных.

Проблема в том, что использование groupBy или groupByKey вызывает перетасовку. Я скорее группировать их на месте, потому что все данные находятся на том же узле:

JavaPairRDD<String, Iterable<CassandraRow>> rdd = javaFunctions(context) 
     .cassandraTable(ks, "Profile") 
     .groupBy(new Function<ColumnFamilyModel, String>() { 
      @Override 
      public String call(ColumnFamilyModel arg0) throws Exception { 
       return arg0.getKey(); 
      } 
     }) 

Мои вопросы:

  1. ли с помощью keyBy на РДУ вызовет перетасовки, или он будет хранить данные локально ?
  2. Есть ли способ сгруппировать элементы по клавишам без перетасовки? Я читал о mapPartitions, но не совсем понял его использование.

Спасибо,

Шай

ответ

5

Я думаю, что вы ищете spanByKey, Кассандра разъем конкретной операцию, которая использует преимущество упорядочения предоставленного Кассандрой для группировки элементов, не подвергаясь в случайном порядке сцена.

В вашем случае, это должно выглядеть следующим образом:

sc.cassandraTable("keyspace", "Profile") 
    .keyBy(row => (row.getString("key"))) 
    .spanByKey 

Подробнее в документации:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key

+0

Благодаря maasg! Я не знаю, как я это пропустил ... – Shai

Смежные вопросы