DynamoDB Streamでイベントソースマッピングを使用する(CDK, TypeScript)

#AWS
#DynamoDB Stream
#イベントソースマッピング
#CDK
#TypeScript

DynamoDB Streamを利用する際、イベントソースマッピングのフィルター(filterCriteria)を設定することで、lambdaを起動するイベントを絞り込むことができます

 

不要なイベントを減らすことでlambdaの実行数が減り、コストを削減できるので、積極的に取り入れたい機能です

以下の構成でCDK, TypeScriptを使用して実装しました

アーキテクチャ図1

 

root/
  ├ bin/
  │  └ cdk.ts
  └ lib/
     ├ TestStack.ts
     └ utils/
          └ testModule/
                 ├ createDynamoDBTable.ts
                 ├ createLambdaDDBStream.ts

 

テーブルの属性: id: string / meta: string / timestamp: number

 

PrimaryIndex: id-meta / GSI: meta-timestamp (indexName: meta-timestamp-idx)

 

作成したTableNameをSSMパラメーターストアに保存

 

  • /lib/utils/testModule/createDynamoDBTable.ts
import { Stack } from "@aws-cdk/core";
import { AttributeType, BillingMode, StreamViewType, Table } from "@aws-cdk/aws-dynamodb";
import { CfnParameter } from "@aws-cdk/aws-ssm";

/**
 * Options accepted by `createDynamoDBTable`.
 */
export interface CreateDynamoDBTableprops {
  /** Prefix used to build the construct IDs of the table and of its SSM parameter. */
  modulePrefix: string;
}

/**
 * Creates the DynamoDB table for this module and stores its name in an SSM parameter.
 *
 * The table uses `id` as partition key and `meta` as sort key (both strings),
 * on-demand billing, point-in-time recovery, a NEW_IMAGE stream and `ttl` as
 * its TTL attribute. A global secondary index `meta-timestamp-idx`
 * (`meta` string / `timestamp` number) is added, and the table name is written
 * to the SSM parameter `/Test/DynamoDB/TableName`.
 *
 * @param stack - Stack that will own the table and the SSM parameter.
 * @param props - Naming options; only `modulePrefix` is read.
 * @returns The created table.
 */
export function createDynamoDBTable(
  stack: Stack,
  props: CreateDynamoDBTableprops
) {
  const { modulePrefix } = props;

  // Small helper: every string-typed key in this table has the same shape.
  const stringAttribute = (name: string) => ({ name, type: AttributeType.STRING });

  const ddbTable = new Table(stack, `${modulePrefix}-ddb-table`, {
    partitionKey: stringAttribute("id"),
    sortKey: stringAttribute("meta"),
    billingMode: BillingMode.PAY_PER_REQUEST,
    pointInTimeRecovery: true,
    stream: StreamViewType.NEW_IMAGE,
    timeToLiveAttribute: "ttl",
  });

  ddbTable.addGlobalSecondaryIndex({
    indexName: "meta-timestamp-idx",
    partitionKey: stringAttribute("meta"),
    sortKey: { name: "timestamp", type: AttributeType.NUMBER },
  });

  // Publish the generated table name so it can be looked up from Parameter Store.
  new CfnParameter(stack, `${modulePrefix}-table-name-parameter`, {
    name: "/Test/DynamoDB/TableName",
    description: "DynamoDB tableName for Test",
    type: "String",
    value: ddbTable.tableName,
  });

  return ddbTable;
}

 

moduleで以下を一括作成

  • lambdaリソース
  • lambdaRole
  • LogGroup
  • EventSourceMapping

 

  • /lib/utils/testModule/createLambdaDDBStream.ts
import { Runtime, Tracing, StartingPosition, CfnEventSourceMapping } from "@aws-cdk/aws-lambda";
import { NodejsFunction } from "@aws-cdk/aws-lambda-nodejs";
import { Duration, Stack, Tags } from "@aws-cdk/core";
import { ManagedPolicy, PolicyStatement, Role, ServicePrincipal } from "@aws-cdk/aws-iam";
import { ITable } from "@aws-cdk/aws-dynamodb";
import { LogGroup, RetentionDays } from "@aws-cdk/aws-logs";

/**
 * Options accepted by `createLambdaDDBStream`.
 */
export interface CreateLambdaprops {
  /** Path to the handler source file, passed to `NodejsFunction` as `entry`. */
  entry: string;
  /** Prefix for the construct IDs, the role name and the function name. */
  modulePrefix: string;
  /** Function timeout in seconds. */
  lambdaTimeout: number;
  /** Value passed to the function as `memorySize`. */
  lambdaMemorySize: number;
  /** Exposed to the function as the `EXPIRATION_MONTH` environment variable. */
  expirationMonth: string;
  /** Each topic becomes a `<topic>:` prefix rule on the `id` key of the stream filter. */
  topics: string[];
  /** Table whose stream is the event source for the function. */
  table: ITable;
  /** Value of the `env` tag applied to the function. */
  envKey: string;
  /** Target account/region. NOTE(review): not read by `createLambdaDDBStream` in the visible code. */
  env: {
    account: string;
    region: string;
  };
}

/**
 * Creates a Lambda function triggered by a DynamoDB stream, together with its
 * execution role, log group and a filtered event source mapping.
 *
 * Only `INSERT` records whose `id` key starts with `<topic>:` (for any entry of
 * `props.topics`) are delivered to the function.
 *
 * @param stack - Stack that will own all created resources.
 * @param props - Function settings, source table and filter topics.
 * @returns The created Lambda function.
 * @throws Error if the table has no stream ARN, or if `props.topics` is empty.
 */
export function createLambdaDDBStream(
  stack: Stack, props: CreateLambdaprops
) {
  // `tableStreamArn` is optional on ITable; fail early with a clear message instead of
  // asserting it to string and emitting an undefined resource / event source.
  const streamArn = props.table.tableStreamArn;
  if (streamArn === undefined) {
    throw new Error(
      `createLambdaDDBStream(${props.modulePrefix}): the given table has no stream enabled (tableStreamArn is undefined)`
    );
  }

  // An empty topic list would render `"S": []`, which is not a valid filter pattern.
  if (props.topics.length === 0) {
    throw new Error(
      `createLambdaDDBStream(${props.modulePrefix}): "topics" must contain at least one entry`
    );
  }

  const lambdaRole = new Role(stack, `${props.modulePrefix}-lambda-role`, {
    roleName: `${props.modulePrefix}-lambda-role`,
    assumedBy: new ServicePrincipal("lambda.amazonaws.com"),
  });

  lambdaRole.addManagedPolicy(
    ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
  );

  // NOTE(review): this grants every DynamoDB action on the table and its stream;
  // consider narrowing it to the actions the handler actually needs.
  lambdaRole.addToPolicy(
    new PolicyStatement({
      resources: [props.table.tableArn, streamArn],
      actions: ["dynamodb:*"],
    })
  );

  const lambda = new NodejsFunction(stack, `${props.modulePrefix}-lambda`, {
    entry: props.entry,
    handler: "handler",
    runtime: Runtime.NODEJS_14_X,
    timeout: Duration.seconds(props.lambdaTimeout),
    functionName: `${props.modulePrefix}-lambda`,
    memorySize: props.lambdaMemorySize,
    role: lambdaRole,
    tracing: Tracing.ACTIVE,
    environment: {
      REALTIME_TABLE_NAME: props.table.tableName,
      EXPIRATION_MONTH: props.expirationMonth,
      DDB_CLIENT_PARAM: JSON.stringify({
        region: stack.region,
      })
    },
  });

  // Explicit log group so that retention is limited to one month.
  new LogGroup(stack, `${props.modulePrefix}-lambda-log-group`, {
    logGroupName: `/aws/lambda/${lambda.functionName}`,
    retention: RetentionDays.ONE_MONTH
  });

  Tags.of(lambda).add("env", props.envKey);
  Tags.of(lambda).add("service", "fpm_app");

  // One `{ prefix: "<topic>:" }` rule per topic; a record matches if any rule matches.
  const prefixFilters = props.topics.map(topic => ({ prefix: `${topic}:` }));

  new CfnEventSourceMapping(stack, `${props.modulePrefix}-event-source-mapping`, {
    functionName: lambda.functionName,
    batchSize: 50,
    eventSourceArn: streamArn,
    filterCriteria: {
      Filters: [
        {
          Pattern: `{ "eventName": ["INSERT"], "dynamodb": { "Keys": { "id": { "S": ${JSON.stringify(prefixFilters)} } } } }`
        }
      ]
    },
    startingPosition: StartingPosition.TRIM_HORIZON
  });

  return lambda;
}

 

filterCriteriaのFiltersにPatternを設定することでDynamoDBStreamからlambdaへのイベントを絞ることができる

 

eventName == "INSERT" && 「 dynamodb.Keys.id.S 」のprefixが「 Test: 」 の場合以下の設定となる

{
  Pattern: `{ "eventName": ["INSERT"], "dynamodb": { "Keys": { "id": { "S": [{ "prefix": "Test:" }] } } } }`
}

このフィルターにマッチし、lambdaに渡されるイベントの例
{
  Records: [
    {
      eventID: "0f4570a9b2d29d6916b38cde78651d68",
      eventName: "INSERT",
      eventVersion: "1.1",
      eventSource: "aws:dynamodb",
      awsRegion: "ap-northeast-1",
      dynamodb: {
        ApproximateCreationDateTime: 1644309749,
        Keys: {
          meta: {
            S: "Test:1:1"
          },
          id: {
            S: "Test:1:1:621d1946-c5d6-4187-872d-a766856331a5"
          }
        },
        NewImage: {
          meta: {
            S: "Test:1:1"
          },
          lotId: {
            S: "2306"
          },
          id: {
            S: "Test:1:1:621d1946-c5d6-4187-872d-a766856331a5"
          },
          ttl: {
            N: "1675845748"
          },
          timestamp: {
            N: "1639560046429"
          }
        },
        SequenceNumber: "200000000012909188526",
        SizeBytes: 336,
        StreamViewType: "NEW_IMAGE"
      },
      eventSourceARN: "arn:aws:dynamodb:ap-northeast-1:916308242640:table/TestStack-stgrealtimedashboardddbtable75480668-1H1DR8YZSTM46/stream/2022-02-08T07:21:25.727"
    }
  ]
};

 

  • /lib/TestStack.ts
import { Construct, Stack, StackProps } from "@aws-cdk/core";
import { createDynamoDBTable } from "./utils/realTimeDashBoard/createDynamoDBTable";
import { createLambdaDDBStream } from "./utils/realTimeDashBoard/createLambdaDDBStream";

/**
 * Properties for `TestStack`.
 */
export interface TestStackprops extends StackProps {
  /** Environment key; forwarded to the stream Lambda, where it is used as the `env` tag value. */
  envKey: string;
  /** Environment name embedded in the module prefixes of the created resources. */
  envName: string;
  /** Target account and region; both are required here. */
  env: {
    account: string;
    region: string;
  };
}

/**
 * Stack that wires the DynamoDB table to a stream-triggered Lambda function.
 */
export class TestStack extends Stack {
  /**
   * @param scope - Parent construct.
   * @param id - Construct ID of the stack.
   * @param props - Stack properties including the environment name and key.
   */
  constructor(scope: Construct, id: string, props: TestStackprops) {
    super(scope, id, props);

    const modulePrefix = `fpm-${props.envName}-realtime-dashboard`;

    // Create the DynamoDB table
    const table = createDynamoDBTable(this, {
      modulePrefix
    });

    // Create the Lambda function fed by the table's DynamoDB stream.
    // The returned function is not needed here, so it is not bound to a local.
    createLambdaDDBStream(this, {
      entry: "./src/lambda/test-ddb-stream-handler/index.ts",
      modulePrefix: `${props.envName}-test-ddb-stream`,
      lambdaTimeout: 900,
      lambdaMemorySize: 1024,
      expirationMonth: "12",
      topics: ["Test", "HogeHoge", "HugaHuga"],
      table,
      envKey: props.envKey,
      env: props.env
    });
  }
}

 

  • /bin/cdk.ts
#!/usr/bin/env node
import * as cdk from "@aws-cdk/core";
import "source-map-support/register";
import { TestStack } from "../lib/TestStack";

// CDK app entry point: resolves the target environment from context and
// instantiates TestStack with that environment's settings.
const app = new cdk.App();

// Context key that selects the environment, e.g. `cdk deploy -c environment=dev`.
const argContext = "environment";
const envKey = app.node.tryGetContext(argContext);
if (envKey === undefined) {
  throw new Error(`環境名を指定してください。 ex) cdk deploy -c ${argContext}=dev`);
}

// The environment's settings are looked up in context under the environment key.
const envVals = app.node.tryGetContext(envKey);
if (envVals === undefined) {
  throw new Error(`${envKey}の環境設定がcdk.jsonに含まれていません`);
}

new TestStack(app, "TestStack", {
  envName: envVals.envName,
  envKey,
  env: envVals.env,
});