2017-02-16 3 views
1

, так что эта проблема заводила меня с ума, и она начинает ощущаться как искра с s3 - не лучший инструмент для этой конкретной работы. В принципе, у меня есть миллионы файлов меньшего размера в ведро s3. По причинам, в которые я не могу вдаваться, эти файлы не могут быть объединены (они являются уникальными зашифрованными стенограммами). Я видел подобные вопросы, как этот, и каждое отдельное решение не дало хороших результатов. Первое, что я попытался было джокеры:Как справиться с миллионами меньших файлов s3 с искривлением apache

sc.wholeTextFiles(s3aPath + "/*/*/*/*.txt").count(); 

Примечание: граф был более доводка на сколько времени потребуется для обработки файлов. Эта работа почти заняла целый день с более чем 10 экземплярами, но все еще не удалась с ошибкой, размещенной в нижней части списка. Затем я нашел эту ссылку, где она в основном говорила, что это не оптимально: https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html

Затем я решил попробовать другое решение, которое я не могу найти в данный момент, который сказал загрузить все пути, а затем объединить все от РДУ

ObjectListing objectListing = s3Client.listObjects(bucket); 
    List<JavaPairRDD<String, String>> rdds = new ArrayList<>(); 
    List<JavaPairRDD<String, String>> tempMeta = new ArrayList<>(); 
    //initializes objectListing 
    tempMeta.addAll(objectListing.getObjectSummaries().stream() 
      .map(func) 
      .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript")) 
      .map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName())) 
      .collect(Collectors.toList())); 

    while(objectListing.isTruncated()) { 
     objectListing = s3Client.listNextBatchOfObjects(objectListing); 
     tempMeta.addAll(objectListing.getObjectSummaries().stream() 
       .map(func) 
       .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript")) 
       .map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName())) 
       .collect(Collectors.toList())); 
     if (tempMeta.size() > 5000) { 
      rdds.addAll(tempMeta); 
      tempMeta = new ArrayList<>(); 
     } 
    } 

    if (!tempMeta.isEmpty()){ 
     rdds.addAll(tempMeta); 
    } 
    return SparkConfig.getSparkContext().union(rdds.get(0), rdds.subList(1, rdds.size())); 

Тогда, даже когда я установил установить emrfs-сайт конфигурации для:

{ 
    "Classification": "emrfs-site", 
    "Properties": { 
     "fs.s3.consistent.retryPolicyType": "fixed", 
     "fs.s3.consistent.retryPeriodSeconds": "15", 
     "fs.s3.consistent.retryCount": "20", 
     "fs.s3.enableServerSideEncryption": "true", 
     "fs.s3.consistent": "false" 
    } 
} 

Я получил эту ошибку в течение 6 часов каждый раз, когда я пытался запустить работу:

17/02/15 19:15:41 INFO AmazonHttpClient: Unable to execute HTTP request: randomBucket.s3.amazonaws.com:443 failed to respond 
org.apache.http.NoHttpResponseException: randomBucket.s3.amazonaws.com:443 failed to respond 

Итак, во-первых, есть ли способ использовать файлы меньшего размера с искровым из s3? Меня не волнует, является ли решение субоптимальным, я просто хочу попробовать что-то работать. Я подумал о попытке искрообразования, поскольку его внутренности немного отличаются от загрузки всех файлов. Затем я использовал fileStream и установил newFiles в false. Тогда я мог бы их обработать. Тем не менее, это не то, для чего искровой поток был построен, поэтому я не согласен с тем, чтобы идти по этому маршруту.

В качестве побочного примечания я сгенерировал миллионы небольших файлов в hdfs и пробовал одну и ту же работу, и она закончилась в течение часа. Это заставляет меня чувствовать, что это s3 специфично. Кроме того, я использую s3a, а не обычный s3.

ответ

4

Если вы используете EMR Amazon, вам нужно использовать URL-адреса s3: //; s3a: // - для релизов ASF.

Большая проблема заключается в том, сколько времени требуется, чтобы перечислить деревья каталогов в s3, особенно в том, что рекурсивное дерево перемещается. Искровный код предполагает свою быструю файловую систему, в которой перечисление каталогов и заявляющих файлов является низкой стоимостью, тогда как на самом деле каждая операция занимает 1-4 HTTPS-запроса, что даже при повторных использования соединений HTTP/1.1 болит. Это может быть так медленно вы can see the pauses in the log.

Там, где это действительно больно, это то, что это передний фронт, где происходит большая задержка, поэтому это сериализованный бит работы, который доводится до его колен.

Хотя есть некоторое убыстрение в treewalking на S3a приходит в Hadoop 2.8 как часть the S3a phase II work, шаблонные сканы / формы /*.txt не собираются, чтобы получить какое-либо ускорение. Моя рекомендация состоит в том, чтобы попытаться сгладить структуру вашего каталога, чтобы вы переместились с глубокого дерева на нечто мелкое, может быть, даже все в одном каталоге, чтобы его можно было сканировать без прогулки, за счет 1 HTTP-запроса на 5000 записей ,

Имейте ввиду, что многие небольшие файлы в любом случае довольно дороги, в том числе в HDFS, где они используют хранилище. Существует специальный формат агрегатов, HAR-файлы, которые похожи на файлы tar, за исключением того, что hasoop, hive и spark могут работать внутри самого файла.То, что может помочь, хотя я не видел никаких реальных показателей производительности.

+0

Отличный ответ, спасибо Стив. Я буду экспериментировать с ним сегодня. –

+0

без проблем. Из любопытства - и чтобы помочь сформировать будущие перфекционные тесты - каково ваше разбиение по каталогам? что-то вроде ГГГГ/ММ/ДД, например 2017/01/23 /? –

Смежные вопросы