2015-04-07 1 views
4

Я храню time-series данные в HBase. RowKey состоит из user_id и timestamp, как это:Использует ли искру используемый порядок ключей hbase при использовании hbase в качестве источника данных

{ 
    "userid1-1428364800" : { 
     "columnFamily1" : { 
      "val" : "1" 
      } 
     } 
    } 
    "userid1-1428364803" : { 
     "columnFamily1" : { 
      "val" : "2" 
      } 
     } 
    } 

    "userid2-1428364812" : { 
     "columnFamily1" : { 
      "val" : "abc" 
      } 
     } 
    } 

} 

Теперь мне нужно выполнить для каждого пользователя анализа. Вот инициализация hbase_rdd (от here)

sc = SparkContext(appName="HBaseInputFormat") 

conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} 
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" 
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" 

hbase_rdd = sc.newAPIHadoopRDD(
     "org.apache.hadoop.hbase.mapreduce.TableInputFormat", 
     "org.apache.hadoop.hbase.io.ImmutableBytesWritable", 
     "org.apache.hadoop.hbase.client.Result", 
     keyConverter=keyConv, 
     valueConverter=valueConv, 
     conf=conf) 

Природная MapReduce, как способ обработки будет:

hbase_rdd 
    .map(lambda row: (row[0].split('-')[0], (row[0].split('-')[1], row[1]))) # shift timestamp from key to value 
    .groupByKey() 
    .map(processUserData) # process user's data 

При выполнении первой карты (сдвиг метки от ключа к значению) важно чтобы узнать, когда данные временного ряда текущего пользователя закончены, и поэтому может быть запущено преобразование groupByKey. Таким образом, нам не нужно отображать всю таблицу и хранить все временные данные. Это возможно, потому что hbase хранит строки-ключи в отсортированном порядке.

С Hadoop потокового это может быть сделано таким образом:

import sys 

current_user_data = [] 
last_userid = None 
for line in sys.stdin: 
    k, v = line.split('\t') 
    userid, timestamp = k.split('-') 
    if userid != last_userid and current_user_data: 
     print processUserData(last_userid, current_user_data) 
     last_userid = userid 
     current_user_data = [(timestamp, v)] 
    else: 
     current_user_data.append((timestamp, v)) 

Вопрос заключается в том: как использовать отсортированный порядок Hbase ключей в пределах Спарк?

ответ

2

Я не очень хорошо разбираюсь в гарантиях, которые вы получаете с помощью того, как вы извлекаете данные из HBase, но если я правильно понимаю, я могу ответить только простой старой игрой.

У вас есть RDD[X]. Насколько известно Spark, X s в этом RDD полностью неупорядочены. Но у вас есть некоторые внешние знания, и вы можете гарантировать, что данные фактически сгруппированы по некоторому полю X (и, возможно, даже отсортированы по другому полю).

В этом случае вы можете использовать mapPartitions, чтобы сделать практически то же самое, что и с потоком hadoop. Это позволяет вам перебирать все записи в одном разделе, чтобы вы могли искать блоки записей с одним и тем же ключом.

val myRDD: RDD[X] = ... 
val groupedData: RDD[Seq[X]] = myRdd.mapPartitions { itr => 
    var currentUserData = new scala.collection.mutable.ArrayBuffer[X]() 
    var currentUser: X = null 
    //itr is an iterator over *all* the records in one partition 
    itr.flatMap { x => 
    if (currentUser != null && x.userId == currentUser.userId) { 
     // same user as before -- add the data to our list 
     currentUserData += x 
     None 
    } else { 
     // its a new user -- return all the data for the old user, and make 
     // another buffer for the new user 
     val userDataGrouped = currentUserData 
     currentUserData = new scala.collection.mutable.ArrayBuffer[X]() 
     currentUserData += x 
     currentUser = x 
     Some(userDataGrouped) 
    } 
    } 
} 
// now groupedRDD has all the data for one user grouped together, and we didn't 
// need to do an expensive shuffle. Also, the above transformation is lazy, so 
// we don't necessarily even store all that data in memory -- we could still 
// do more filtering on the fly, eg: 
val usersWithLotsOfData = groupedRDD.filter{ userData => userData.size > 10 } 

Я понимаю, что вы хотите использовать Python - извините я полагаю, что я, скорее всего, чтобы получить пример правильно, если я пишу в Scala. и я думаю, что аннотации типа делают смысл более понятным, но это, вероятно, смещение Scala ... :). В любом случае, надеюсь, вы сможете понять, что происходит и переводить. (Не беспокойтесь слишком много о flatMap & Some & None, вероятно, неважно, если вы понимаете идею ...)

+1

Да, но проблема будет с пользователями, которые распределены между двумя различными разделами, не так ли? – doubts

+0

да это правильно. Я в основном просто переводил ваш пример с потоком хаопов, но это похоже на красную селедку. Я думаю, что ваш вопрос действительно: у меня есть данные, хранящиеся в HBase с определенными свойствами (например, сгруппированные вместе) - как я могу получить эти данные в RDD, где я поддерживаю эти свойства? Или, может быть, данные еще не сгруппированы в HBase, но вы хотите, чтобы HBase выполнял группировку для вас (вместо того, чтобы просить искру сделать это)? Не то, чтобы у меня был ответ на любой из этих вопросов, но я думаю, что разъяснение может помочь вам ответить. –

+0

также - данные становятся разделенными на разделы так же, как в Hadoop, поэтому, если вы ожидаете, что TableInputFormat уже сделает то, что вы хотите с Hadoop, тогда он будет работать с Spark. –

Смежные вопросы