2015-03-10 2 views
6

Я прочитал о Спарк поддержка «s для Gzip -kind входных файлов here, и я удивляюсь, если та же поддержка существует для различного рода сжатых файлов, таких как .zip файлы , До сих пор я пытался вычислить файл, сжатый в файле zip, но Spark кажется неспособным прочитать его содержимое успешно.Zip поддержка в Apache Спарк

Я принял взглянуть на Hadoop «s newAPIHadoopFile и newAPIHadoopRDD, но до сих пор я не был в состоянии получить что-нибудь работает.

Кроме того, Спарк поддерживает создание раздела для каждого файла под указанную папку, как в приведенном ниже примере:

SparkConf SpkCnf = new SparkConf().setAppName("SparkApp") 
            .setMaster("local[4]"); 

JavaSparkContext Ctx = new JavaSparkContext(SpkCnf); 

JavaRDD<String> FirstRDD = Ctx.textFile("C:\input\).cache(); 

Где C:\input\ указывает на каталог с несколькими файлами.

В случае вычисления застегнул файлов будет возможно, будет также можно упаковать каждый файл в один сжатый файл и следовать той же схеме один раздел в файле?

ответ

7

Поскольку Apache Spark использует форматы ввода Hadoop, мы можем посмотреть документацию о том, как обрабатывать zip-файлы и посмотреть, есть ли что-то, что работает.

This site дает нам представление о том, как использовать это (а именно, мы можем использовать ZipFileInputFormat). При этом, поскольку zip-файлы не являются split-table (см. this), ваш запрос на наличие одного сжатого файла на самом деле не очень хорошо поддерживается. Вместо этого, если это возможно, было бы лучше иметь каталог, содержащий множество отдельных zip-файлов.

Этот вопрос похож на this other question, однако он добавляет дополнительный вопрос о том, возможно ли иметь один zip-файл (который, поскольку он не является разделенным табличным форматом, не является хорошей идеей).

2

Вы можете использовать sc.binaryFiles для чтения Zip, как двоичный файл

val rdd = sc.binaryFiles(path).flatMap { case (name: String, content: PortableDataStream) => new ZipInputStream(content.open) } //=> RDD[ZipInputStream]

И тогда вы можете отобразить ZipInputStream в список строк:

val zis = rdd.first val entry = zis.getNextEntry val br = new BufferedReader(new InputStreamReader(in, "UTF-8")) val res = Stream.continually(br.readLine()).takeWhile(_ != null).toList

Но проблема остается что zip-файл не является разделяемым.

+1

У меня аналогичная проблема. Я пробовал это, но получал ошибку. может у, пожалуйста, помогите? val zipFileRDD = sc.binaryFiles (zipFile) .flatMap {case (name: String, content: PortableDataStream) => новый ZipInputStream (content.открытый)} > >: 95: ошибка: тип несоответствия; > найдено: java.util.zip.ZipInputStream > требуется: [?] TraversableOnce > Val zipFileRDD = sc.binaryFiles (ZipFile) .flatMap {случай (название, содержание) => новый ZipInputStream (content.open)} – Pooja3101

1

Вы можете использовать sc.binaryFiles, чтобы открыть zip-файл в двоичном формате, а затем распаковать его в текстовый формат. К сожалению, zip-файл не сплит-совместим. Поэтому вам нужно дождаться декомпрессии, а затем, возможно, перетасовать данные, чтобы сбалансировать данные в каждом разделе.

Вот пример в Python. Более подробная информация в http://gregwiki.duckdns.org/index.php/2016/04/11/read-zip-file-in-spark/

file_RDD = sc.binaryFiles(HDFS_path + data_path) 

def Zip_open(binary_stream_string) : # New version, treat a stream as zipped file 
    try : 
     pseudo_file = io.BytesIO(binary_stream_string) 
     zf = zipfile.ZipFile(pseudo_file) 
     return zf 
    except : 
     return None 

def read_zip_lines(zipfile_object) : 
    file_iter = zipfile_object.open('diff.txt') 
    data = file_iter.readlines() 
    return data 

My_RDD = file_RDD.map(lambda kv: (kv[0], Zip_open(kv[1]))) 
0

Ниже приведен пример, который ищет каталог для.zip-файлы и создать RDD с использованием настраиваемого FileInputFormat с именем ZipFileInputFormat и API-интерфейса newAPIHadoopFile в контексте Spark. Затем он записывает эти файлы в выходной каталог.

allzip.foreach { x => 
    val zipFileRDD = sc.newAPIHadoopFile(
    x.getPath.toString, 
    classOf[ZipFileInputFormat], 
    classOf[Text], 
    classOf[BytesWritable], hadoopConf) 

    zipFileRDD.foreach { y => 
    ProcessFile(y._1.toString, y._2) 
    } 

https://github.com/alvinhenrick/apache-spark-examples/blob/master/src/main/scala/com/zip/example/Unzip.scala

ZipFileInputFormat, используемый в примере, можно найти здесь: поддержка https://github.com/cotdp/com-cotdp-hadoop/tree/master/src/main/java/com/cotdp/hadoop

2

Спарк по умолчанию сжатые файлы

По Spark Programming Guide

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/.txt"), and textFile("/my/directory/.gz").

Этот может быть расширена за счет предоставления информации о том, что форматы сжатия поддерживаются Hadoop, которые в основном могут быть проверены путем нахождения всех классов, простирающиеся CompressionCodec (docs)

name | ext  | codec class 
------------------------------------------------------------- 
bzip2 | .bz2  | org.apache.hadoop.io.compress.BZip2Codec 
default | .deflate | org.apache.hadoop.io.compress.DefaultCodec 
deflate | .deflate | org.apache.hadoop.io.compress.DeflateCodec 
gzip | .gz  | org.apache.hadoop.io.compress.GzipCodec 
lz4  | .lz4  | org.apache.hadoop.io.compress.Lz4Codec 
snappy | .snappy | org.apache.hadoop.io.compress.SnappyCodec 

Источник: List the available hadoop codecs

так что приведенные выше форматы и гораздо больше возможностей можно было бы достичь, позвонив по телефону:

sc.readFile(path) 

Чтение zip-файлов в Spark

К сожалению, zip по умолчанию не поддерживается в списке поддерживаемых.

Я нашел большую статью: Hadoop: Processing ZIP files in Map/Reduce и некоторые ответы (example) объясняет, как использовать импортирован ZipFileInputFormat вместе с sc.newAPIHadoopFile API. Но это не сработало для меня.

Мое решение

без каких-либо внешних зависимостей, вы можете загрузить файл с sc.binaryFiles и позже распаковывать PortableDataStream чтение содержимого. Это тот подход, который я выбрал.

import java.io.{BufferedReader, InputStreamReader} 
import java.util.zip.ZipInputStream 
import org.apache.spark.SparkContext 
import org.apache.spark.input.PortableDataStream 
import org.apache.spark.rdd.RDD 

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal { 

    def readFile(path: String, 
       minPartitions: Int = sc.defaultMinPartitions): RDD[String] = { 

     if (path.endsWith(".zip")) { 
     sc.binaryFiles(path, minPartitions) 
      .flatMap { case (name: String, content: PortableDataStream) => 
      val zis = new ZipInputStream(content.open) 
      // this solution works only for single file in the zip 
      val entry = zis.getNextEntry 
      val br = new BufferedReader(new InputStreamReader(zis)) 
      Stream.continually(br.readLine()).takeWhile(_ != null) 
      } 
     } else { 
     sc.textFile(path, minPartitions) 
     } 
    } 
    } 

используя этот неявный класс, вам нужно импортировать его и вызвать метод readFile на SparkContext:

import com.github.atais.spark.Implicits.ZipSparkContext 
sc.readFile(path) 

И неявный класс будет загружать файл zip правильно и вернуть RDD[String] как раньше ,

Примечание: Это работает только для одного файла в архиве zip!
Для нескольких файлов в вашей почтовой поддержке, проверьте этот ответ: https://stackoverflow.com/a/45958458/1549135

+0

Вы не закрываете соединения. – Programmer

+0

Любая идея подхода к каталогу, содержащему несколько zip-файлов (разделов)? –

Смежные вопросы