2014-07-08 3 views
11

Необходима помощь для внедрения лучшей практики. Условия эксплуатации:Apache Spark on YARN: Большое количество файлов входных данных (объединить несколько входных файлов в искру)

  • Файл данных регистрации поступает нерегулярно.
  • Размер файла данных журнала составляет от 3,9 КБ до 8,5 МБ. В среднем около 1 МБ.
  • Число записей файла данных составляет от 13 строк до 22000 строк. В среднем около 2700 строк.
  • Файл данных должен быть обработан после обработки перед агрегацией.
  • Алгоритм последующей обработки может быть изменен.
  • Последующий файл управляется отдельно с исходным файлом данных, так как алгоритм пост-обработки может быть изменен.
  • Выполняется ежедневная агрегация. Весь файл данных после обработки должен быть отфильтрован по записям и агрегированию (средний, максимальный мин ...).
  • Поскольку агрегация мелкозернистая, количество записей после агрегации не так мало. Это может быть около половины количества исходных записей.
  • В какой-то момент число файлов после обработки может составлять около 200 000.
  • Файл данных должен быть удален отдельно.

В тесте я попытался обработать 160 000 обработанных после обработки файлов Spark, начинающихся с sc.textFile() с помощью glob-пути, с обработкой исключения OutOfMemory в процессе драйвера.

Какова наилучшая практика для обработки данных такого типа? Должен ли я использовать HBase вместо обычных файлов для сохранения данных после обработки?

ответ

8

Мы написали собственный погрузчик. Он решил проблему с небольшими файлами в HDFS. Он использует Hadoop CombineFileInputFormat. В нашем случае это уменьшило количество преобразователей от 100000 до 3000 и сделало работу значительно быстрее.

https://github.com/RetailRocket/SparkMultiTool

Пример:

import ru.retailrocket.spark.multitool.Loaders 
val sessions = Loaders.combineTextFile(sc, "file:///test/*") 
// or val sessions = Loaders.combineTextFile(sc, conf.weblogs(), size = 256, delim = "\n") 
// where size is split size in Megabytes, delim - line break character 
println(sessions.count()) 
+0

Спасибо, что поделились этим. Я думаю, что аргумент размера особенно ценен, поскольку он не может быть указан на coalesce(). – zeodtr

+0

Это решение лучше, чем слияние, потому что оно работает на этапе карты, но сливается после. –

+1

С тех пор hasoop поддерживает CombineTextInputFormat (по крайней мере, от версии 2.2), объединение небольших входных файлов может быть выполнено с помощью sc.newAPIHadoopFile() без реализации пользовательского класса. – zeodtr

3

Я уверен, что причина, по которой вы получаете OOM, связана с обработкой большого количества небольших файлов. Вы хотите объединить входные файлы, чтобы не было так много разделов. Я пытаюсь ограничить свои рабочие места примерно до 10 тыс. Разделов.

После textFile вы можете использовать .coalesce(10000, false) ... не на 100% уверен, что это сработает, потому что прошло какое-то время, так как я сделал это, пожалуйста, дайте мне знать. Поэтому попробуйте

sc.textFile(path).coalesce(10000, false) 
+0

Спасибо! Я попробую. – zeodtr

+0

Это сработало! Фактически я использовал коэффициент коалесценции 1227, который является числом разделов, когда Spark обрабатывает большой одиночный файл, содержащий все записи. Но работа выполняется медленнее (как и ожидалось), и все же кажется, что информация обо всех файлах по-прежнему передается процессу драйвера, что может вызвать OOM, когда задействовано слишком много файлов. Но 1,68 ГБ для процесса драйвера для 168016 файлов не так уж плох. – zeodtr

+0

Ну, у нас есть отдельная простая работа, специально для сокращения количества файлов, так как это такая важная вещь. Как только я должен был запустить его в 5 идет на 5 подмножеств – samthebest

0

Вы можете использовать этот

Сначала вы можете получить буфер/Список S3 Дорожки/То же самое для HDFS или Local Path

Если вы «Попробуйте с Amazon S3:

import scala.collection.JavaConverters._ 
import java.util.ArrayList 
import com.amazonaws.services.s3.AmazonS3Client 
import com.amazonaws.services.s3.model.ObjectListing 
import com.amazonaws.services.s3.model.S3ObjectSummary 
import com.amazonaws.services.s3.model.ListObjectsRequest 

def listFiles(s3_bucket:String, base_prefix : String) = { 
    var files = new ArrayList[String] 

    //S3 Client and List Object Request 
    var s3Client = new AmazonS3Client(); 
    var objectListing: ObjectListing = null; 
    var listObjectsRequest = new ListObjectsRequest(); 

    //Your S3 Bucket 
    listObjectsRequest.setBucketName(s3_bucket) 

    //Your Folder path or Prefix 
    listObjectsRequest.setPrefix(base_prefix) 

    //Adding s3:// to the paths and adding to a list 
    do { 
     objectListing = s3Client.listObjects(listObjectsRequest); 
     for (objectSummary <- objectListing.getObjectSummaries().asScala) { 
     files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); 
     } 
     listObjectsRequest.setMarker(objectListing.getNextMarker()); 
    } while (objectListing.isTruncated()); 

    //Removing Base Directory Name 
    files.remove(0) 

    //Creating a Scala List for same 
    files.asScala 
    } 

Теперь передать этот объект списка на следующий фрагмент кода, примечание: СБН является объектом SQLContext

var df: DataFrame = null; 
    for (file <- files) { 
    val fileDf= sc.textFile(file) 
    if (df!= null) { 
     df= df.unionAll(fileDf) 
    } else { 
     df= fileDf 
    } 
    } 

Теперь вы получили окончательное Унифицированная РДД т.е. ДФ

Дополнительно, и вы можете также перераспределить его в одном BigRDD

val files = sc.textFile(filename, 1).repartition(1) 

Переформатирование всегда работает: D

Смежные вопросы