Я пишу приложение, которое обрабатывает «L» часть процесса ETL. Это довольно простая предпосылка: захватить некоторые файлы из хранилища Google Cloud Platform (где каждый файл содержит данные, соответствующие одной таблице Cassandra, а каждая строка представляет собой одну запись, которая будет вставлена в указанную таблицу) и вставить данные в Cassandra.BufferedReader SocketException: Сброс соединения при вставке данных Cassandra
Самый большой файл (и единственный в настоящее время дает мне вопрос) составляет около 900 КБ (~ 25 тыс. Строк, 4 столбца, может быть, 50 символов). Следующий самый большой файл - около 300 КБ.
Проблема заключается в том, что при попытке вставить записи из файла 900 КБ работа становится примерно на полпути, может быть, немного меньше, прежде чем я получу java.net.SocketException: Connection reset
.
Вот код:
*The GCP related stuff*
Storage storage = StorageFactory.getService();
Storage.Objects.Get listRequest = storage.objects().list(bucketName);
List<StorageObject> results = new ArrayList<>();
Objects objects;
do {
objects = listRequest.execute();
results.addAll(objects.getItems());
listRequest.setPageToken(objects.getNextPageToken());
} while (objects.getNextPageToken() != null);
List<Storage.Objects.Get> files = new ArrayList<>();
for (StorageObject storageObject : storageObjects) {
files.add(storage.objects().get(bucketName, storageObject.getName());
}
*The processing stuff*
for (Storage.Objects.Get file : files) {
file.getMediaHttpDownloader().setDirectDownloadEnabled(true);
try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.executeMediaAsInputStream(), "UTF-8"))) {
String line = reader.readLine();
while (line != null) {
String[] columnData = line.trim().split("\\|");
DomainObject domainObject = convertLineToObject(columnData);
domainObjectRepository.saveObject(domainObject);
line = reader.readLine();
}
} catch (Exception ex) {
log.error("log stuff" + ex.toString);
}
Этот код разбит через несколько различных классов. Попытка предоставить полный код без посторонних деталей. Функция convertLineToObject
просто берет массив String, создает new DomainObject()
и устанавливает каждый индекс в массиве columnData
в соответствующее поле.
Я использую внутреннюю библиотеку, которая создает поставщика DAO и обрабатывает операции с базой данных. domainObjectRepository.saveDomainObject()
вызов только одна строка кода вызова этой библиотеки:
domainObjectDAOProvider.getDAO(DomainObject.class).insert(domainObject);
insert()
вызов строит BoundStatement и вызывает execute()
на это заявление.
Вот трассировки стека я получаю:
java.net.SocketInputStream.read(SocketInputStream.java:209)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
sun.security.ssl.InputRecord.read(InputRecord.java:532)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
java.io.BufferedInputStream.read(BufferedInputStream.java:345)
sun.net.www.MeteredStream.read(MeteredStream.java:134)
java.io.FilterInputStream.read(FilterInputStream.java:133)
sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3336)
com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:169)
sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
java.io.InputStreamReader.read(InputStreamReader.java:184)
java.io.BufferedReader.fill(BufferedReader.java:161)
java.io.BufferedReader.readLine(BufferedReader.java:324)
java.io.BufferedReader.readLine(BufferedReader.java:389)
*The application call - referring to line = reader.readLine()*
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43), java.lang.reflect.Method.invoke(Method.java:497)
org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65)
org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Когда я комментирую только база данных называют это летит через файл в не время квартиры. Когда этот вызов активен, производительность медленнее, и он выбрасывает это исключение где-то между строками 8000 и 11000. Я проверил данные в файле, и все в порядке. Никаких искаженных данных, никаких странностей там вообще.