DynamoDB Streamでイベントソースマッピングを使用する(CDK, TypeScript)
#AWS
#DynamoDB Stream
#イベントソースマッピング
#CDK
#TypeScript
◇概要
DynamoStreamを利用する際、イベントソースマッピングを利用することでイベントの発火にフィルターをかけることができます
不要なイベントを減らすことでlambdaの実行数を減らし、コストを削減できるので積極的に取り入れたい
以下の構成でCDK, TypeScriptを使用して実装しました
◇ディレクトリ構成
root/
├ bin/
│ └ cdk.ts
└ lib/
├ TestStack.ts
└ utils/
└ testModule/
├ createDynamoDBTable.ts
├ createLambdaDDBStream.ts
◇DynamoDBのリソース作成
id: string meta: string timestamp: number
PrimaryIndex: id-meta GSI: meta-timestamp
作成したTableNameをSSMパラメーターストアに保存
- /lib/utils/testModule/createDynamoDBTable.ts
import { Stack } from "@aws-cdk/core";
import { AttributeType, BillingMode, StreamViewType, Table } from "@aws-cdk/aws-dynamodb";
import { CfnParameter } from "@aws-cdk/aws-ssm";
export interface CreateDynamoDBTableprops {
modulePrefix: string
}
export function createDynamoDBTable(
stack: Stack,
props: CreateDynamoDBTableprops
) {
const table = new Table(stack, `${props.modulePrefix}-ddb-table`, {
partitionKey: {
name: "id",
type: AttributeType.STRING
},
billingMode: BillingMode.PAY_PER_REQUEST,
pointInTimeRecovery: true,
sortKey: {
name: "meta",
type: AttributeType.STRING
},
stream: StreamViewType.NEW_IMAGE,
timeToLiveAttribute: "ttl"
});
table.addGlobalSecondaryIndex({
indexName: "meta-timestamp-idx",
partitionKey: {
name: "meta",
type: AttributeType.STRING
},
sortKey: {
name: "timestamp",
type: AttributeType.NUMBER
}
});
new CfnParameter(stack, `${props.modulePrefix}-table-name-parameter`, {
type: "String",
value: table.tableName,
description: "DynamoDB tableName for Test",
name: "/Test/DynamoDB/TableName"
});
return table;
}
◇Lambdaリソースの作成
moduleで以下を一括作成
- lambdaリソース
- lambdaRole
- LogGroup
- EventSourceMapping
- /lib/utils/testModule/createLambdaDDBStream.ts
import { Runtime, Tracing, StartingPosition, CfnEventSourceMapping } from "@aws-cdk/aws-lambda";
import { NodejsFunction } from "@aws-cdk/aws-lambda-nodejs";
import { Duration, Stack, Tags } from "@aws-cdk/core";
import { ManagedPolicy, PolicyStatement, Role, ServicePrincipal } from "@aws-cdk/aws-iam";
import { ITable } from "@aws-cdk/aws-dynamodb";
import { LogGroup, RetentionDays } from "@aws-cdk/aws-logs";
export interface CreateLambdaprops {
entry: string;
modulePrefix: string;
lambdaTimeout: number;
lambdaMemorySize: number;
expirationMonth: string;
topics: string[]
table: ITable;
envKey: string;
env: {
account: string;
region: string;
};
}
export function createLambdaDDBStream(
stack: Stack, props: CreateLambdaprops
) {
const lambdaRole = new Role(stack, `${props.modulePrefix}-lambda-role`, {
roleName: `${props.modulePrefix}-lambda-role`,
assumedBy: new ServicePrincipal("lambda.amazonaws.com"),
});
lambdaRole.addManagedPolicy(
ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
);
lambdaRole.addToPolicy(
new PolicyStatement({
resources: [props.table.tableArn, props.table.tableStreamArn as string],
actions: ["dynamodb:*"],
})
);
const lambda = new NodejsFunction(stack, `${props.modulePrefix}-lambda`, {
entry: props.entry,
handler: "handler",
runtime: Runtime.NODEJS_14_X,
timeout: Duration.seconds(props.lambdaTimeout),
functionName: `${props.modulePrefix}-lambda`,
memorySize: props.lambdaMemorySize,
role: lambdaRole,
tracing: Tracing.ACTIVE,
environment: {
REALTIME_TABLE_NAME: props.table.tableName,
EXPIRATION_MONTH: props.expirationMonth,
DDB_CLIENT_PARAM: JSON.stringify({
region: stack.region,
})
},
});
new LogGroup(stack, `${props.modulePrefix}-lambda-log-group`, {
logGroupName: `/aws/lambda/${lambda.functionName}`,
retention: RetentionDays.ONE_MONTH
});
Tags.of(lambda).add("env", props.envKey);
Tags.of(lambda).add("service", "fpm_app");
const prefixFilters = props.topics.map(topic => {
return {
prefix: `${topic}:`
};
});
new CfnEventSourceMapping(stack, `${props.modulePrefix}-event-source-mapping`, {
functionName: lambda.functionName,
batchSize: 50,
eventSourceArn: props.table.tableStreamArn,
filterCriteria: {
Filters: [
{
Pattern: `{ "eventName": ["INSERT"], "dynamodb": { "Keys": { "id": { "S": ${JSON.stringify(prefixFilters)} } } } }`
}
]
},
startingPosition: StartingPosition.TRIM_HORIZON
});
return lambda;
}
・EventSourceMappingについて解説
filterCriteriaのFiltersにPatternを設定することでDynamoDBStreamからlambdaへのイベントを絞ることができる
eventName == "INSERT" && 「 dynamodb.Keys.id.S 」のprefixが「 Test: 」 の場合以下の設定となる
{
Pattern: `{ "eventName": ["INSERT"], "dynamodb": { "Keys": { "id": { "S": { "prefix": "Test:" } } } } }`
}
◇DynamoDB Streamからのeventサンプル
{
Records: [
{
eventID: "0f4570a9b2d29d6916b38cde78651d68",
eventName: "INSERT",
eventVersion: "1.1",
eventSource: "aws:dynamodb",
awsRegion: "ap-northeast-1",
dynamodb: {
ApproximateCreationDateTime: 1644309749,
Keys: {
meta: {
S: "Test:1:1"
},
id: {
S: "Test:1:1:621d1946-c5d6-4187-872d-a766856331a5"
}
},
NewImage: {
meta: {
S: "Test:1:1"
},
lotId: {
S: "2306"
},
id: {
S: "Test:1:1:621d1946-c5d6-4187-872d-a766856331a5"
},
ttl: {
N: "1675845748"
},
timestamp: {
N: "1639560046429"
}
},
SequenceNumber: "200000000012909188526",
SizeBytes: 336,
StreamViewType: "NEW_IMAGE"
},
eventSourceARN: "arn:aws:dynamodb:ap-northeast-1:916308242640:table/TestStack-stgrealtimedashboardddbtable75480668-1H1DR8YZSTM46/stream/2022-02-08T07:21:25.727"
}
]
};
◇上位ファイル
- /lib/TestStack.ts
import { Construct, Stack, StackProps } from "@aws-cdk/core";
import { createDynamoDBTable } from "./utils/realTimeDashBoard/createDynamoDBTable";
import { createLambdaDDBStream } from "./utils/realTimeDashBoard/createLambdaDDBStream";
export interface TestStackprops extends StackProps {
envName: string;
envKey: string;
env: {
account: string;
region: string;
};
}
export class TestStack extends Stack {
constructor(scope: Construct, id: string, props: TestStackprops) {
super(scope, id, props);
const modulePrefix = `fpm-${props.envName}-realtime-dashboard`;
// DynamoDB作成
const table = createDynamoDBTable(this, {
modulePrefix
});
// DynamoDB Stream lambda
const dynamodbStreamLambda = createLambdaDDBStream(this, {
entry: "./src/lambda/test-ddb-stream-handler/index.ts",
modulePrefix: `${props.envName}-test-ddb-stream`,
lambdaTimeout: 900,
lambdaMemorySize: 1024,
expirationMonth: "12",
topics: ["Test", "HogeHoge", "HugaHuga"],
table,
envKey: props.envKey,
env: props.env
});
}
}
- /bin/cdk.ts
#!/usr/bin/env node import * as cdk from "@aws-cdk/core"; import "source-map-support/register"; import { TestStack } from "../lib/TestStack"; const app = new cdk.App(); const argContext = "environment"; const envKey = app.node.tryGetContext(argContext); if (envKey === undefined) { throw new Error(`環境名を指定してください。 ex) cdk deploy -c ${argContext}=dev`); } const envVals = app.node.tryGetContext(envKey); if (envVals === undefined) throw new Error(`${envKey}の環境設定がcdk.jsonに含まれていません`); new TestStack(app, "TestStack", { envName: envVals.envName, envKey, env: envVals.env, });