AWS IoT EventsをCDKで構築してみた(TypeScript)

#AWS
#IoT Events
#CDK
#TypeScript

 

実務でIoT Eventsを触ったので活用例を記事にしてみた の続きです

 

AWS IoT Eventsの設定をCDKで構築してみました。 まずはアーキテクチャイメージから、 アーキテクチャイメージ図

 

root/ ├ bin/ │ └ cdk.ts └ lib/ IotEventsStack.ts └ utils/ └ iotEventsModule/ ├ createSnsTopic.ts ├ createIotEvents.ts ├ createInputMessageLambda └ createSqsLambda

 

スタック単位の定義ファイルです。 ここで各々のモジュールを呼び出しています。

import { Construct, Stack, StackProps } from "@aws-cdk/core"; import { createSnsTopic } from "./utils/iotEventsModule/createSnsTopic"; import { createIotEvents } from "./utils/iotEventsModule/createIotEvents"; import { createInputMessageLambda } from "./utils/iotEventsModule/createInputMessageLambda"; import { createSqsLambda } from "./utils/iotEventsModule/createSqsLambda"; import { lookupStream, lookupLambdaNetwork } from "./utils/common/lookups"; export interface IoTEventsStackprops extends StackProps { vpcName: string; lambdaSecurityGroupId: string; lambdaPrivateSubnet1aId: string; lambdaPrivateSubnet1cId: string; streamName: string; envName: string; envKey: string; env: { account: string; region: string; }; } export class IoTEventsStack extends Stack { constructor(scope: Construct, id: string, props: IoTEventsStackprops) { super(scope, id, props); const stream = lookupStream(this, props.streamName); const { vpc, lambdaSubnets, lambdaSecurityGroup } = lookupLambdaNetwork(this, props); const IoTEventsToErrorSnsTopic = createSnsTopic(this, { modulePrefix: `${props.envName}-IoTEvents-to-error` }); const IoTEventsToNormalSnsTopic = createSnsTopic(this, { modulePrefix: `${props.envName}-IoTEvents-to-normal` }); const IoTEventsIotEvents = createIotEvents(this, { modulePrefix: `${props.envName}-IoTEvents`, toErrorTopicArn: IoTEventsToErrorSnsTopic.topicArn, toNormalTopicArn: IoTEventsToNormalSnsTopic.topicArn }); // IotEvents前処理lambda const IoTEventsInputMessageLambda = createInputMessageLambda(this, { entry: "./src/lambda/IoTEvents-put-message-iotevents-handler/index.ts", modulePrefix: `${props.envName}-IoTEvents-put-message-iotevents`, lambdaTimeout: 900, lambdaMemorySize: 1024, modelInput: IoTEventsIotEvents.modelInput, stream, envKey: props.envKey, env: props.env }); const IoTEventsToErrorSqsLambda = createSqsLambda(this, { vpc, subnets: lambdaSubnets, securityGroups: [lambdaSecurityGroup], entry: "./src/lambda/IoTEvents-to-error-handler/index.ts", modulePrefix: `${props.envName}-IoTEvents-to-error-lambda`, sqsRetentionPeriod: 60 * 3, sqsVisibilityTimeout: 60, topic: IoTEventsToErrorSnsTopic, lambdaTimeout: 59, lambdaMemorySize: 512, envKey: props.envKey }); const IoTEventsToNormalSqsLambda = createSqsLambda(this, { vpc, subnets: lambdaSubnets, securityGroups: [lambdaSecurityGroup], entry: "./src/lambda/IoTEvents-to-normal-handler/index.ts", modulePrefix: `${props.envName}-IoTEvents-to-normal`, sqsRetentionPeriod: 60 * 3, sqsVisibilityTimeout: 60, topic: IoTEventsToNormalSnsTopic, lambdaTimeout: 59, lambdaMemorySize: 512, envKey: props.envKey }); } }

 

ここでは実際のCDKモジュールを呼び出してリソースを定義しています。

import { Stack } from "@aws-cdk/core"; import { CfnInput, CfnDetectorModel } from "@aws-cdk/aws-iotevents"; import { Role, ServicePrincipal, PolicyStatement } from "@aws-cdk/aws-iam"; export interface CreateIotEventsprops { modulePrefix: string; toErrorTopicArn: string; toNormalTopicArn: string; } export function createIotEvents( stack: Stack, props: CreateIotEventsprops ) { const modelInput = new CfnInput(stack, `${props.modulePrefix}-input`, { inputName: `${props.modulePrefix}_input`.replace(/-/g, "_"), inputDefinition: { attributes: [ { jsonPath: "id" }, { jsonPath: "status" }, { jsonPath: "type" }, { jsonPath: "timestamp" } ] } }); const modelRole = new Role(stack, `${props.modulePrefix}-model-role`, { assumedBy: new ServicePrincipal("iotevents.amazonaws.com"), }); modelRole.addToPolicy( new PolicyStatement({ resources: [props.toErrorTopicArn, props.toNormalTopicArn], actions: ["sns:Publish"], }) ); const detectorModelDefinition = { initialStateName: "Normal", states: [ { stateName: "Normal", onInput: { events: [], transitionEvents: [ { eventName: "to_error", condition: `$input.${modelInput.inputName}.status == 1`, actions: [ { sns: { targetArn: props.toErrorTopicArn, payload: { contentExpression: `'{\"id\": \"\${$input.${modelInput.inputName}.id}\", \"status\": \${$input.${modelInput.inputName}.status}, \"type\": \"\${$input.${modelInput.inputName}.type}\", \"timestamp\": \${$input.${modelInput.inputName}.timestamp}}'`, type: "JSON" } } }, { setVariable: { variableName: "previousTimestamp", value: `$input.${modelInput.inputName}.timestamp` } }, { setVariable: { variableName: "id", value: `$input.${modelInput.inputName}.id` } } ], nextState: "Error" } ] }, onEnter: { events: [] }, onExit: { events: [] } }, { stateName: "Error", onInput: { events: [], transitionEvents: [ { eventName: "to_normal", condition: `$input.${modelInput.inputName}.status == 0 && $variable.previousTimestamp < $input.${modelInput.inputName}.timestamp`, actions: [ { sns: { targetArn: props.toNormalTopicArn, payload: { contentExpression: `'{\"id\": \"\${$variable.id}\", \"status\": \${$input.${modelInput.inputName}.status}, \"type\": \"\${$input.${modelInput.inputName}.type}\", \"timestamp\": \${$input.${modelInput.inputName}.timestamp}}'`, type: "JSON" } } } ], nextState: "Normal" } ] }, onEnter: { events: [] }, onExit: { events: [] } } ] }; const detectorModel = new CfnDetectorModel(stack, `${props.modulePrefix}-model`, { detectorModelDefinition, detectorModelName: `${props.modulePrefix}-model`, key: "type", evaluationMethod: "BATCH", roleArn: modelRole.roleArn, }); const iotEvents = { modelInput, detectorModel }; return iotEvents; }

ここでの主なリソースはCfnInput, CfnDetectorModel, Roleです。

  • CfnInput 検知器(detector)に送るデータスキーマを定義

 

  • CfnDetectorModel ここで検知器のリソースを定義、プロパティについて少し解説します detectorModelDefinitionでは検知器の構成を定義しています。ここは一から作るのは大変なのでAWSコンソールのGUIで作ってからexportするのが楽でした。 上記の例ではstatesの中にNormalとErrorの2つのState(状態)を定義しています。 onInput.transitionEventsでイベントを定義することができ、SNSのpublish、変数などを編集できます。 イベントの条件はconditionで設定され、上記ではstatusが1となったタイミングで遷移するように設定しています。

 

  • Role IAMのロールを定義しています。 上記では検知器からSNSへpublishする必要があるため、SNSの操作権限を付与しています。

 

デプロイするとこのような検知器が作られます 検知器イメージ図

SNSリソースを定義しています。 テンプレートとして残しておきます。

import { PolicyStatement, ServicePrincipal } from "@aws-cdk/aws-iam"; import { Stack } from "@aws-cdk/core"; import { Topic, TopicPolicy } from "@aws-cdk/aws-sns"; export interface CreateSnsTopicsprops { modulePrefix: string } export function createSnsTopic( stack: Stack, props: CreateSnsTopicsprops ) { const topic = new Topic(stack, `${props.modulePrefix}-topic`); const topicPolicy = new TopicPolicy(stack, `${props.modulePrefix}-topic-policy`, { topics: [topic] }); topicPolicy.document.addStatements(new PolicyStatement({ actions: ["SNS:Publish"], principals: [new ServicePrincipal("iotevents.amazonaws.com")], resources: [topic.topicArn] })); return topic; }

 

こちらは検知器へPutMessageするためのLambdaを定義しています。 "iotevents:BatchPutMessage"で検知器へPutMessageするための権限、 "kinesis:GetRecords"でKinesis Streamからレコードを取得するための権限を定義しています。 Lambda内の処理は割愛します。

import { Runtime, Tracing, StartingPosition } from "@aws-cdk/aws-lambda"; import { NodejsFunction } from "@aws-cdk/aws-lambda-nodejs"; import { Duration, Stack, Tags } from "@aws-cdk/core"; import { ManagedPolicy, PolicyStatement, Role, ServicePrincipal } from "@aws-cdk/aws-iam"; import { CfnInput } from "@aws-cdk/aws-iotevents"; import { IStream } from "@aws-cdk/aws-kinesis"; import { LogGroup, RetentionDays } from "@aws-cdk/aws-logs"; import { KinesisEventSource } from "@aws-cdk/aws-lambda-event-sources"; export interface CreateInputMessageLambdaprops { entry: string; modulePrefix: string; lambdaTimeout: number; lambdaMemorySize: number; modelInput: CfnInput; stream: IStream; envKey: string; env: { account: string; region: string; }; } export function createInputMessageLambda( stack: Stack, props: CreateInputMessageLambdaprops ) { const lambdaRole = new Role(stack, `${props.modulePrefix}-lambda-role`, { roleName: `${props.modulePrefix}-lambda-role`, assumedBy: new ServicePrincipal("lambda.amazonaws.com"), }); lambdaRole.addManagedPolicy( ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole") ); lambdaRole.addToPolicy( new PolicyStatement({ resources: [`arn:aws:iotevents:${props.env.region}:${props.env.account}:input/${props.modelInput.inputName}`], actions: ["iotevents:BatchPutMessage"], }) ); lambdaRole.addToPolicy( new PolicyStatement({ resources: ["*"], actions: ["kinesis:GetRecords"], }) ); const lambda = new NodejsFunction(stack, `${props.modulePrefix}-lambda`, { entry: props.entry, handler: "handler", runtime: Runtime.NODEJS_14_X, timeout: Duration.seconds(props.lambdaTimeout), functionName: `${props.modulePrefix}-lambda`, memorySize: props.lambdaMemorySize, role: lambdaRole, tracing: Tracing.ACTIVE, environment: { IOT_EVENTS_INPUT_NAME: props.modelInput.inputName as string }, }); new LogGroup(stack, `${props.modulePrefix}-lambda-log-group`, { logGroupName: `/aws/lambda/${lambda.functionName}`, retention: RetentionDays.ONE_MONTH }); Tags.of(lambda).add("env", props.envKey); Tags.of(lambda).add("service", "app"); // Kinesis-Lambda setting lambda.addEventSource(new KinesisEventSource(props.stream, { batchSize: 100, startingPosition: StartingPosition.TRIM_HORIZON })); return lambda; }

 

検知器の後続処理のLambdaを定義しています。 SNSのサブスクリプション、SQSリソース定義、Lambdaリソース定義を一括で設定しています。

import { ISecurityGroup, ISubnet, IVpc } from "@aws-cdk/aws-ec2"; import { Runtime, Tracing } from "@aws-cdk/aws-lambda"; import { NodejsFunction } from "@aws-cdk/aws-lambda-nodejs"; import { Duration, Stack, Tags } from "@aws-cdk/core"; import { ManagedPolicy, PolicyStatement, Role, ServicePrincipal } from "@aws-cdk/aws-iam"; import { Queue, QueuePolicy } from "@aws-cdk/aws-sqs"; import { ITopic } from "@aws-cdk/aws-sns"; import { SqsSubscription } from "@aws-cdk/aws-sns-subscriptions"; import { LogGroup, RetentionDays } from "@aws-cdk/aws-logs"; import { SqsEventSource } from "@aws-cdk/aws-lambda-event-sources"; export interface CreateSqsLambdaprops { vpc: IVpc; subnets: ISubnet[]; securityGroups: ISecurityGroup[]; entry: string; modulePrefix: string; sqsRetentionPeriod: number; sqsVisibilityTimeout: number; topic: ITopic; lambdaTimeout: number; lambdaMemorySize: number; envKey: string; } export function createSqsLambda( stack: Stack, props: CreateSqsLambdaprops ) { // SQS setting const queue = new Queue(stack, `${props.modulePrefix}-queue`, { receiveMessageWaitTime: Duration.seconds(20), retentionPeriod: Duration.seconds(props.sqsRetentionPeriod), visibilityTimeout: Duration.seconds(props.sqsVisibilityTimeout) }); const queuePolicy = new QueuePolicy(stack, `${props.modulePrefix}-queue-policy`, { queues: [queue] }); queuePolicy.document.addStatements(new PolicyStatement({ actions: ["sqs:SendMessage"], principals: [new ServicePrincipal("sns.amazonaws.com")], resources: [queue.queueArn], conditions: { ArnEquals: { "aws:SourceArn": props.topic.topicArn } } })); // Lambda setting const lambdaRole = new Role(stack, `${props.modulePrefix}-lambda-role`, { roleName: `${props.modulePrefix}-lambda-role`, assumedBy: new ServicePrincipal("lambda.amazonaws.com"), }); lambdaRole.addManagedPolicy( ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaVPCAccessExecutionRole") ); lambdaRole.addToPolicy( new PolicyStatement({ resources: [queue.queueArn], actions: ["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes"], }) ); lambdaRole.addToPolicy( new PolicyStatement({ actions: ["ssm:GetParameter"], resources: [`arn:aws:ssm:${stack.region}:${stack.account}:parameter/HogeHoge/Aurora/Secrets`], }) ); const lambda = new NodejsFunction(stack, `${props.modulePrefix}-lambda`, { entry: props.entry, handler: "handler", runtime: Runtime.NODEJS_14_X, timeout: Duration.seconds(props.lambdaTimeout), functionName: `${props.modulePrefix}-lambda`, memorySize: props.lambdaMemorySize, role: lambdaRole, vpc: props.vpc, vpcSubnets: props.vpc.selectSubnets({ subnets: props.subnets, }), securityGroups: props.securityGroups, tracing: Tracing.ACTIVE, environment: { AURORA_ACCESS_SECRETS_NAME: "/HogeHoge/Aurora/Secrets", S3_CLIENT_PARAM: JSON.stringify({ region: stack.region, signatureVersion: "v4", }), SSM_CLIENT_PARAM: JSON.stringify({ region: stack.region, }), }, bundling: { commandHooks: { beforeInstall(inputDir: string, outputDir: string): string[] { return [``]; }, beforeBundling(inputDir: string, outputDir: string): string[] { return [``]; }, afterBundling(inputDir: string, outputDir: string): string[] { return [ `cp ${inputDir}/node_modules/.prisma/client/query-engine-rhel-openssl-1.0.x ${outputDir}`, `cp ${inputDir}/prisma/schema.prisma ${outputDir}`, ]; }, }, }, }); new LogGroup(stack, `${props.modulePrefix}-lambda-log-group`, { logGroupName: `/aws/lambda/${lambda.functionName}`, retention: RetentionDays.ONE_MONTH }); Tags.of(lambda).add("env", props.envKey); Tags.of(lambda).add("service", "app"); //SQS-SNS setting props.topic.addSubscription(new SqsSubscription(queue)); // SQS-Lambda setting queue.grantConsumeMessages(lambda); lambda.addEventSource(new SqsEventSource(queue, {})); return lambda; }

 

トップレベルのファイルです。 このファイルを実行することでCDKのデプロイを実行できます。

#!/usr/bin/env node import * as cdk from "@aws-cdk/core"; import "source-map-support/register"; import { IotEventsStack } from "../lib/IotEventsStack"; const app = new cdk.App(); const argContext = "environment"; const envKey = app.node.tryGetContext(argContext); if (envKey === undefined) { throw new Error(`環境名を指定してください。 ex) cdk deploy -c ${argContext}=dev`); } const envVals = app.node.tryGetContext(envKey); if (envVals === undefined) throw new Error(`${envKey}の環境設定がcdk.jsonに含まれていません`); new IotEventsStack(app, "IotEventsStack", { vpcName: envVals.vpcName, lambdaSecurityGroupId: envVals.lambdaSecurityGroupId, lambdaPrivateSubnet1aId: envVals.lambdaPrivateSubnet1aId, lambdaPrivateSubnet1cId: envVals.lambdaPrivateSubnet1cId, streamName: envVals.IotEventsStreamName, envName: envVals.envName, envKey, env: envVals.env, });

 

最初のapp.node.tryGetContext(argContext)で後述するcdkコマンドの-cに入る値が代入されます。 次のapp.node.tryGetContext(envKey)はcdk.jsonで定義した値を引っ張っています。

 

// CloudFormationテンプレート出力 $ cdk synth -c environment=dev IotEventsStack --profile <profile名> // 差分表示 $ cdk diff -c environment=dev IotEventsStack --profile <profile名> // デプロイ $ cdk deploy -c environment=dev IotEventsStack --profile <profile名>