Я храню time-series
данные в HBase
. RowKey состоит из user_id
и timestamp
, как это:Использует ли искру используемый порядок ключей hbase при использовании hbase в качестве источника данных
{
"userid1-1428364800" : {
"columnFamily1" : {
"val" : "1"
}
}
}
"userid1-1428364803" : {
"columnFamily1" : {
"val" : "2"
}
}
}
"userid2-1428364812" : {
"columnFamily1" : {
"val" : "abc"
}
}
}
}
Теперь мне нужно выполнить для каждого пользователя анализа. Вот инициализация hbase_rdd
(от here)
sc = SparkContext(appName="HBaseInputFormat")
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD(
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
keyConverter=keyConv,
valueConverter=valueConv,
conf=conf)
Природная MapReduce, как способ обработки будет:
hbase_rdd
.map(lambda row: (row[0].split('-')[0], (row[0].split('-')[1], row[1]))) # shift timestamp from key to value
.groupByKey()
.map(processUserData) # process user's data
При выполнении первой карты (сдвиг метки от ключа к значению) важно чтобы узнать, когда данные временного ряда текущего пользователя закончены, и поэтому может быть запущено преобразование groupByKey. Таким образом, нам не нужно отображать всю таблицу и хранить все временные данные. Это возможно, потому что hbase хранит строки-ключи в отсортированном порядке.
С Hadoop потокового это может быть сделано таким образом:
import sys
current_user_data = []
last_userid = None
for line in sys.stdin:
k, v = line.split('\t')
userid, timestamp = k.split('-')
if userid != last_userid and current_user_data:
print processUserData(last_userid, current_user_data)
last_userid = userid
current_user_data = [(timestamp, v)]
else:
current_user_data.append((timestamp, v))
Вопрос заключается в том: как использовать отсортированный порядок Hbase ключей в пределах Спарк?
Да, но проблема будет с пользователями, которые распределены между двумя различными разделами, не так ли? – doubts
да это правильно. Я в основном просто переводил ваш пример с потоком хаопов, но это похоже на красную селедку. Я думаю, что ваш вопрос действительно: у меня есть данные, хранящиеся в HBase с определенными свойствами (например, сгруппированные вместе) - как я могу получить эти данные в RDD, где я поддерживаю эти свойства? Или, может быть, данные еще не сгруппированы в HBase, но вы хотите, чтобы HBase выполнял группировку для вас (вместо того, чтобы просить искру сделать это)? Не то, чтобы у меня был ответ на любой из этих вопросов, но я думаю, что разъяснение может помочь вам ответить. –
также - данные становятся разделенными на разделы так же, как в Hadoop, поэтому, если вы ожидаете, что TableInputFormat уже сделает то, что вы хотите с Hadoop, тогда он будет работать с Spark. –