Пожалуйста, смотрите ниже пример кодаБудет эффективный способ вызова запроса HTTP и читать InputStream в искре MapTask
JavaRDD<String> mapRDD = filteredRecords
.map(new Function<String, String>() {
@Override
public String call(String url) throws Exception {
BufferedReader in = null;
URL formatURL = new URL((url.replaceAll("\"", ""))
.trim());
try {
HttpURLConnection con = (HttpURLConnection) formatURL
.openConnection();
in = new BufferedReader(new InputStreamReader(con
.getInputStream()));
return in.readLine();
} finally {
if (in != null) {
in.close();
}
}
}
});
здесь URL является HTTP GET запрос. пример
http://ip:port/cyb/test?event=movie&id=604568837&name=SID×tamp_secs=1460494800×tamp_millis=1461729600000&back_up_id=676700166
Этот код очень медленный. IP и порт являются случайными, и загрузка распределяется, поэтому ip может иметь 20 различных значений с портом, поэтому я не вижу узкое место.
Когда я комментирую
in = new BufferedReader(new InputStreamReader(con
.getInputStream()));
return in.readLine();
код слишком быстро. ПРИМЕЧАНИЕ. Входные данные для обработки - 10 ГБ. Использование искры для чтения с S3.
что-то не так я делаю с BufferedReader или InputStreamReader любой альтернативой. Я не могу использовать foreach в искрах, поскольку мне нужно вернуть ответ с сервера и вам нужно сохранить JAVARdd как textFile на HDFS.
если мы используем mappartition код что-то, как показано ниже
JavaRDD<String> mapRDD = filteredRecords.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterable<String> call(Iterator<String> tuple) throws Exception {
final List<String> rddList = new ArrayList<String>();
Iterable<String> iterable = new Iterable<String>() {
@Override
public Iterator<String> iterator() {
return rddList.iterator();
}
};
while(tuple.hasNext()) {
URL formatURL = new URL((tuple.next().replaceAll("\"", ""))
.trim());
HttpURLConnection con = (HttpURLConnection) formatURL
.openConnection();
try(BufferedReader br = new BufferedReader(new InputStreamReader(con
.getInputStream()))) {
rddList.add(br.readLine());
} catch (IOException ex) {
return rddList;
}
}
return iterable;
}
});
здесь также для каждой записи мы делаем то же самое .. не правда ли?
Да звучит хорошо .. позвольте мне попробовать. спасибо –
Я вижу, что одна проблема здесь обновила кодовый блок –