2016-10-04 3 views
0

Я новичок в Spark, но у меня был некоторый опыт работы в Hadoop. Я пытаюсь адаптировать код python, который я использую в потоковой передаче Hadoop, который отфильтровывает некоторые твиты в формате JSON.Не пишите Нет или пустые строки в Spark (Python)

Как правило, у моей функции есть условие, которое печатает для вычитания твита, если условие истинно и ничего не печатает.

def filter(tweet): 
    if criteria(tweet) is True: 
     print json.dumps(tweet) 

Таким образом, окончательный выходной файл будет содержать только те твиты, которые я хочу.

Однако, пытаясь использовать Spark, мне пришлось изменить оператор print с return, поэтому я возвращаю твит, если условие True, и None в противном случае.

def filter(tweet): 
     if criteria(tweet) is True: 
      return json.dumps(tweet) 

Проблема возникает при попытке сохранить результаты на диск. Используя метод Pyspark saveAsTextFile, он сохраняет не только те твиты, которые я хочу, но также возвращает None, когда условие False.

Как я могу избежать записи None в файл, чтобы сохранить только нужные твиты?

Большое спасибо заранее.

Jorge

+0

Почему ты не можешь просто еще вернуться «»? Если это не сработает, вы не можете выполнить некоторую простую обработку сообщений для возвращаемого дампа json? –

+0

Hi free_mind. Это хороший момент, но не собирается ли печатать пустую строку? ..... Попробуем на всякий случай. Благодарю за ваш ответ. – Salias

ответ

1

Очень элегантное решение, что позволяет избежать цепочки filter и map, является использование flatMap:

def filter(tweet): 
    return [json.dumps(tweet)] if criteria(tweet) is True else [] 

some_rdd.flatMap(filter) 
0

Если вы используете функцию в карте, это не уменьшит количество элементов, которые вы имеете. Чтобы фильтровать элементы, вы должны использовать метод filter для тестирования, если элемент None после вас map.

Смежные вопросы