2016-08-11 4 views
0

Я пишу приложение, которое обрабатывает «L» часть процесса ETL. Это довольно простая предпосылка: захватить некоторые файлы из хранилища Google Cloud Platform (где каждый файл содержит данные, соответствующие одной таблице Cassandra, а каждая строка представляет собой одну запись, которая будет вставлена ​​в указанную таблицу) и вставить данные в Cassandra.BufferedReader SocketException: Сброс соединения при вставке данных Cassandra

Самый большой файл (и единственный в настоящее время дает мне вопрос) составляет около 900 КБ (~ 25 тыс. Строк, 4 столбца, может быть, 50 символов). Следующий самый большой файл - около 300 КБ.

Проблема заключается в том, что при попытке вставить записи из файла 900 КБ работа становится примерно на полпути, может быть, немного меньше, прежде чем я получу java.net.SocketException: Connection reset.

Вот код:

*The GCP related stuff* 
Storage storage = StorageFactory.getService(); 
Storage.Objects.Get listRequest = storage.objects().list(bucketName); 
List<StorageObject> results = new ArrayList<>(); 
Objects objects; 

do { 
    objects = listRequest.execute(); 
    results.addAll(objects.getItems()); 
    listRequest.setPageToken(objects.getNextPageToken()); 
} while (objects.getNextPageToken() != null); 

List<Storage.Objects.Get> files = new ArrayList<>(); 
for (StorageObject storageObject : storageObjects) { 
    files.add(storage.objects().get(bucketName, storageObject.getName()); 
} 

*The processing stuff* 
for (Storage.Objects.Get file : files) { 
    file.getMediaHttpDownloader().setDirectDownloadEnabled(true); 
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.executeMediaAsInputStream(), "UTF-8"))) { 
     String line = reader.readLine(); 
     while (line != null) { 
      String[] columnData = line.trim().split("\\|"); 
      DomainObject domainObject = convertLineToObject(columnData); 
      domainObjectRepository.saveObject(domainObject); 
      line = reader.readLine(); 
     } 
    } catch (Exception ex) { 
     log.error("log stuff" + ex.toString); 
    } 

Этот код разбит через несколько различных классов. Попытка предоставить полный код без посторонних деталей. Функция convertLineToObject просто берет массив String, создает new DomainObject() и устанавливает каждый индекс в массиве columnData в соответствующее поле.

Я использую внутреннюю библиотеку, которая создает поставщика DAO и обрабатывает операции с базой данных. domainObjectRepository.saveDomainObject() вызов только одна строка кода вызова этой библиотеки:

domainObjectDAOProvider.getDAO(DomainObject.class).insert(domainObject); 

insert() вызов строит BoundStatement и вызывает execute() на это заявление.

Вот трассировки стека я получаю:

java.net.SocketInputStream.read(SocketInputStream.java:209) 
java.net.SocketInputStream.read(SocketInputStream.java:141) 
sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593) 
sun.security.ssl.InputRecord.read(InputRecord.java:532) 
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973) 
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930) 
sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
java.io.BufferedInputStream.read1(BufferedInputStream.java:284) 
java.io.BufferedInputStream.read(BufferedInputStream.java:345) 
sun.net.www.MeteredStream.read(MeteredStream.java:134) 
java.io.FilterInputStream.read(FilterInputStream.java:133) 
sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3336) 
com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:169) 
sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) 
sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) 
sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) 
java.io.InputStreamReader.read(InputStreamReader.java:184) 
java.io.BufferedReader.fill(BufferedReader.java:161) 
java.io.BufferedReader.readLine(BufferedReader.java:324) 
java.io.BufferedReader.readLine(BufferedReader.java:389) 
*The application call - referring to line = reader.readLine()* 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43), java.lang.reflect.Method.invoke(Method.java:497) 
org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65) 
org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
java.lang.Thread.run(Thread.java:745) 

Когда я комментирую только база данных называют это летит через файл в не время квартиры. Когда этот вызов активен, производительность медленнее, и он выбрасывает это исключение где-то между строками 8000 и 11000. Я проверил данные в файле, и все в порядке. Никаких искаженных данных, никаких странностей там вообще.

ответ

0

Если вы оставите соединение открытым достаточно долго, не потребляя больше данных, Google Cloud Storage завершит соединение как незанятое. Если ваши вызовы базы данных занимают слишком много времени, кешируйте входные данные локально. Вы также могли бы, предположительно, создать класс-оболочку, который произвел новый InputStreamReader s при последнем смещении чтения, когда вы получите SocketException с использованием MediaHttpDownloader.setBytesDownloaded при последующих вызовах, но это отбрасывает как клиентские, так и серверные ресурсы, поддерживающие соединение дольше, чем необходимо.

Смежные вопросы