2015-09-30 3 views
2

данных в моем первом РДУ, какКак пропустить более одной строки заголовка в РДУ в Спарк

1253 
545553 
12344896 
1 2 1 
1 43 2 
1 46 1 
1 53 2 

Теперь первые 3 числа некоторые счетчики, которые мне нужно транслировать. После того, что все линии имеют один и тот же формат, как

1 2 1 
1 43 2 

Я сопоставит все эти значения после 3-х счетчиков к новому РДУ после выполнения некоторых вычислений с ними функции. Но я не могу понять, как отделить эти первые 3 значения и отобразить остальные как обычно.

Мой Python код, как этот

documents = sc.textFile("file.txt").map(lambda line: line.split(" ")) 

final_doc = documents.map(lambda x: (int(x[0]), function1(int(x[1]), int(x[2])))).reduceByKey(lambda x, y: x + " " + y) 

Он работает только тогда, когда первые 3 значения не в текстовом файле, но с ними он дает ошибку.

Я не хочу пропускать эти первые 3 значения, но сохраняю их в 3 широковещательных переменных, а затем передаю оставшийся набор данных в функции карты.

И да, текстовый файл должен быть только в этом формате. Я не могу удалить эти 3 значения/счетчики

Функция 1 просто выполняет некоторые вычисления и возвращает значения.

+0

Возможный дубликат [Как пропустить заголовок из файлов csv в Spark?] (Http://stackoverflow.com/questions/27854919/how-to-skip-header-from-csv-files-in-spark) – zero323

+0

Но я не хочу пропускать, я хочу сохранить эти 3 значения в 3 разных переменных, а затем работать со всеми другими данными в наборе данных. Я не хочу передавать эти 3 значения функции карты, описанной выше. – Nicky

+0

Загрузить данные: 'raw = sc.textFile (" file.txt ")', Возьмите первые три строки, которые вы хотите использовать для трансляции: 'header = raw.take (3)', используйте один из методов, описанных в связанной ответьте, чтобы пропустить заголовок и обработать все остальное. – zero323

ответ

4
  1. Импорт для Python 2

    from __future__ import print_function 
    
  2. Приготовьте фиктивные данные:

    s = "1253\n545553\n12344896\n1 2 1\n1 43 2\n1 46 1\n1 53 2" 
    with open("file.txt", "w") as fw: fw.write(s) 
    
  3. Read необработанные входные данные:

    raw = sc.textFile("file.txt") 
    
  4. заголовка Извлечение:

    header = raw.take(3) 
    print(header) 
    ### [u'1253', u'545553', u'12344896'] 
    
  5. фильтра линии:

    • использованием zipWithIndex

      content = raw.zipWithIndex().filter(lambda kv: kv[1] > 2).keys() 
      print(content.first()) 
      ## 1 2 1 
      
    • использованием mapPartitionsWithIndex

      from itertools import islice 
      
      content = raw.mapPartitionsWithIndex(
          lambda i, iter: islice(iter, 3, None) if i == 0 else iter) 
      
      print(content.first()) 
      ## 1 2 1 
      

ПРИМЕЧАНИЕ: Вся Заслуга pzecevic и Sean Owen (см связанных источников).

+0

Спасибо за это решение. – Nicky

+0

Итак, из 3 решений, которые наиболее эффективны.
1. zipWithIndex 2. mapPartitonsWithIndex 3. и фильтр – Nicky

+0

Я бы с 'mapPartitionsWithIndex'. – zero323

1

Сначала принимает значение с использованием методы тека(), как zero323 предложил

raw = sc.textfile("file.txt") 
headers = raw.take(3) 

Тогда

final_raw = raw.filter(lambda x: x != headers) 

и сделал.

Смежные вопросы