9

Я пытаюсь захватить изменения таблицы DynamoDB с использованием потоков DynamoDB, а AWS предоставил Java DynamoDB потоки Kinesis-адаптера. Я работаю с SDK Java AWS в приложении Scala.Обработка потоков DynamoDB с использованием потоков AWS Java DynamoDB Адаптер Kinesis

Я начал с просмотра AWS guide и просмотрел опубликованный AWS code example. Однако у меня возникают проблемы с получением собственного опубликованного кода Amazon в моей среде. Моя проблема связана с объектом KinesisClientLibConfiguration.

В примере кода KinesisClientLibConfiguration сконфигурирован с потоком ARN, предоставленным DynamoDB.

new KinesisClientLibConfiguration("streams-adapter-demo", 
    streamArn, 
    streamsCredentials, 
    "streams-demo-worker") 

Я последовал за аналогичную картину в моем приложении Scala от первого местоположения текущего ARN из моего Динамо стола:

lazy val streamArn = dynamoClient.describeTable(config.tableName) 
.getTable.getLatestStreamArn 

А потом создавать KinesisClientLibConfiguration с обеспеченным ARN:

lazy val kinesisConfig :KinesisClientLibConfiguration = 
new KinesisClientLibConfiguration(
    "testProcess", 
    streamArn, 
    defaultProviderChain, 
    "testWorker" 
).withMaxRecords(1000) 
    .withRegionName("eu-west-1") 
    .withMetricsLevel(MetricsLevel.NONE) 
    .withIdleTimeBetweenReadsInMillis(500) 
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) 

Я проверил предоставленный поток ARN, и все соответствует тому, что я вижу на консоли AWS.

Во время выполнения я в конечном итоге получаю исключение о том, что предоставленная ARN не является допустимым именем потока:

com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call 
SEVERE: Caught exception while sync'ing Kinesis shards and leases 
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation  
error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at 
'streamName' failed to satisfy constraint: Member must satisfy regular 
expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code: 
400; Error Code: ValidationException; Request ID:) 

Глядя на документацию, представленную на KinesisClientLibConfiguration этом имеет смысл в качестве второго параметра указан как streamName без упоминания ARN.

Я не могу найти что-либо на KinesisClientLibConfiguration, что связано с ARN. Поскольку я работаю с потоком DynamoDB, а не с потоком Kinesis, я также не уверен, как найти имя моего потока.

На данный момент я не уверен, что мне не хватает в опубликованном примере AWS, похоже, что они могут использовать гораздо более старую версию KCL. Я использую версию 1.7.0 amazon-kinesis-client.

ответ

3

Вопрос фактически закончился вне моего KinesisClientLibConfiguration.

Мне удалось обойти эту проблему, используя ту же конфигурацию и предоставляя как адаптер потока, входящий в состав библиотеки адаптеров потока DynamoDB, так и клиентов для DynamoDB и CloudWatch.

Теперь мое рабочее решение выглядит так.

Определение конфигурации клиента Kinesis.

//Kinesis config for DynamoDB streams 
lazy val kinesisConfig :KinesisClientLibConfiguration = 
    new KinesisClientLibConfiguration(
     getClass.getName, //DynamoDB shard lease table name 
     streamArn, //pulled from the dynamo table at runtime 
     dynamoCredentials, //DefaultAWSCredentialsProviderChain 
     KeywordTrackingActor.NAME //Lease owner name 
    ).withMaxRecords(1000) //using AWS recommended value 
    .withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value 
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) 

Определение адаптера потока и клиент CloudWatch

val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials) 
streamAdapterClient.setRegion(region) 

val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials) 
cloudWatchClient.setRegion(region) 

Создать экземпляр RecordProcessorFactory, это до вас, чтобы определить класс, который реализует KCL при условии IRecordProcessorFactory и возвращенный IRecordProcessor.

val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName) 

И часть, которую мне не хватает, все это должно быть предоставлено вашему работнику.

val worker :Worker = 
    new Worker.Builder() 
    .recordProcessorFactory(recordProcessorFactory) 
    .config(kinesisConfig) 
    .kinesisClient(streamAdapterClient) 
    .dynamoDBClient(dynamoClient) 
    .cloudWatchClient(cloudWatchClient) 
    .build() 

//this will start record processing 
streamExecutorService.submit(worker) 
0

В качестве альтернативы, вы можете использовать com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker вместо com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker , который внутренне использует тот AmazonDynamoDBStreamsAdapterClient.

т.е.

lazy val kinesisConfig :KinesisClientLibConfiguration = 
new KinesisClientLibConfiguration(
    getClass.getName, //DynamoDB shard lease table name 
    streamArn, //pulled from the dynamo table at runtime 
    dynamoCredentials, //DefaultAWSCredentialsProviderChain 
    KeywordTrackingActor.NAME //Lease owner name 
).withMaxRecords(1000) //using AWS recommended value 
.withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value 
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) 

val worker = new com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker(recordProcessorFactory, kinesisConfig) 
0

Просто, чтобы ответить на то, что проблема была - вы обеспечивали ARN, когда он просто хотел имя потока.

0

Недавно я сделал PR в этом проекте gfc-aws-kinesis, и теперь вы можете использовать его, просто передав адаптер и введя реализацию KinesisRecordAdapter.

В примере я использую Scanamo для разбора Hashmap

Создать клиенту

val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient = 
    new AmazonDynamoDBStreamsAdapterClient() 

передать его в конфигурации:

val streamConfig = KinesisStreamConsumerConfig[Option[A]](
    applicationName, 
    config.stream, //the full dynamodb stream arn 
    regionName = Some(config.region), 
    checkPointInterval = config.checkpointInterval, 
    initialPositionInStream = config.streamPosition, 
    dynamoDBKinesisAdapterClient = Some(streamAdapterClient) 
) 
KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed) 

Создание неявного записи читателя, подходящий для данные о динамических событиях:

implicit val kinesisRecordReader 
    : KinesisRecordReader[Option[A]] = 
    new KinesisRecordReader[Option[A]] { 
    override def apply(record: Record): Option[A] = { 
     record match { 
     case recordAdapter: RecordAdapter => 
      val dynamoRecord: DynamoRecord = 
      recordAdapter.getInternalObject 
      dynamoRecord.getEventName match { 
      case "INSERT" => 
       ScanamoFree 
       .read[A](
        dynamoRecord.getDynamodb.getNewImage) 
       .toOption 
      case _ => None 
      } 
     case _ => None 
     } 
    } 
    } 
+0

Вы должны улучшить свой ответ, добавив здесь пример и краткое объяснение. Возможно, чтение [this] (https://stackoverflow.com/help/how-to-answer) помогает вам улучшить ваш ответ. – Markus

Смежные вопросы