В настоящее время я работаю с данными ДНК-последовательности, и я столкнулся с небольшим шагом производительности.Spark: более эффективное агрегирование для объединения строк из разных строк
У меня есть два словаря поиска/хеширования (как RDD) с ДНК-словами (короткие последовательности) в виде ключей и список индексных позиций в качестве значения. Один - для более короткой последовательности запросов, а другой для последовательности базы данных. Создание таблиц довольно быстро даже с очень и очень большими последовательностями.
Для следующего шага мне нужно соединить их и найти «хиты» (пары позиций индекса для каждого общего слова).
Я сначала присоединяюсь к поисковым словарям, что достаточно быстро. Тем не менее, мне теперь нужны пары, поэтому я должен дважды выполнить flatmap, один раз, чтобы развернуть список индексов из запроса и второй раз, чтобы развернуть список индексов из базы данных. Это не идеально, но я не вижу другого способа сделать это. По крайней мере, он работает нормально.
Выход на этом этапе: (query_index, (word_length, diagonal_offset))
, где диагональное смещение - это database_sequence_index минус индекс последовательности запросов.
Однако теперь мне нужно найти пары индексов с тем же диагональным смещением (db_index - query_index) и достаточно близко друг к другу и присоединиться к ним (поэтому я увеличиваю длину слова), но только как пары (т.е. один раз Я присоединяюсь к одному индексу с другим, я не хочу, чтобы с ним сливалось другое).
Я делаю это с помощью операции aggregateByKey с использованием специального объекта с именем Seed().
PARALELLISM = 16 # I have 4 cores with hyperthreading
def generateHsps(query_lookup_table_rdd, database_lookup_table_rdd):
global broadcastSequences
def mergeValueOp(seedlist, (query_index, seed_length)):
return seedlist.addSeed((query_index, seed_length))
def mergeSeedListsOp(seedlist1, seedlist2):
return seedlist1.mergeSeedListIntoSelf(seedlist2)
hits_rdd = (query_lookup_table_rdd.join(database_lookup_table_rdd)
.flatMap(lambda (word, (query_indices, db_indices)): [(query_index, db_indices) for query_index in query_indices], preservesPartitioning=True)
.flatMap(lambda (query_index, db_indices): [(db_index - query_index, (query_index, WORD_SIZE)) for db_index in db_indices], preservesPartitioning=True)
.aggregateByKey(SeedList(), mergeValueOp, mergeSeedListsOp, PARALLELISM)
.map(lambda (diagonal, seedlist): (diagonal, seedlist.mergedSeedList))
.flatMap(lambda (diagonal, seedlist): [(query_index, seed_length, diagonal) for query_index, seed_length in seedlist])
)
return hits_rdd
Seed():
class SeedList():
def __init__(self):
self.unmergedSeedList = []
self.mergedSeedList = []
#Try to find a more efficient way to do this
def addSeed(self, (query_index1, seed_length1)):
for i in range(0, len(self.unmergedSeedList)):
(query_index2, seed_length2) = self.unmergedSeedList[i]
#print "Checking ({0}, {1})".format(query_index2, seed_length2)
if min(abs(query_index2 + seed_length2 - query_index1), abs(query_index1 + seed_length1 - query_index2)) <= WINDOW_SIZE:
self.mergedSeedList.append((min(query_index1, query_index2), max(query_index1+seed_length1, query_index2+seed_length2)-min(query_index1, query_index2)))
self.unmergedSeedList.pop(i)
return self
self.unmergedSeedList.append((query_index1, seed_length1))
return self
def mergeSeedListIntoSelf(self, seedlist2):
print "merging seed"
for (query_index2, seed_length2) in seedlist2.unmergedSeedList:
wasmerged = False
for i in range(0, len(self.unmergedSeedList)):
(query_index1, seed_length1) = self.unmergedSeedList[i]
if min(abs(query_index2 + seed_length2 - query_index1), abs(query_index1 + seed_length1 - query_index2)) <= WINDOW_SIZE:
self.mergedSeedList.append((min(query_index1, query_index2), max(query_index1+seed_length1, query_index2+seed_length2)-min(query_index1, query_index2)))
self.unmergedSeedList.pop(i)
wasmerged = True
break
if not wasmerged:
self.unmergedSeedList.append((query_index2, seed_length2))
return self
Это где производительность действительно расщепляет для четных последовательностей умеренной длины.
Есть ли лучший способ сделать это объединение? Мое чувство кишки говорит «да», но я не могу придумать это.
Я знаю, что это очень длинный и технический вопрос, и я был бы очень признателен за понимание, даже если нет простого решения.
Edit: Вот как я делаю таблицы поиска:
def createLookupTable(sequence_rdd, sequence_name, word_length):
global broadcastSequences
blank_list = []
def addItemToList(lst, val):
lst.append(val)
return lst
def mergeLists(lst1, lst2):
#print "Merging"
return lst1+lst2
return (sequence_rdd
.flatMap(lambda seq_len: range(0, seq_len - word_length + 1))
.repartition(PARALLELISM)
#.partitionBy(PARALLELISM)
.map(lambda index: (str(broadcastSequences.value[sequence_name][index:index + word_length]), index), preservesPartitioning=True)
.aggregateByKey(blank_list, addItemToList, mergeLists, PARALLELISM))
#.map(lambda (word, indices): (word, sorted(indices))))
А вот функция, которая выполняет всю операцию:
def run(query_seq, database_sequence, translate_query=False):
global broadcastSequences
scoring_matrix = 'nucleotide' if isinstance(query_seq.alphabet, DNAAlphabet) else 'blosum62'
sequences = {'query': query_seq,
'database': database_sequence}
broadcastSequences = sc.broadcast(sequences)
query_rdd = sc.parallelize([len(query_seq)])
query_rdd.cache()
database_rdd = sc.parallelize([len(database_sequence)])
database_rdd.cache()
query_lookup_table_rdd = createLookupTable(query_rdd, 'query', WORD_SIZE)
query_lookup_table_rdd.cache()
database_lookup_table_rdd = createLookupTable(database_rdd, 'database', WORD_SIZE)
seeds_rdd = generateHsps(query_lookup_table_rdd, database_lookup_table_rdd)
return seeds_rdd
Edit 2: Я подредактированны вещи немного и слегка улучшена производительность за счет замены:
.flatMap(lambda (word, (query_indices, db_indices)): [(query_index, db_indices) for query_index in query_indices], preservesPartitioning=True)
.flatMap(lambda (query_index, db_indices): [(db_index - query_index, (query_index, WORD_SIZE)) for db_index in db_indices], preservesPartitioning=True)
в hits_rdd с:
.flatMap(lambda (word, (query_indices, db_indices)): itertools.product(query_indices, db_indices))
.map(lambda (query_index, db_index): (db_index - query_index, (query_index, WORD_SIZE)))
По крайней мере, сейчас я не сжигаю память с промежуточными структурами данных.
является WINDOW_SIZE разным для каждого SeedList? – Back2Basics
@ Back2Basics WINDOW_SIZE одинаково для каждого SeedList. –
Хиты_rdd не имеют смысла. Можем ли мы взглянуть на образцы query_lookup_table_dd и database_lookup_table_dd, а также ваши реализации flatMap и aggregateByKey? – Back2Basics