2015-04-16 3 views
6

Я пытаюсь обрабатывать 4 каталога текстовых файлов, которые постоянно растут каждый день. Что мне нужно сделать, если кто-то пытается найти номер счета-фактуры, я должен предоставить им список файлов.Spark-Получение имени файла в RDD

Мне удалось сопоставить и уменьшить значения в текстовых файлах, загрузив их как RDD. Но как я могу получить имя файла и другие атрибуты файла?

ответ

4

Если ваши текстовые файлы достаточно малы, вы можете использовать SparkContext.wholeTextFiles, который возвращает RDD (filename,content).

+2

В 1.6+ Это действительно возможно без чтения полных файлов. – zero323

3

Если текстовые файлы слишком велики для SparkContext.wholeTextFiles, вы будете использовать (простой) пользовательские InputFormat, а затем вызвать SparkContext.hadoopRDD

InputFormat нужно будет возвращать кортеж (имя файла, строка), а не линии, то вы могли бы фильтр с использованием предиката, который просматривает содержимое строки, затем уникально и собирает имена файлов.

От искры, код будет выглядеть примерно так:

val ft = classOf[FileNamerInputFormat] 
val kt = classOf[String] 
val vt = classOf[String] 

val hadoopConfig = new Configuration(sc.hadoopConfiguration) 
sc.newAPIHadoopFile(path, ft, kt, vt, hadoopConfig) 
    .filter { case (f, l) => isInteresting(l) } 
    .map { case (f, _) => f } 
    .distinct() 
    .collect() 
+0

Не могли бы вы рассказать об этом? Может быть, пример? Как это поможет, если вы находитесь в MapPartitionsRDD или другом RDD, который не основан на чтении файла? –

+0

@JustinPihony расширенный ответ немного. Надеюсь, вы не попросите меня показать InputFormat ... :) –

+3

Привет, Алистер. Большое спасибо за ваш ответ. Я использовал последнюю часть вашего решения, и я мог получить (файл, строку) без использования пользовательский формат ввода. Проверьте эту ссылку: http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/ –

12

С Спарк 1.6 вы можете объединить text источник данных и input_file_name функцию следующим образом:

Scala:

import org.apache.spark.sql.functions.input_file_name 

val inputPath: String = ??? 

spark.read.text(inputPath) 
    .select(input_file_name, $"value") 
    .as[(String, String)] // Optionally convert to Dataset 
    .rdd // or RDD 

Python:

(версии перед 2.х глючат и не может сохранить имена при преобразовании в RDD):

from pyspark.sql.functions import input_file_name 

(spark.read.text(input_path) 
    .select(input_file_name(), "value")) 
    .rdd) 

Это может быть использовано с другими форматами ввода, а также.

+0

Для меня этот метод не работает при использовании в Python. Поле, которое должно быть input_file_name, заполняется при выполнении первой операции, такой как .take (10), но каждая последующая операция, такая как карта в строках, создает пустую строку. В Scala это работает. Spark 1.6 – ludwigm

+1

@ludwigm Это работает в PySpark только до тех пор, пока вы не перемещаете данные из JVM. – zero323

+0

@ zero323, почему я не могу переместить имя_события из JVM. Мне нужно сохранить список файлов? – rado

2

Вы можете попробовать это, если вы находитесь в pyspark:

test = sc.wholeTextFiles("pathtofile") 

вы получите в результате RDD с первым элементом = FilePath и второй элемент = содержимому

2

Вы можете использовать WholeTextFile() для достижения этой цели. Однако, если входные файлы большие, тогда было бы обратным продуктом использовать WholeTextFile(), поскольку он помещал весь файл в одну запись.

Лучший способ получить имена файлов в таком сценарии - использовать mapPartitionsWithInputSplit(). Вы можете найти рабочий пример, используя этот сценарий: my blog.

+0

Добавлена ​​более подробная информация, я надеюсь, что она насытится. Код довольно большой, и лучше всего было бы вытащить его из блога! – BJC

+0

Это уже выглядит лучше, я удалил свой предыдущий комментарий, чтобы избежать путаницы в будущем. – g00glen00b

0

Кажется, излишне использовать Spark напрямую ... Если эти данные будут «собраны» водителю, почему бы не использовать API HDFS? Часто Hadoop поставляется вместе с Spark. Вот пример:

import org.apache.hadoop.fs._ 
import org.apache.hadoop.conf._ 

val fileSpec = "/data/Invoices/20171123/21" 
val conf = new Configuration() 
val fs = org.apache.hadoop.fs.FileSystem.get(new URI("hdfs://nameNodeEneteredHere"),conf) 
val path = new Path(fileSpec) 
// if(fs.exists(path) && fs.isDirectory(path) == true) ... 
val fileList = fs.listStatus(path) 

Затем с println(fileList(0)), информация (отформатированный), как этот первый пункт (в качестве примера) можно рассматривать как org.apache.hadoop.fs.FileStatus:

FileStatus { 
    path=hdfs://nameNodeEneteredHere/Invoices-0001.avro; 
    isDirectory=false; 
    length=29665563; 
    replication=3; 
    blocksize=134217728; 
    modification_time=1511810355666; 
    access_time=1511838291440; 
    owner=codeaperature; 
    group=supergroup; 
    permission=rw-r--r--; 
    isSymlink=false 
} 

Где fileList(0).getPath даст hdfs://nameNodeEneteredHere/Invoices-0001.avro.

Я предполагаю, что это средство чтения файлов в основном было бы с назначением HDFS, но не внутри каждого исполнителя. TLDR; Я уверен, что Spark, скорее всего, опросит namenode, чтобы получить RDD. Если базовый вызов Spark опросит namenode для управления RDD, возможно, вышеописанное является эффективным решением. Тем не менее, предложения, предлагающие любое направление, будут приветствоваться.

Смежные вопросы