Я использую PySpark и я ищу способ проверить:Фильтр РДД значениями PySpark
Для данного check_number = 01
если значение третьего элемента в моей rdd1
не содержит check_number ==> получить всю информацию об этом check_number от rdd2
..
Дано:
rdd1 = sc.parallelize([(u'_guid_F361IeVTC8Q0kckDRw7iOJCe64ELpRmMKQgESgf-uEE=',
u'serviceXXX',
u'testAB_02',
u'2016-07-03')])
Предположим, что первым элементом является ID
, второй - имя службы, третье - это тестовое имя с ID
, а четвертым элементом является дата.
rdd2 = sc.parallelize([(u'9b023b8233c242c09b93506942002e0a',
u'01',
u'2016-11-02'),
(u'XXXX52547412558933nnBlmquhdyhM',
u'02',
u'2016-11-04')])
Предположим, что первым элементом является идентификатор, второй - идентификатор теста, а последний элемент - дата.
Итак, здесь у меня есть в моем rdd1
testAB_02
, который не совпадает с моим номером check_number (поэтому имя службы должно заканчиваться значением check_number). Мой объект if должен получить все строки от rdd2
, с 01
в качестве идентификатора теста. Ожидаемый результат здесь должен быть:
[(u'9b023b8233c242c09b93506942002e0a',
u'01',
u'2016-11-02')
Это мой код:
def update_typesdecohorte_table(rdd1, rdd2):
if rdd1.filter(lambda x : (re.match('.*?' + check_number, x[2]))).isEmpty() is True:
new_rdd2 = rdd2.filter(lambda x : x[1] == check_number)
else:
pass
return new_rdd2
new_rdd2 = update_typesdecohorte_table(rdd1, rdd2)
Wich дает:
[(u'9b023b8233c242c09b93506942002e0a', u'01', u'2016-11-02')]
Этот код работает, но мне не нравится метод .. Каков наиболее эффективный способ сделать это?
Thx Mariusz! Это то, что я хочу, и метод кажется надежным! – DataAddicted