2017-01-09 1 views
1

Пожалуйста, смотрите ниже пример кодаБудет эффективный способ вызова запроса HTTP и читать InputStream в искре MapTask

JavaRDD<String> mapRDD = filteredRecords 
      .map(new Function<String, String>() { 

       @Override 
       public String call(String url) throws Exception { 
        BufferedReader in = null; 
        URL formatURL = new URL((url.replaceAll("\"", "")) 
          .trim()); 
        try { 
         HttpURLConnection con = (HttpURLConnection) formatURL 
           .openConnection(); 
         in = new BufferedReader(new InputStreamReader(con 
           .getInputStream())); 

         return in.readLine(); 
        } finally { 
         if (in != null) { 
          in.close(); 
         } 
        } 
       } 
      }); 

здесь URL является HTTP GET запрос. пример

http://ip:port/cyb/test?event=movie&id=604568837&name=SID&timestamp_secs=1460494800&timestamp_millis=1461729600000&back_up_id=676700166 

Этот код очень медленный. IP и порт являются случайными, и загрузка распределяется, поэтому ip может иметь 20 различных значений с портом, поэтому я не вижу узкое место.

Когда я комментирую

in = new BufferedReader(new InputStreamReader(con 
          .getInputStream())); 

        return in.readLine(); 

код слишком быстро. ПРИМЕЧАНИЕ. Входные данные для обработки - 10 ГБ. Использование искры для чтения с S3.

что-то не так я делаю с BufferedReader или InputStreamReader любой альтернативой. Я не могу использовать foreach в искрах, поскольку мне нужно вернуть ответ с сервера и вам нужно сохранить JAVARdd как textFile на HDFS.

если мы используем mappartition код что-то, как показано ниже

JavaRDD<String> mapRDD = filteredRecords.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { 

     @Override 
     public Iterable<String> call(Iterator<String> tuple) throws Exception { 

      final List<String> rddList = new ArrayList<String>(); 
      Iterable<String> iterable = new Iterable<String>() { 

       @Override 
       public Iterator<String> iterator() { 
        return rddList.iterator(); 
       } 
      }; 
      while(tuple.hasNext()) { 
       URL formatURL = new URL((tuple.next().replaceAll("\"", "")) 
         .trim()); 
       HttpURLConnection con = (HttpURLConnection) formatURL 
         .openConnection(); 
       try(BufferedReader br = new BufferedReader(new InputStreamReader(con 
         .getInputStream()))) { 

        rddList.add(br.readLine()); 

       } catch (IOException ex) { 
        return rddList; 
       } 
      } 
      return iterable; 
     } 
    }); 

здесь также для каждой записи мы делаем то же самое .. не правда ли?

ответ

1

В настоящее время вы используете

карты функцию

который создает запрос на URL-адрес для каждой строки в перегородке.

Вы можете использовать

mapPartition

который сделает код работать быстрее, поскольку он создает соединение с сервером только один раз, то есть только одно соединение на перегородке.

+0

Да звучит хорошо .. позвольте мне попробовать. спасибо –

+0

Я вижу, что одна проблема здесь обновила кодовый блок –

0

Большие затраты здесь связаны с подключением TCP/HTTPS. Это усугубляется тем фактом, что даже если вы только читаете первую (короткую) строку большого файла, в попытке повторно использовать соединения HTTP/1.1 лучше, современные HTTP-клиенты пытаются прочитать() в конце файла , поэтому избегайте прерывания соединения. Это хорошая стратегия для небольших файлов, но не для тех, что в MB.

Существует решение: установите длину содержимого на чтение, чтобы читать только меньший блок, уменьшая стоимость функции close(); переработка соединения затем снижает затраты на установку HTTPS. Это то, что делает последний клиент Hadoop/Spark S3A, если вы устанавливаете fadvise = random в соединении: запрашивает блоки, а не весь файл с несколькими GB. Имейте в виду, что: этот дизайн на самом деле очень плохой, если вы отправляете байты за байтом через файл ...

Смежные вопросы