3

Я пытаюсь проанализировать около 1 миллиона файлов HTML с помощью PySpark (Google Dataproc) и написать соответствующие поля в сжатый файл. Каждый файл HTML составляет около 200 КБ. Следовательно, все данные составляют около 200 ГБ.PySpark + Google Cloud Storage (wholeTextFiles)

Приведенный ниже код работает отлично, если я использую подмножество данных, но работает в течение нескольких часов, а затем сработает во время работы по всему набору данных. Кроме того, рабочие узлы не используются (< 5% CPU), поэтому я знаю, что есть некоторые проблемы.

Я считаю, что система задыхается от приема данных от GCS. Есть лучший способ сделать это? Кроме того, когда я использую wholeTextFiles таким образом, мастер пытается загрузить все файлы, а затем отправить их исполнителям или он позволяет исполнителям загружать их?

def my_func(keyval): 
    keyval = (file_name, file_str) 
    return parser(file_str).__dict__ 

data = sc.wholeTextFiles("gs://data/*") 
output = data.map(my_func) 
output.saveAsTextFile("gs://results/a") 
+0

Любые сообщения об ошибках, следы стека и т. Д. Были бы полезны. Мастер не будет считывать все содержащиеся данные, но он будет получать статус для всех входных файлов перед началом работы. По умолчанию Dataproc устанавливает свойство «mapreduce.input.fileinputformat.list-status.num-threads» на 20, чтобы помочь улучшить время этого поиска, но RPC все еще выполняется для каждого файла в GCS. Один из способов дальнейшего улучшения поиска состоит в том, чтобы выполнить некоторую логику поиска через искру, создав префиксы файлов, содержащие RDD, используя flatMap для преобразования этих префиксов в имена файлов, а затем сопоставление имен файлов с содержимым файла. –

+0

Хорошо. Предположим, я создаю RDD имен файлов, как вы предлагаете. Как сопоставить это имя файла с файлом? Я не могу вызвать sc.wholeTextFile внутри исполнителя. Я мог бы использовать boto API внутри исполнителя, чтобы загрузить файл. Я пробовал это, но он еще медленнее. Мое подозрение в том, что у boto API есть много накладных расходов на аутентификацию при каждом запросе. –

ответ

3

Чтобы ответить на ваш вопрос, мастер не будет считывать все содержащиеся данные, но он будет получать статус для всех входных файлов перед началом работы. По умолчанию Dataproc устанавливает свойство «mapreduce.input.fileinputformat.list-status.num-threads» на 20, чтобы помочь улучшить время этого поиска, но RPC все еще выполняется для каждого файла в GCS.

Кажется, вы нашли случай, когда даже добавление потоков не очень помогает и просто приводит драйвер к OOM быстрее.

Расширяя возможности распараллеливания чтения, у меня есть две идеи.

Но сначала немного предупреждение: ни одно из этих решений, поскольку они являются очень надежными, чтобы каталоги были включены в глобус. Вероятно, вы захотите защитить от каталогов, отображаемых в списке файлов для чтения.

Первый выполняется с помощью python и инструментов командной строки hadoop (это также можно выполнить с помощью gsutil). Ниже приведен пример того, как это может выглядеть и выполняет файл листинг на рабочих, считывает содержимое файла в пар и, наконец, вычисляет пар (имя файла, длина файла):

from __future__ import print_function 

from pyspark.rdd import RDD 
from pyspark import SparkContext 

import sys 
import subprocess 


def hadoop_ls(file_glob): 
    lines = subprocess.check_output(["/usr/bin/hadoop", "fs", "-ls", file_glob]).split("\n") 
    files = [line.split()[7] for line in lines if len(line) > 0] 
    return files 

def hadoop_cat(file): 
    return subprocess.check_output(["/usr/bin/hadoop", "fs", "-cat", file]).decode("utf-8") 

if __name__ == "__main__": 
    if len(sys.argv) < 2: 
    print("Provide a list of path globs to read.") 
    exit(-1) 

    sc = SparkContext() 
    # This is just for testing. You'll want to generate a list 
    # of prefix globs instead of having a list passed in from the 
    # command line. 
    globs = sys.argv[1:] 
    # Desired listing partition count 
    lpc = 100 
    # Desired 'cat' partition count, should be less than total number of files 
    cpc = 1000 
    files = sc.parallelize(globs).repartition(lpc).flatMap(hadoop_ls) 
    files_and_content = files.repartition(cpc).map(lambda f: [f, hadoop_cat(f)]) 
    files_and_char_count = files_and_content.map(lambda p: [p[0], len(p[1])]) 
    local = files_and_char_count.collect() 
    for pair in local: 
    print("File {} had {} chars".format(pair[0], pair[1])) 

Я бы первым начать с этим подпроцессом решение и играть с разделением вызовов hadoop_ls и hadoop_cat и посмотреть, можете ли вы получить что-то приемлемое.

Второе решение сложнее, но, вероятно, даст более эффективный конвейер, избегая многих, многих вызовов exec.

В этом втором решении мы будем составлять специальную вспомогательную банку, используя операцию инициализации, чтобы скопировать эту банку всем работникам и, наконец, использовать помощника из нашего драйвера.

Окончательная структура каталога нашего проекта Scala банки будет выглядеть примерно так:

helper/src/main/scala/com/google/cloud/dataproc/support/PysparkHelper.scala 
helper/build.sbt 

В нашем файле PysparkHelper.scala мы будем иметь малый класс SCALA, что функции много, как наш чистый раствор питона выше делает , Сначала мы создадим RDD файловых глобусов, затем RDD имен файлов и, наконец, RDD имен файлов и пар содержимого файла.

package com.google.cloud.dataproc.support 

import collection.JavaConversions._ 

import org.apache.commons.io.IOUtils 
import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs.{FileSystem, Path} 
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext} 

import java.util.ArrayList 
import java.nio.charset.StandardCharsets 

class PysparkHelper extends Serializable { 
    def wholeTextFiles(
    context: JavaSparkContext, 
    paths: ArrayList[String], 
    partitions: Int): JavaPairRDD[String, String] = { 

    val globRDD = context.sc.parallelize(paths).repartition(partitions) 
    // map globs to file names: 
    val filenameRDD = globRDD.flatMap(glob => { 
     val path = new Path(glob) 
     val fs: FileSystem = path.getFileSystem(new Configuration) 
     val statuses = fs.globStatus(path) 
     statuses.map(s => s.getPath.toString) 
    }) 
    // Map file name to (name, content) pairs: 
    // TODO: Consider adding a second parititon count parameter to repartition before 
    // the below map. 
    val fileNameContentRDD = filenameRDD.map(f => { 
     Pair(f, readPath(f, new Configuration)) 
    }) 

    new JavaPairRDD(fileNameContentRDD) 
    } 

    def readPath(file: String, conf: Configuration) = { 
    val path = new Path(file) 
    val fs: FileSystem = path.getFileSystem(conf) 
    val stream = fs.open(path) 
    try { 
     IOUtils.toString(stream, StandardCharsets.UTF_8) 
    } finally { 
     stream.close() 
    } 
    } 
} 

Помощник/сборка.SBT файл будет выглядеть примерно так:

organization := "com.google.cloud.dataproc.support" 
name := "pyspark_support" 
version := "0.1" 
scalaVersion := "2.10.5" 
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided" 
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.1" % "provided" 
exportJars := true 

Затем мы можем построить помощника с SBT:

$ cd helper && sbt package 

Выходной помощник баночка должна быть цель/Scala-2,10/pyspark_support_2.10-0.1.jar

Теперь мы должны получить эту банку в наш кластер, и для этого нам нужно сделать две вещи: 1) загрузить банку в GCS и 2) создать в команде GCS операцию инициализации, чтобы скопировать банку на узлы кластера.

В качестве иллюстрации предположим, что ваш ведро имеет имя MY_BUCKET (вставьте здесь соответствующий морф, связанный с моржами).

$ gsutil cp target/scala-2.10/pyspark_support_2.10-0.1.jar gs://MY_BUCKET/pyspark_support.jar 

Создать действие инициализации (назовем его pyspark_init_action.sh, заменив MY_BUCKET при необходимости):

#!/bin/bash 

gsutil cp gs://MY_BUCKET/pyspark_support.jar /usr/lib/hadoop/lib/ 

и, наконец, загрузить действие инициализации для ГКС:

$ gsutil cp pyspark_init_action.sh gs://MY_BUCKET/pyspark_init_action.sh 

Кластер теперь можно начать с передачи следующих флагов в gcloud:

--initialization-actions gs://MY_BUCKET/pyspark_init_action.sh 

После создания, загрузки и установки нашей новой библиотеки мы можем наконец использовать его из pyspark:

from __future__ import print_function 

from pyspark.rdd import RDD 
from pyspark import SparkContext 
from pyspark.serializers import PairDeserializer, UTF8Deserializer 

import sys 

class DataprocUtils(object): 

    @staticmethod 
    def wholeTextFiles(sc, glob_list, partitions): 
    """ 
    Read whole text file content from GCS. 
    :param sc: Spark context 
    :param glob_list: List of globs, each glob should be a prefix for part of the dataset. 
    :param partitions: number of partitions to use when creating the RDD 
    :return: RDD of filename, filecontent pairs. 
    """ 
    helper = sc._jvm.com.google.cloud.dataproc.support.PysparkHelper() 
    return RDD(helper.wholeTextFiles(sc._jsc, glob_list, partitions), sc, 
       PairDeserializer(UTF8Deserializer(), UTF8Deserializer())) 

if __name__ == "__main__": 
    if len(sys.argv) < 2: 
    print("Provide a list of path globs to read.") 
    exit(-1) 

    sc = SparkContext() 
    globs = sys.argv[1:] 
    partitions = 10 
    files_and_content = DataprocUtils.wholeTextFiles(sc, globs, partitions) 
    files_and_char_count = files_and_content.map(lambda p: (p[0], len(p[1]))) 
    local = files_and_char_count.collect() 
    for pair in local: 
    print("File {} had {} chars".format(pair[0], pair[1])) 
0

Спасибо! Я попробовал первый метод. Он работает, но не очень эффективен из-за вызовов exec и RPC/auth. Для кластера с 32 узлами требуется около 10 часов. Я смог запустить его через 30 минут на кластере с четырьмя узлами, используя databricks на aws с разъемом Amazon s3. Кажется, там намного меньше накладных расходов. Я хочу, чтобы Google предоставил лучший способ заимствования данных от GCS до Spark.

+0

Я хотел бы посмотреть на это немного больше; 10 часов кажется слишком высоким - можете ли вы поделиться макетом своих файлов/каталогов (например, это 1 миллион объектов в одном каталоге или 1000 каталогов с 1000 объектами и т. Д.). Также - можете ли вы поделиться общей формой вашего glob - (например,/bucket/*/dir/* или что-то еще)? –

Смежные вопросы