2015-11-02 4 views
1

Я написал потоковое задание Flink в Java, которое загружает файл csv, который содержит данные подписчика (4 столбца), а затем считывает данные из потока сокетов при сопоставлении с данными подписчика.Работа с Flink зависает при отправке при загрузке большого файла

Первоначально я использовал небольшой CSV-файл (8 МБ) и все работает отлично:

# flink run analytics-flink.jar 19001 /root/minisubs.csv /root/output.csv 
loaded 200000 subscribers from csv file 
11/02/2015 16:36:59 Job execution switched to status RUNNING. 
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to SCHEDULED 
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to DEPLOYING 
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to RUNNING 

Я включил файл CSV в большей один (~ 45 Мб), и теперь я вижу это :

# flink run analytics-flink.jar 19001 /root/subs.csv /root/output.csv 
loaded 1173547 subscribers from csv file 

Обратите внимание, что число подписчиков выше - количество строк в файле. Я попытался найти любые таймауты в конфигурации Flink, но я не смог их найти.

Любая помощь очень ценится!

Edit: Csv загружается с помощью этого метода путем использования Викисклада CSV 1.2 библиотеки:

private static HashMap<String, String> loadSubscriberGroups(
      String referenceDataFile) throws IOException { 
     HashMap<String,String> subscriberGroups = new HashMap<String, String>(); 

     File csvData = new File(referenceDataFile); 
     CSVParser parser = CSVParser.parse(csvData, Charset.defaultCharset(), CSVFormat.EXCEL); 
     for (CSVRecord csvRecord : parser) { 
      String imsi = csvRecord.get(0); 
      String groupStr = csvRecord.get(3); 

      if(groupStr == null || groupStr.isEmpty()) { 
       continue; 
      } 
      subscriberGroups.put(imsi, groupStr); 
     } 

     return subscriberGroups; 
    } 

и вот образец файла (я знаю, что есть запятая в конце концов, последний столбец пуст на данный момент):

450000000000001,450000000001,7752,Tier-2, 
450000000000002,450000000002,1112,Tier-1, 
450000000000003,450000000003,6058,Tier-2, 
+0

Как выполнить загрузку CSV-файла? Можете ли вы предоставить фрагмент вашей программы, который читает CSV-файл? –

+0

сообщение отредактировано для добавления метода загрузки csv –

+0

Спасибо за обновление. Что происходит с данными CSV? Как вы вводите его в программу Flink? –

ответ

4

От Роберта Meztger (апач разработчик Flink):

Я могу объяснить, почему ваш первый подход не приста k:

Вы пытались отправить CSV-файлы с клиента Flink в кластер , используя нашу систему RPC (Akka). Когда вы отправляете задание Flink, , мы сериализуем все созданные пользователем объекты (mappers, sources, ...) и отправляем их в кластер. Существует метод StreamExecutionEnvironment.fromElements (..), который позволяет пользователям сериализовать несколько объектов вместе с представлением задания. Но сумма данных, которые вы можете передать так, ограничена размером кадра Akka. В нашем случае я думаю, что по умолчанию 10 мегабайт. После этого Akka будет , возможно, просто отбросит или отклонит сообщение о развертывании.

Решение было бы использовать богатый оператор вместо обычного оператора (например, RichMapFunction вместо MapFunction), перекрывая метод Open() и загрузка файла CSV внутри этого метода.

Спасибо, Роберт!

Смежные вопросы