, так что эта проблема заводила меня с ума, и она начинает ощущаться как искра с s3 - не лучший инструмент для этой конкретной работы. В принципе, у меня есть миллионы файлов меньшего размера в ведро s3. По причинам, в которые я не могу вдаваться, эти файлы не могут быть объединены (они являются уникальными зашифрованными стенограммами). Я видел подобные вопросы, как этот, и каждое отдельное решение не дало хороших результатов. Первое, что я попытался было джокеры:Как справиться с миллионами меньших файлов s3 с искривлением apache
sc.wholeTextFiles(s3aPath + "/*/*/*/*.txt").count();
Примечание: граф был более доводка на сколько времени потребуется для обработки файлов. Эта работа почти заняла целый день с более чем 10 экземплярами, но все еще не удалась с ошибкой, размещенной в нижней части списка. Затем я нашел эту ссылку, где она в основном говорила, что это не оптимально: https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html
Затем я решил попробовать другое решение, которое я не могу найти в данный момент, который сказал загрузить все пути, а затем объединить все от РДУ
ObjectListing objectListing = s3Client.listObjects(bucket);
List<JavaPairRDD<String, String>> rdds = new ArrayList<>();
List<JavaPairRDD<String, String>> tempMeta = new ArrayList<>();
//initializes objectListing
tempMeta.addAll(objectListing.getObjectSummaries().stream()
.map(func)
.filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
.map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
.collect(Collectors.toList()));
while(objectListing.isTruncated()) {
objectListing = s3Client.listNextBatchOfObjects(objectListing);
tempMeta.addAll(objectListing.getObjectSummaries().stream()
.map(func)
.filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
.map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
.collect(Collectors.toList()));
if (tempMeta.size() > 5000) {
rdds.addAll(tempMeta);
tempMeta = new ArrayList<>();
}
}
if (!tempMeta.isEmpty()){
rdds.addAll(tempMeta);
}
return SparkConfig.getSparkContext().union(rdds.get(0), rdds.subList(1, rdds.size()));
Тогда, даже когда я установил установить emrfs-сайт конфигурации для:
{
"Classification": "emrfs-site",
"Properties": {
"fs.s3.consistent.retryPolicyType": "fixed",
"fs.s3.consistent.retryPeriodSeconds": "15",
"fs.s3.consistent.retryCount": "20",
"fs.s3.enableServerSideEncryption": "true",
"fs.s3.consistent": "false"
}
}
Я получил эту ошибку в течение 6 часов каждый раз, когда я пытался запустить работу:
17/02/15 19:15:41 INFO AmazonHttpClient: Unable to execute HTTP request: randomBucket.s3.amazonaws.com:443 failed to respond
org.apache.http.NoHttpResponseException: randomBucket.s3.amazonaws.com:443 failed to respond
Итак, во-первых, есть ли способ использовать файлы меньшего размера с искровым из s3? Меня не волнует, является ли решение субоптимальным, я просто хочу попробовать что-то работать. Я подумал о попытке искрообразования, поскольку его внутренности немного отличаются от загрузки всех файлов. Затем я использовал fileStream и установил newFiles в false. Тогда я мог бы их обработать. Тем не менее, это не то, для чего искровой поток был построен, поэтому я не согласен с тем, чтобы идти по этому маршруту.
В качестве побочного примечания я сгенерировал миллионы небольших файлов в hdfs и пробовал одну и ту же работу, и она закончилась в течение часа. Это заставляет меня чувствовать, что это s3 специфично. Кроме того, я использую s3a, а не обычный s3.
Отличный ответ, спасибо Стив. Я буду экспериментировать с ним сегодня. –
без проблем. Из любопытства - и чтобы помочь сформировать будущие перфекционные тесты - каково ваше разбиение по каталогам? что-то вроде ГГГГ/ММ/ДД, например 2017/01/23 /? –