2016-09-02 3 views
0

У меня есть функция Java Lambda, которая запускается S3 Event каждые 15 минут. Я заметил, что в течение примерно трех часов каждый вызов Lambda включает в себя последний загруженный файл И все файлы, которые были загружены перед ним в течение 3-х часовых интервалов времени.Файлы S3 обрабатываются несколько раз в AWS Lambda

Итак, если при повторении через весь список он повторяет файлы, которые уже были обработаны при более раннем вызове лямбда.

Как я могу обработать только последний загруженный файл? В node.js существует context.suceed(), который я предполагаю, что это событие успешно обработано. Java, похоже, не имеет этого.

Ниже приведены журналы Cloudwatch.

08:35:16 START RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Version: $LATEST 
08:35:26 TIME - AUTHENTICATE: 8101ms 
08:35:26 TIME - MESSAGE PARSE: 1ms 
08:35:26 data :: event/events/2016/ 08/31/2016 0831123000.export.csv 
08:35:35 Processed 147 events 
08:35:35 TIME - FILE PARSE: 9698 
08:35:35 Found 1 event files 
08:35:35 Total function took: 17800ms 
08:35:35 END RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 
08:35:35 REPORT RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Duration: 19403.67 ms Billed Duration: 19500 ms Memory Size: 192 MB Max Memory Used: 116 MB 
08:45:03 START RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Version: $LATEST 
08:45:03 TIME - AUTHENTICATE: 119ms 
08:45:03 TIME - MESSAGE PARSE: 0ms 
08:45:03 data :: event/events/2016/ 08/31/2016 0831123000.export.csv 
08:45:05 Processed 147 events 
08:45:05 data :: event/events/2016/ 08/31/2016 0831124500.export.csv 
08:45:06 Processed 211 events 
08:45:06 TIME - FILE PARSE: 2499 
08:45:06 Found 2 event files 
08:45:06 Total function took: 2618ms 
08:45:06 END RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 
08:45:06 REPORT RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Duration: 2796.25 ms Billed Duration: 2800 ms Memory Size: 192 MB Max Memory Used: 116 MB 
09:05:02 START RequestId: 8747aa 08-6f7b-11e6-80fd-f30a15cf07fc Version: $LATEST 
09:05:02 TIME - AUTHENTICATE: 98ms 
09:05:02 TIME - MESSAGE PARSE: 0ms 
09:05:02 data :: event/events/2016/ 08/31/2016 0831123000.export.csv 
09:05:03 Processed 147 events 
09:05:03 data :: event/events/2016/ 08/31/2016 0831124500.export.csv 
09:05:04 Processed 211 events 
09:05:04 data :: event/events/2016/ 08/31/2016 0831130000.export.csv 
09:05:04 Processed 204 events 
09:05:04 TIME - FILE PARSE: 2242 
09:05:04 Found 3 event files 
09:05:04 Total function took: 2340ms 
09:05:04 END RequestId: 8747aa 08-6f7b-11e6-80fd-f30a15cf07fc 

EDIT 1 Я считаю, что этот вопрос был дан ответ Майкл, однако ниже часть кода для всех остальных. Я действительно использую глобальный список для хранения записей.

общественного класса LambdaHandler {

private final List<GDELTEventFile> eventFiles = new ArrayList<>(); 
private AmazonS3Client s3Client; 
private final CSVFormat CSV_FORMAT = CSVFormat.TDF.withIgnoreEmptyLines().withTrim(); 

public void gdeltHandler(S3Event event, Context context) { 
    StopWatch sw = new StopWatch(); 
    long time = 0L; 

    sw.start(); 
    s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider()); 
    sw.split(); 
    System.out.println("TIME - AUTHENTICATE: " + sw.getSplitTime() + "ms"); 
    time += sw.getSplitTime(); 
    sw.reset(); 

    sw.start(); 
    processEvent(event); 
    sw.split(); 
    System.out.println("TIME - MESSAGE PARSE: " + sw.getSplitTime() + "ms"); 
    time += sw.getSplitTime(); 
    sw.reset(); 

    sw.start(); 
    processFiles(); 
    sw.split(); 
    System.out.println("TIME - FILE PARSE: " + sw.getSplitTime()); 
    time += sw.getSplitTime(); 

    System.out.println("Found " + eventFiles.size() + " event files"); 
    System.out.println("Total function took: " + time + "ms"); 
} 

private void processEvent(S3Event event) { 
    List<S3EventNotification.S3EventNotificationRecord> records = event.getRecords(); 
    for (S3EventNotification.S3EventNotificationRecord record : records) { 
     long filesize = record.getS3().getObject().getSizeAsLong(); 
     eventFiles.add(new GDELTEventFile(record.getS3().getBucket().getName(), record.getS3().getObject().getKey(), filesize)); 
    } 
} 

private void processFiles() { 
    for (GDELTEventFile event : eventFiles) { 
     try { 
      System.out.println(event.getBucket() + " :: " + event.getFilename()); 
      GetObjectRequest request = new GetObjectRequest(event.getBucket(), event.getFilename()); 
      S3Object file = s3Client.getObject(request); 
      try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.getObjectContent()))) { 
       CSVParser parser = new CSVParser(reader, CSV_FORMAT); 
       int count = 0; 
       for (CSVRecord record : parser) { 
         count++; 
        } 
       } 
       System.out.println("Processed " + count + " events"); 
      } 
     } catch (IOException ioe) { 
      System.out.println("IOException :: " + ioe); 
     } 
    } 
} 
+0

Где находится информация в строках 'data ::'? Я никогда не видел, чтобы S3 отправлял больше одной записи за событие ... на самом деле мой код предназначен для остановки и исключения, если это когда-либо, потому что это неожиданно. Я подозреваю, что вам не удалось объяснить возможное повторное использование контейнера. Если событие Lambda повторно использует один и тот же контейнер из предыдущего вызова, возможно, у вас есть глобальная структура данных, которая удерживает старые события? –

+0

Привет, Майкл. Если вы спрашиваете, где печатается фактическая строка, напечатайте эти строки внутри функции. Это просто System.out.println. Сам файл генерируется службой gdelt каждые 15 минут, и я загружаю его на экземпляр ec2, а затем загружаю его через cli. Как мне учитывать повторное использование контейнера в лямбда? Никогда не слышал об этом раньше ... – Brooks

+0

Когда я говорю «учетную запись» для повторного использования контейнера, я имею в виду [помните, что это вещь] (https://aws.amazon.com/blogs/compute/container-reuse-in- lambda /) и код соответственно. Каждый раз, когда срабатывает функция, он запускается в незанятый процесс, запускающий вашу функцию Lambda ... но это может быть новый или может быть тем же самым * процессом, который запускал функцию в последний раз или некоторое время до этого, будучи повторно использован (как долго так как вы не загрузили новый код).Если вы нажимаете записи из S3 на глобальный массив, а не на какой-либо объект для вашего обработчика (например), тогда, когда вы повторяете этот массив, он будет * иногда * содержать старые события. –

ответ

3

Это случай кода, который выходит важный аспект container reuse Lambda в - повторное использование контейнера в Lambda включает в себя повторное использование процесса. Когда функция выполняется в повторно используемом контейнере, она также обязательно работает в том же процессе, который использовался и раньше.

Структура данных уведомления о событиях S3 такова, что она может включать в себя более одного объекта за одно событие, но я практикую, это никогда не происходит ... но толкание данных события в глобальную структуру означает, что если контейнер повторно используется, то более поздние вызовы функций будут видеть старые данные.

Хотя это может быть очень полезно в качестве кеша, оно имеет существенные последствия для того, как должен быть разработан код - всегда ожидайте, но никогда не предполагайте, что ваш процесс может пережить от одного вызова к будущему, последующему вызову и коду соответственно.

Обратите внимание, что повторное использование контейнера также означает, что вам нужно очистить любые временные файлы, если есть вероятность, что много повторений контейнера приведут к расширению пространства.

Обратите внимание, что повторное развертывание кода функции всегда означает, что старые контейнеры будут оставлены, а не повторно использованы для будущих вызовов последней версии.

Смежные вопросы