Я написал потоковое задание Flink в Java, которое загружает файл csv, который содержит данные подписчика (4 столбца), а затем считывает данные из потока сокетов при сопоставлении с данными подписчика.Работа с Flink зависает при отправке при загрузке большого файла
Первоначально я использовал небольшой CSV-файл (8 МБ) и все работает отлично:
# flink run analytics-flink.jar 19001 /root/minisubs.csv /root/output.csv
loaded 200000 subscribers from csv file
11/02/2015 16:36:59 Job execution switched to status RUNNING.
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to SCHEDULED
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to DEPLOYING
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to RUNNING
Я включил файл CSV в большей один (~ 45 Мб), и теперь я вижу это :
# flink run analytics-flink.jar 19001 /root/subs.csv /root/output.csv
loaded 1173547 subscribers from csv file
Обратите внимание, что число подписчиков выше - количество строк в файле. Я попытался найти любые таймауты в конфигурации Flink, но я не смог их найти.
Любая помощь очень ценится!
Edit: Csv загружается с помощью этого метода путем использования Викисклада CSV 1.2 библиотеки:
private static HashMap<String, String> loadSubscriberGroups(
String referenceDataFile) throws IOException {
HashMap<String,String> subscriberGroups = new HashMap<String, String>();
File csvData = new File(referenceDataFile);
CSVParser parser = CSVParser.parse(csvData, Charset.defaultCharset(), CSVFormat.EXCEL);
for (CSVRecord csvRecord : parser) {
String imsi = csvRecord.get(0);
String groupStr = csvRecord.get(3);
if(groupStr == null || groupStr.isEmpty()) {
continue;
}
subscriberGroups.put(imsi, groupStr);
}
return subscriberGroups;
}
и вот образец файла (я знаю, что есть запятая в конце концов, последний столбец пуст на данный момент):
450000000000001,450000000001,7752,Tier-2,
450000000000002,450000000002,1112,Tier-1,
450000000000003,450000000003,6058,Tier-2,
Как выполнить загрузку CSV-файла? Можете ли вы предоставить фрагмент вашей программы, который читает CSV-файл? –
сообщение отредактировано для добавления метода загрузки csv –
Спасибо за обновление. Что происходит с данными CSV? Как вы вводите его в программу Flink? –