2016-10-12 4 views
3

У меня есть функция Lambda в Node.js, которая обрабатывает новые изображения, добавленные в мое ведро. Я хочу запустить функцию для всех существующих объектов. Как я могу это сделать? Я решил, что самый простой способ - «переложить» каждый объект, вызвать функцию, но я не уверен, как это сделать.Запуск функции Lambda S3-put-triggered на существующих объектах S3?

Чтобы быть ясным - я хочу запускать один раз на каждом из существующих объектов. Триггер уже работает для новых объектов, мне просто нужно запустить его на объектах, которые были вставлены до была создана функция лямбда.

+0

Пожалуйста, позвольте мне правильно понять ваше требование, именно то, что вам нужно, - это то, что каждый раз, когда вы добавляете изображение в свое ведро, вы хотите перерабатывать ВСЕ существующие изображения? или сколько раз вы хотите обрабатывать каждое изображение? – imTachu

+0

@ TachúSalamanca Обновлено OP –

ответ

3

Следующая функция Lambda будет делать то, что вам требуется.

Он будет перебирать каждый файл в целевом ведомом S3 и для каждого из них будет выполнять желаемую функцию лямбда против него, эмулируя операцию put.

Вы, вероятно, будете хотеть, чтобы поставить очень долго надбавку время выполнения против этой функции

var TARGET_BUCKET="my-bucket-goes-here"; 
var TARGET_LAMBDA_FUNCTION_NAME="TestFunct"; 
var S3_PUT_SIMULATION_PARAMS={ 
    "Records": [ 
    { 
     "eventVersion": "2.0", 
     "eventTime": "1970-01-01T00:00:00.000Z", 
     "requestParameters": { 
     "sourceIPAddress": "127.0.0.1" 
     }, 
     "s3": { 
     "configurationId": "testConfigRule", 
     "object": { 
      "eTag": "abcdefabcdef", 
      "sequencer": "0A1B2C3D4E5F678901", 
      "key": "HappyFace.jpg", 
      "size": 1024 
     }, 
     "bucket": { 
      "arn": "arn:aws:s3:::mybucket", 
      "name": "sourcebucket", 
      "ownerIdentity": { 
      "principalId": "EXAMPLE" 
      } 
     }, 
     "s3SchemaVersion": "1.0" 
     }, 
     "responseElements": { 
     "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH", 
     "x-amz-request-id": "EXAMPLE123456789" 
     }, 
     "awsRegion": "us-east-1", 
     "eventName": "ObjectCreated:Put", 
     "userIdentity": { 
     "principalId": "EXAMPLE" 
     }, 
     "eventSource": "aws:s3" 
    } 
    ] 
}; 

var aws = require('aws-sdk'); 
var s3 = new aws.S3(); 
var lambda = new aws.Lambda(); 


exports.handler = (event, context, callback) => { 
    retrieveS3BucketContents(TARGET_BUCKET, function(s3Objects){ 
     simulateS3PutOperation(TARGET_BUCKET, s3Objects, simulateS3PutOperation, function(){ 
      console.log("complete."); 
     }); 
    }); 
}; 

function retrieveS3BucketContents(bucket, callback){ 
    s3.listObjectsV2({ 
     Bucket: TARGET_BUCKET 
    }, function(err, data) { 
     callback(data.Contents); 
    }); 
} 

function simulateS3PutOperation(bucket, s3ObjectStack, callback, callbackEmpty){ 
    var params = { 
     FunctionName: TARGET_LAMBDA_FUNCTION_NAME, 
     Payload: "" 
    }; 

    if(s3ObjectStack.length > 0){ 
     var s3Obj = s3ObjectStack.pop(); 
     var p = S3_PUT_SIMULATION_PARAMS; 
     p.Records[0].s3.bucket.name = bucket; 
     p.Records[0].s3.object.key = s3Obj.Key; 
     params.Payload = JSON.stringify(p, null, 2); 
     lambda.invoke(params, function(err, data) { 
      if (err) console.log(err, err.stack); // an error occurred 
      else{ 
       callback(bucket, s3ObjectStack, callback, callbackEmpty); 
      } 
     }); 
    } 
    else{ 
     callbackEmpty(); 
    } 
} 

Ниже приводится полный курс, что ваш лямбда-запрос нужно будет выполнить этот метод, он позволяет R/W к журналам CloudWatch и доступу ListObject к S3. Вам необходимо заполнить свои данные ведро, где вы видите MY-КОВШ-GOES-HERE

{ 
    "Version": "2012-10-17", 
    "Statement": [ 
     { 
      "Sid": "Stmt1477382207000", 
      "Effect": "Allow", 
      "Action": [ 
       "s3:ListBucket" 
      ], 
      "Resource": [ 
       "arn:aws:s3:::MY-BUCKET-GOES-HERE/*" 
      ] 
     }, 
     { 
     "Effect": "Allow", 
     "Action": [ 
     "logs:CreateLogGroup", 
     "logs:CreateLogStream", 
     "logs:PutLogEvents" 
     ], 
     "Resource": "arn:aws:logs:*:*:*" 
    } 
    ] 
} 
+0

Точно о чем я думал, спасибо! Ps. ваш сайт не работает. –

+0

Спасибо Ротшильд! Я понятия не имел ... Привет! –

+0

Эй, Ксавье, я получаю доступ к этому, без каких-либо объяснений. Какая политика роли мне нужна? Я парень-героику, так извиняюсь, но лямбда слишком хороша, чтобы уйти. –

0

хорошо в основном то, что вам нужно использовать некоторые API вызовов (Boto, например, если вы используете Python) и перечислить все новые объекты или все объекты в s3 ведро, а затем обработать эти объекты

вот фрагмент:

from boto.s3.connection import S3Connection 

conn = S3Connection() 
source = conn.get_bucket(src_bucket) 
src_list = set([key.name for key in source.get_all_keys(headers=None, prefix=prefix)]) 
//and then you can go over this src list 
for entry in src_list: 
    do something 
+0

Я не уверен, что это точно ответит на вопрос. Бит, за которым я застрял, запускает функцию Lambda на объектах. Кажется, что он запускается только с помощью триггера, то есть вставки объектов. Можно ли получить все объекты, а затем вызвать функцию лямбда на каждом из них? Кстати, я использую Node. –

+0

Чтобы уточнить, функция преобразует изображение в различные размеры. Уже есть фотографии в ведре, прежде чем я внедрил эту функцию, и мне нужно запустить ее на этих объектах. –

+0

, чтобы вы могли запускать функцию при вставке объектов, а затем запускать ее на всех объектах в ведре? –

0

Что вам нужно сделать, это создать один раз скрипт, который использует в AWS SDK для вызова вашей функции лямбды. Это решение не требует от вас «повторной установки» объекта.

Я собираюсь основать свой ответ на AWS JS SDK.

Чтобы быть ясным - я хочу запускать один раз на каждый из существующих объектов . Триггер уже работает для новых объектов, мне просто нужно , чтобы запустить его на объектах, которые были вставлены перед функцией лямбда .

Как у вас есть рабочая функция лямбды, которая принимает S3 поместить события, что вам нужно сделать, это найти весь необработанный объект в S3 (Если у вас есть DB запись для каждого S3 объекта выше должно быть легко, если нет, то вам может найти функцию списка объектов S3, удобную http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#listObjectsV2-property).

Затем для каждого полученного необработанного объекта S3 создается объект JSON, который выглядит как сообщение о событии S3 Put (показано ниже) и вызывает функцию вызова Lambda с указанным выше объектом JSON в качестве полезной нагрузки.

Вы можете найти функциональные документы лямбды Вызывать при http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#invoke-property

При создании фальшивого S3 Put объекта Event Message для лямбда-функции вы можете игнорировать большинство реальных свойств объекта в зависимости от вашего лямбда-функции. Я предполагаю, что вам нужно будет задать наименьшее значение, это имя и имя объекта.

S3 Поместить сообщение Event Structure http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html

{ 
    "Records":[ 
     { 
     "eventVersion":"2.0", 
     "eventSource":"aws:s3", 
     "awsRegion":"us-east-1", 
     "eventTime":"1970-01-01T00:00:00.000Z", 
     "eventName":"ObjectCreated:Put", 
     "userIdentity":{ 
      "principalId":"AIDAJDPLRKLG7UEXAMPLE" 
     }, 
     "requestParameters":{ 
      "sourceIPAddress":"127.0.0.1" 
     }, 
     "responseElements":{ 
      "x-amz-request-id":"C3D13FE58DE4C810", 
      "x-amz-id-2":"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD" 
     }, 
     "s3":{ 
      "s3SchemaVersion":"1.0", 
      "configurationId":"testConfigRule", 
      "bucket":{ 
       "name":"mybucket", 
       "ownerIdentity":{ 
        "principalId":"A3NL1KOZZKExample" 
       }, 
       "arn":"arn:aws:s3:::mybucket" 
      }, 
      "object":{ 
       "key":"HappyFace.jpg", 
       "size":1024, 
       "eTag":"d41d8cd98f00b204e9800998ecf8427e", 
       "versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko", 
       "sequencer":"0055AED6DCD90281E5" 
      } 
     } 
     } 
    ] 
} 
2

Как я должен был сделать это на очень большой ведро, и лямбда-функции имеют макс. время выполнения 10 минут, я закончил выполнение сценария с Ruby AWS-SDK.

require 'aws-sdk-v1' 

class LambdaS3Invoker 

    BUCKET_NAME = "HERE_YOUR_BUCKET" 
    FUNCTION_NAME = "HERE_YOUR_FUNCTION_NAME" 
    AWS_KEY = "HERE_YOUR_AWS_KEY" 
    AWS_SECRET = "HERE_YOUR_AWS_SECRET" 
    REGION = "HERE_YOUR_REGION" 

    def execute 
    bucket.objects({ prefix: 'products'}).each do |o| 
     lambda_invoke(o.key) 
    end 
    end 

    private 

    def lambda_invoke(key) 
    lambda.invoke({ 
     function_name: FUNCTION_NAME, 
     invocation_type: 'Event', 
     payload: JSON.generate({ 
     Records: [{ 
      s3: { 
      object: { 
       key: key, 
      }, 
      bucket: { 
       name: BUCKET_NAME, 
      } 
      } 
     }] 
     }) 
    }) 
    end 

    def lambda 
    @lambda ||= Aws::Lambda::Client.new(
     region: REGION, 
     access_key_id: AWS_KEY, 
     secret_access_key: AWS_SECRET 
    ) 
    end 

    def resource 
    @resource ||= Aws::S3::Resource.new(
     access_key_id: AWS_KEY, 
     secret_access_key: AWS_SECRET 
    ) 
    end 

    def bucket 
    @bucket ||= resource.bucket(BUCKET_NAME) 
    end 
end 

И тогда вы можете назвать это нравится:

LambdaS3Invoker.new.execute 
0

Эта нить помог толкнуть меня в правильном направлении, как мне нужно, чтобы вызвать функцию лямбды на файл для существующей 50k файлов в двух ведрах. Я решил написать его в python и ограничить количество лямбда-функций, работающих одновременно до 500 (ограничение параллелизма для многих областей aws равно 1000).

Сценарий создает рабочий пул из 500 потоков, которые передают очередь ключей ведра. Каждый рабочий ждет, пока их лямбда не будет закончена, прежде чем поднять другую. Поскольку выполнение этого скрипта в отношении моих 50k файлов займет пару часов, я просто запускаю его с моей локальной машины. Надеюсь, это поможет кому-то!

#!/usr/bin/env python 

# Proper imports 
import json 
import time 
import base64 
from queue import Queue 
from threading import Thread 
from argh import dispatch_command 

import boto3 
from boto.s3.connection import S3Connection 

client = boto3.client('lambda') 

def invoke_lambdas(): 
    try: 
     # replace these with your access keys 
     s3 = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) 
     buckets = [s3.get_bucket('bucket-one'), s3.get_bucket('bucket-two')] 

     queue = Queue() 
     num_threads = 500 

     # create a worker pool 
     for i in range(num_threads): 
      worker = Thread(target=invoke, args=(queue,)) 
      worker.setDaemon(True) 
      worker.start() 

     for bucket in buckets: 
      for key in bucket.list(): 
       queue.put((bucket.name, key.key)) 

     queue.join() 

    except Exception as e: 
     print(e) 

def invoke(queue): 
    while True: 
     bucket_key = queue.get() 

     try: 
      print('Invoking lambda with bucket %s key %s. Remaining to process: %d' 
       % (bucket_key[0], bucket_key[1], queue.qsize())) 
      trigger_event = { 
       'Records': [{ 
        's3': { 
         'bucket': { 
          'name': bucket_key[0] 
         }, 
         'object': { 
          'key': bucket_key[1] 
         } 
        } 
       }] 
      } 

      # replace lambda_function_name with the actual name 
      # InvocationType='RequestResponse' means it will wait until the lambda fn is complete 
      response = client.invoke(
       FunctionName='lambda_function_name', 
       InvocationType='RequestResponse', 
       LogType='None', 
       ClientContext=base64.b64encode(json.dumps({}).encode()).decode(), 
       Payload=json.dumps(trigger_event).encode() 
      ) 
      if response['StatusCode'] != 200: 
       print(response) 

     except Exception as e: 
      print(e) 
      print('Exception during invoke_lambda') 

     queue.task_done() 

if __name__ == '__main__': 
    dispatch_command(invoke_lambdas) 
Смежные вопросы