2015-12-19 3 views
8

В настоящее время я работаю с данными ДНК-последовательности, и я столкнулся с небольшим шагом производительности.Spark: более эффективное агрегирование для объединения строк из разных строк

У меня есть два словаря поиска/хеширования (как RDD) с ДНК-словами (короткие последовательности) в виде ключей и список индексных позиций в качестве значения. Один - для более короткой последовательности запросов, а другой для последовательности базы данных. Создание таблиц довольно быстро даже с очень и очень большими последовательностями.

Для следующего шага мне нужно соединить их и найти «хиты» (пары позиций индекса для каждого общего слова).

Я сначала присоединяюсь к поисковым словарям, что достаточно быстро. Тем не менее, мне теперь нужны пары, поэтому я должен дважды выполнить flatmap, один раз, чтобы развернуть список индексов из запроса и второй раз, чтобы развернуть список индексов из базы данных. Это не идеально, но я не вижу другого способа сделать это. По крайней мере, он работает нормально.

Выход на этом этапе: (query_index, (word_length, diagonal_offset)), где диагональное смещение - это database_sequence_index минус индекс последовательности запросов.

Однако теперь мне нужно найти пары индексов с тем же диагональным смещением (db_index - query_index) и достаточно близко друг к другу и присоединиться к ним (поэтому я увеличиваю длину слова), но только как пары (т.е. один раз Я присоединяюсь к одному индексу с другим, я не хочу, чтобы с ним сливалось другое).

Я делаю это с помощью операции aggregateByKey с использованием специального объекта с именем Seed().

PARALELLISM = 16 # I have 4 cores with hyperthreading 
def generateHsps(query_lookup_table_rdd, database_lookup_table_rdd): 
    global broadcastSequences 

    def mergeValueOp(seedlist, (query_index, seed_length)): 
     return seedlist.addSeed((query_index, seed_length)) 

    def mergeSeedListsOp(seedlist1, seedlist2): 
     return seedlist1.mergeSeedListIntoSelf(seedlist2) 

    hits_rdd = (query_lookup_table_rdd.join(database_lookup_table_rdd) 
       .flatMap(lambda (word, (query_indices, db_indices)): [(query_index, db_indices) for query_index in query_indices], preservesPartitioning=True) 
       .flatMap(lambda (query_index, db_indices): [(db_index - query_index, (query_index, WORD_SIZE)) for db_index in db_indices], preservesPartitioning=True) 
       .aggregateByKey(SeedList(), mergeValueOp, mergeSeedListsOp, PARALLELISM) 
       .map(lambda (diagonal, seedlist): (diagonal, seedlist.mergedSeedList)) 
       .flatMap(lambda (diagonal, seedlist): [(query_index, seed_length, diagonal) for query_index, seed_length in seedlist]) 
       ) 

    return hits_rdd 

Seed():

class SeedList(): 
    def __init__(self): 
     self.unmergedSeedList = [] 
     self.mergedSeedList = [] 


    #Try to find a more efficient way to do this 
    def addSeed(self, (query_index1, seed_length1)): 
     for i in range(0, len(self.unmergedSeedList)): 
      (query_index2, seed_length2) = self.unmergedSeedList[i] 
      #print "Checking ({0}, {1})".format(query_index2, seed_length2) 
      if min(abs(query_index2 + seed_length2 - query_index1), abs(query_index1 + seed_length1 - query_index2)) <= WINDOW_SIZE: 
       self.mergedSeedList.append((min(query_index1, query_index2), max(query_index1+seed_length1, query_index2+seed_length2)-min(query_index1, query_index2))) 
       self.unmergedSeedList.pop(i) 
       return self 
     self.unmergedSeedList.append((query_index1, seed_length1)) 
     return self 

    def mergeSeedListIntoSelf(self, seedlist2): 
     print "merging seed" 
     for (query_index2, seed_length2) in seedlist2.unmergedSeedList: 
      wasmerged = False 
      for i in range(0, len(self.unmergedSeedList)): 
       (query_index1, seed_length1) = self.unmergedSeedList[i] 
       if min(abs(query_index2 + seed_length2 - query_index1), abs(query_index1 + seed_length1 - query_index2)) <= WINDOW_SIZE: 
        self.mergedSeedList.append((min(query_index1, query_index2), max(query_index1+seed_length1, query_index2+seed_length2)-min(query_index1, query_index2))) 
        self.unmergedSeedList.pop(i) 
        wasmerged = True 
        break 
      if not wasmerged: 
       self.unmergedSeedList.append((query_index2, seed_length2)) 
     return self 

Это где производительность действительно расщепляет для четных последовательностей умеренной длины.

Есть ли лучший способ сделать это объединение? Мое чувство кишки говорит «да», но я не могу придумать это.

Я знаю, что это очень длинный и технический вопрос, и я был бы очень признателен за понимание, даже если нет простого решения.

Edit: Вот как я делаю таблицы поиска:

def createLookupTable(sequence_rdd, sequence_name, word_length): 
    global broadcastSequences 
    blank_list = [] 

    def addItemToList(lst, val): 
     lst.append(val) 
     return lst 

    def mergeLists(lst1, lst2): 
     #print "Merging" 
     return lst1+lst2 

    return (sequence_rdd 
      .flatMap(lambda seq_len: range(0, seq_len - word_length + 1)) 
      .repartition(PARALLELISM) 
      #.partitionBy(PARALLELISM) 
      .map(lambda index: (str(broadcastSequences.value[sequence_name][index:index + word_length]), index), preservesPartitioning=True) 
      .aggregateByKey(blank_list, addItemToList, mergeLists, PARALLELISM)) 
      #.map(lambda (word, indices): (word, sorted(indices)))) 

А вот функция, которая выполняет всю операцию:

def run(query_seq, database_sequence, translate_query=False): 
    global broadcastSequences 
    scoring_matrix = 'nucleotide' if isinstance(query_seq.alphabet, DNAAlphabet) else 'blosum62' 
    sequences = {'query': query_seq, 
       'database': database_sequence} 

    broadcastSequences = sc.broadcast(sequences) 
    query_rdd = sc.parallelize([len(query_seq)]) 
    query_rdd.cache() 
    database_rdd = sc.parallelize([len(database_sequence)]) 
    database_rdd.cache() 
    query_lookup_table_rdd = createLookupTable(query_rdd, 'query', WORD_SIZE) 
    query_lookup_table_rdd.cache() 
    database_lookup_table_rdd = createLookupTable(database_rdd, 'database', WORD_SIZE) 
    seeds_rdd = generateHsps(query_lookup_table_rdd, database_lookup_table_rdd) 
    return seeds_rdd 

Edit 2: Я подредактированны вещи немного и слегка улучшена производительность за счет замены:

   .flatMap(lambda (word, (query_indices, db_indices)): [(query_index, db_indices) for query_index in query_indices], preservesPartitioning=True) 
       .flatMap(lambda (query_index, db_indices): [(db_index - query_index, (query_index, WORD_SIZE)) for db_index in db_indices], preservesPartitioning=True) 

в hits_rdd с:

.flatMap(lambda (word, (query_indices, db_indices)): itertools.product(query_indices, db_indices)) 
       .map(lambda (query_index, db_index): (db_index - query_index, (query_index, WORD_SIZE))) 

По крайней мере, сейчас я не сжигаю память с промежуточными структурами данных.

+0

является WINDOW_SIZE разным для каждого SeedList? – Back2Basics

+0

@ Back2Basics WINDOW_SIZE одинаково для каждого SeedList. –

+0

Хиты_rdd не имеют смысла. Можем ли мы взглянуть на образцы query_lookup_table_dd и database_lookup_table_dd, а также ваши реализации flatMap и aggregateByKey? – Back2Basics

ответ

1

Давайте забудем о технических подробностях того, что вы делаете, и подумайте «функционально» о предпринятых шагах, забыв о деталях реализации.Функциональное мышление, подобное этому, является важной частью параллельного анализа данных; в идеале, если мы можем сломать проблему так, мы можем более четко рассуждать о шагах, и в итоге получим более четкие и зачастую более кратким. Если подумать о модели табличных данных, я бы рассмотрел вашу проблему следующим образом:

  1. Присоединяйтесь к двум вашим наборам данных в столбце последовательности.
  2. Создайте новый столбец delta, содержащий разницу между индексами.
  3. Сортировать по (или) индексу, чтобы убедиться, что подпоследовательности находятся в правильном порядке.
  4. Группа delta и объедините строки в столбце последовательности, чтобы получить полные совпадения между вашими наборами данных.

Для первых 3 шагов, я думаю, имеет смысл использовать DataFrames, так как эта модель данных имеет смысл в моей голове обработки вида, которую мы делаем. (На самом деле я мог бы использовать DataFrames для шага 4, кроме того, pyspark в настоящее время не поддерживает настраиваемые агрегированные функции для DataFrames, хотя Scala делает).

Для четвертого шага (если я правильно понимаю, о чем вы действительно спрашиваете в своем вопросе), немного сложно подумать о том, как сделать это функционально, однако я считаю, что элегантное и эффективное решение используйте сокращение (также известное правую складку); этот шаблон может быть применен к любой проблеме, которую вы можете выразить в терминах итеративного применения ассоциативной двоичной функции, то есть функции, где «группировка» любых трех аргументов не имеет значения (хотя порядок, безусловно, может иметь значение), символически, это функция x,y -> f(x,y) где f(x, f(y, z)) = f(f(x, y), z). Конкатенация String (или, вообще говоря, список) является такой функцией.

Вот пример того, как это может выглядеть в pyspark; надеюсь, вы можете приспособить это к деталям своих данных:

#setup some sample data 
query = [['abcd', 30] ,['adab', 34] ,['dbab',38]] 
reference = [['dbab', 20], ['ccdd', 24], ['abcd', 50], ['adab',54], ['dbab',58], ['dbab', 62]] 

#create data frames 
query_df = sqlContext.createDataFrame(query, schema = ['sequence1', 'index1']) 
reference_df = sqlContext.createDataFrame(reference, schema = ['sequence2', 'index2']) 

#step 1: join 
matches = query_df.join(reference_df, query_df.sequence1 == reference_df.sequence2) 

#step 2: calculate delta column 
matches_delta = matches.withColumn('delta', matches.index2 - matches.index1) 

#step 3: sort by index 
matches_sorted = matches_delta.sort('delta').sort('index2') 

#step 4: convert back to rdd and reduce 
#note that + is just string concatenation for strings 
r = matches_sorted['delta', 'sequence1'].rdd 
r.reduceByKey(lambda x, y : x + y).collect() 

#expected output: 
#[(24, u'dbab'), (-18, u'dbab'), (20, u'abcdadabdbab')] 
+0

Интересный подход. Само соединение не является проблемой, это расширение парных списков как декартова продукта, поэтому, возможно, я пропущу шаг агрегации и сделаю это вместо этого. Никогда раньше не работал с DataFrames, но я подозревал, что хотел бы, чтобы они это сделали. –

+0

Я на самом деле получаю худшую производительность с этим, я подозреваю, что это потому, что мы сопоставляем каждую возможную пару слов, а не целые списки слов, как я делал раньше. Не уверен, что это окупится большими партиями, где шаг расширения - большой успех, но я посмотрю. –

Смежные вопросы