2016-11-21 2 views
0

Я использую PySpark и я ищу способ проверить:Фильтр РДД значениями PySpark

Для данного check_number = 01

если значение третьего элемента в моей rdd1 не содержит check_number ==> получить всю информацию об этом check_number от rdd2 ..

Дано:

rdd1 = sc.parallelize([(u'_guid_F361IeVTC8Q0kckDRw7iOJCe64ELpRmMKQgESgf-uEE=', 
         u'serviceXXX', 
         u'testAB_02', 
         u'2016-07-03')]) 

Предположим, что первым элементом является ID, второй - имя службы, третье - это тестовое имя с ID, а четвертым элементом является дата.

rdd2 = sc.parallelize([(u'9b023b8233c242c09b93506942002e0a', 
         u'01', 
         u'2016-11-02'), 

         (u'XXXX52547412558933nnBlmquhdyhM', 
         u'02', 
         u'2016-11-04')]) 

Предположим, что первым элементом является идентификатор, второй - идентификатор теста, а последний элемент - дата.

Итак, здесь у меня есть в моем rdd1testAB_02, который не совпадает с моим номером check_number (поэтому имя службы должно заканчиваться значением check_number). Мой объект if должен получить все строки от rdd2, с 01 в качестве идентификатора теста. Ожидаемый результат здесь должен быть:

[(u'9b023b8233c242c09b93506942002e0a', 
    u'01', 
    u'2016-11-02') 

Это мой код:

def update_typesdecohorte_table(rdd1, rdd2): 

    if rdd1.filter(lambda x : (re.match('.*?' + check_number, x[2]))).isEmpty() is True: 

     new_rdd2 = rdd2.filter(lambda x : x[1] == check_number) 

    else: 

     pass 

    return new_rdd2 

new_rdd2 = update_typesdecohorte_table(rdd1, rdd2) 

Wich дает:

[(u'9b023b8233c242c09b93506942002e0a', u'01', u'2016-11-02')] 

Этот код работает, но мне не нравится метод .. Каков наиболее эффективный способ сделать это?

ответ

1

Если вы хотите, чтобы получить все записи из rdd2, которые не имеют совпадающие элементы в rdd1 вы можете использовать cartesian:

new_rdd2 = rdd1.cartesian(rdd2) 
    .filter(lambda r: not r[0][2].endswith(r[1][1])) 
    .map(lambda r: r[1]) 

Если check_number фиксируется, в конце фильтра по этому значению:

new_rdd2.filter(lambda r: r[1] == check_number).collect() 

Но если ваш check_number исправлен и оба RDD велики, он будет еще медленнее вашего решения, так как ему нужно перетасовать разделы во время соединения (ваш код выполняет только преобразования без перетасовки).

+0

Thx Mariusz! Это то, что я хочу, и метод кажется надежным! – DataAddicted

Смежные вопросы