2016-10-27 3 views
3

У меня есть ведро S3, которое заполнено файлами Gz, которые не имеют расширения файла. Например, s3://mybucket/1234502827-34231Spark - чтение сжатых файлов без расширения файла

sc.textFile использует это расширение файла для выбора декодера. Я нашел много сообщений в блоге по обработке пользовательских расширений файлов, но ничего о недостающих расширениях файлов.

Я думаю, что решение может быть sc.binaryFiles и разархивировать файл вручную.

Другая возможность - выяснить, как sc.textFile находит формат файла. Я не понимаю, как работают эти classOf[].

def textFile(
     path: String, 
     minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { 
    assertNotStopped() 
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 
     minPartitions).map(pair => pair._2.toString).setName(path) 
    } 
+0

'sc.textFile' не определяет формат. Это делается с помощью «TextInputFormat», и используется только расширение. –

+0

Или переименуйте все файлы в s3, добавив '.gz'. Я взглянул на источник, и он реализован здесь: https://hadoop.apache.org/docs/stable/api/src-html/org/apache/hadoop/io/compress/CompressionCodecFactory#line.191 Это действительно используйте расширение файла. Спектр предполагает, что вы можете просто взглянуть на первые пару байтов http://www.zlib.org/rfc-gzip.html#file-format, но это говорит о том, что вы можете получить ложные срабатывания и должны учитывать endian https: // stackoverflow.com/questions/6059302/how-to-check-if-a-file-is-gzip-compressed, поэтому, без сомнения, просто использование '.gz' является более быстрым и надежным соглашением – Davos

+0

@ user6022341' TextInputFormat' не делает это метод 'getCodec (Path file)' в этом классе 'org.apache.hadoop.io.compress.CompressionCodecFactory' – Davos

ответ

2

Можете ли вы попытаться объединить нижеследующее решение для файлов ZIP с библиотекой gzipFileInputFormat?

здесь - How to open/stream .zip files through Spark? Вы можете видеть, как сделать это с помощью ZIP:

rdd1 = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration()); 

gzipFileInputFormat:

https://github.com/bsankaran/internet_routing/blob/master/hadoop-tr/src/main/java/edu/usc/csci551/tools/GZipFileInputFormat.java

Некоторые подробности о newAPIHadoopFile() можно найти здесь: http://spark.apache.org/docs/latest/api/python/pyspark.html

+0

Спасибо, я дал это пару часов и не смог заставить его работать. – jspooner

1

Я нашел несколько примеров того, что почти соответствует моим потребностям. Вот последний код, который я использовал для анализа файла, сжатого с помощью GZ.

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream 
import org.apache.spark.input.PortableDataStream 
import scala.util.Try 
import java.nio.charset._ 

def extractBSM(ps: PortableDataStream, n: Int = 1024) = Try { 
    val gz = new GzipCompressorInputStream(ps.open) 
    Stream.continually { 
    // Read n bytes 
    val buffer = Array.fill[Byte](n)(-1) 
    val i = gz.read(buffer, 0, n) 
    (i, buffer.take(i)) 
    } 
    // Take as long as we've read something 
    .takeWhile(_._1 > 0) 
    .map(_._2) 
    .flatten 
    .toArray 
} 
def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = new String(bytes, StandardCharsets.UTF_8) 
val inputFile = "s3://my-bucket/157c96bd-fb21-4cc7-b340-0bd4b8e2b614" 
val rdd = sc.binaryFiles(inputFile).flatMapValues(x => extractBSM(x).toOption).map(x => decode()(x._2)) 
val rdd2 = rdd.flatMap { x => x.split("\n") } 
rdd2.take(10).foreach(println) 
+0

Это работает для GZ, но нам действительно нужно проверить магические байты и применить правильное сжатие algo. – jspooner

+0

Вы должны указать источник. Также это оправдано только для нерасщепляемых форматов (например, gz), поэтому универсальное решение с распознаванием не имеет большого смысла. –

Смежные вопросы