Спарк по умолчанию сжатые файлы
По Spark Programming Guide
All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/.txt"), and textFile("/my/directory/.gz").
Этот может быть расширена за счет предоставления информации о том, что форматы сжатия поддерживаются Hadoop, которые в основном могут быть проверены путем нахождения всех классов, простирающиеся CompressionCodec
(docs)
name | ext | codec class
-------------------------------------------------------------
bzip2 | .bz2 | org.apache.hadoop.io.compress.BZip2Codec
default | .deflate | org.apache.hadoop.io.compress.DefaultCodec
deflate | .deflate | org.apache.hadoop.io.compress.DeflateCodec
gzip | .gz | org.apache.hadoop.io.compress.GzipCodec
lz4 | .lz4 | org.apache.hadoop.io.compress.Lz4Codec
snappy | .snappy | org.apache.hadoop.io.compress.SnappyCodec
Источник: List the available hadoop codecs
так что приведенные выше форматы и гораздо больше возможностей можно было бы достичь, позвонив по телефону:
sc.readFile(path)
Чтение zip-файлов в Spark
К сожалению, zip
по умолчанию не поддерживается в списке поддерживаемых.
Я нашел большую статью: Hadoop: Processing ZIP files in Map/Reduce и некоторые ответы (example) объясняет, как использовать импортирован ZipFileInputFormat
вместе с sc.newAPIHadoopFile
API. Но это не сработало для меня.
Мое решение
без каких-либо внешних зависимостей, вы можете загрузить файл с sc.binaryFiles
и позже распаковывать PortableDataStream
чтение содержимого. Это тот подход, который я выбрал.
import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {
def readFile(path: String,
minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {
if (path.endsWith(".zip")) {
sc.binaryFiles(path, minPartitions)
.flatMap { case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
// this solution works only for single file in the zip
val entry = zis.getNextEntry
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}
} else {
sc.textFile(path, minPartitions)
}
}
}
используя этот неявный класс, вам нужно импортировать его и вызвать метод readFile
на SparkContext
:
import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)
И неявный класс будет загружать файл zip
правильно и вернуть RDD[String]
как раньше ,
Примечание: Это работает только для одного файла в архиве zip!
Для нескольких файлов в вашей почтовой поддержке, проверьте этот ответ: https://stackoverflow.com/a/45958458/1549135
У меня аналогичная проблема. Я пробовал это, но получал ошибку. может у, пожалуйста, помогите? val zipFileRDD = sc.binaryFiles (zipFile) .flatMap {case (name: String, content: PortableDataStream) => новый ZipInputStream (content.открытый)} > >: 95: ошибка: тип несоответствия; > найдено: java.util.zip.ZipInputStream > требуется: [?] TraversableOnce > Val zipFileRDD = sc.binaryFiles (ZipFile) .flatMap {случай (название, содержание) => новый ZipInputStream (content.open)} –
Pooja3101