SQS LambdaからSESでメール送信を行いたい

#AWS
#CDK
#Python
#Lambda
#SQS
#SES

Amazon SESを使ってメールを送信する際、1秒あたりに送信できる数にはクォータが設定されています。上限緩和申請をすれば枠を拡張できますが、複数のサービスでメール処理を行う場合、バッチ処理などで大量のメールを並列に送信すると、簡単にクォータを超えてしまうと思われます。

今回、SQS -> lambda -> SESという構成を組むことでアカウント全体でメールを送信する処理は1箇所で行うようにし、メール送信の速度も調整できるようにしたいと思います。

SQS -> lambda -> SESイメージ

SESサービスを使用するにはまず、送信元のアドレスを登録する必要があります

SESコンソール画面で「IDの作成」を押下、画面に沿ってドメイン登録もしくはEメールアドレス登録を行います

詳細は こちら を参照ください

構成管理ツールのCDKを用いて、lambda SQS IAM等の構築を一括で行います

import os

from aws_cdk import (
    Duration,
    aws_sqs as sqs,
    aws_lambda_event_sources as lambda_event_sources,
)
from constructs import Construct

from cdk_resources.modules.lambda_module import LambdaModule

# Deployment stage name; also used as the key of the per-stage CDK context.
env = os.getenv("ENVIRONMENT", "dev")
# Branch name; only read when env == "feat".
commit_ref_name = os.getenv("COMMIT_REF_NAME")


class SendEmailSqsResource(Construct):
    """Define the FIFO queue and the Lambda consumer that sends mail via SES.

    After construction the queue is available as ``self.send_email_queue``.
    """

    def __init__(
        self,
        scope: Construct,
        id: str,
        backend_dirname: str,
        environment=None,
        layers=None,
        layer_hashes=None,
    ):
        """Create the queue, the consumer Lambda and the event source mapping.

        Args:
            scope: Parent construct.
            id: Construct id.
            backend_dirname: Directory that contains the Lambda sources; the
                handler path is resolved relative to it.
            environment: Extra environment variables for the Lambda. They
                override the defaults set here on key collision.
            layers: Lambda layers forwarded to ``LambdaModule``.
            layer_hashes: Layer hashes forwarded to ``LambdaModule``.

        Raises:
            ValueError: If the stage is "feat" and COMMIT_REF_NAME is unset.
        """
        super().__init__(scope, id)

        # Avoid shared mutable defaults; behave as if {} / [] had been passed.
        environment = {} if environment is None else environment
        layers = [] if layers is None else layers
        layer_hashes = [] if layer_hashes is None else layer_hashes

        service_name = self.node.try_get_context("service_name")
        ses_region = self.node.try_get_context("ses_region")
        ctx_stage = self.node.try_get_context(env)
        sender_email = ctx_stage["sender_email"]
        application_url = "https://" + ctx_stage["domain_name"]

        if env == "feat":
            if commit_ref_name is None:
                # Fail with a clear message instead of a TypeError on None[:10].
                raise ValueError(
                    "COMMIT_REF_NAME must be set when ENVIRONMENT is 'feat'"
                )
            stage_name = "Feat" + commit_ref_name[:10]
            application_url = "http://localhost:3000"
        else:
            stage_name = env.capitalize()

        # SQS
        send_email_queue = sqs.Queue(
            self,
            "SendEmailQueue",
            content_based_deduplication=False,
            queue_name=f"{service_name}{stage_name}SendEmailQueue.fifo",
            receive_message_wait_time=Duration.seconds(20),
            fifo=True,
            visibility_timeout=Duration.minutes(15),
        )

        # Defaults first, then caller-supplied values win on collision.
        lambda_environment = {
            "SES_REGION": ses_region,
            "SENDER_EMAIL": sender_email,
            "APPLICATION_URL": application_url,
        }
        lambda_environment.update(environment)

        sqs_consume_lambda_module = LambdaModule(
            self,
            name="EmailSendSqs",
            path=os.path.join(
                backend_dirname, "functions/src/sqs/email_send_sqs"),
            handler="app.lambda_handler",
            environment=lambda_environment,
            timeout=29,
            memory_size=512,
            # None leaves concurrency unreserved; set a number to throttle.
            reserved_concurrent_executions=None,
            layers=layers,
            layer_hashes=layer_hashes,
            metric_group_name="sqs",
            role_props={
                "consume_queues": [send_email_queue],
                "is_ses": True,
            },
        )
        sqs_consume_lambda_module.create_lambda_alias()

        # The queue triggers the alias, not the unqualified function.
        sqs_consume_lambda_function_alias = (
            sqs_consume_lambda_module.lambda_function_alias
        )
        sqs_consume_lambda_function_alias.add_event_source(
            lambda_event_sources.SqsEventSource(
                send_email_queue,
                batch_size=5,
            )
        )

        self.send_email_queue = send_email_queue

ここではSQSとlambdaリソースを定義しています

必要に応じてlambdaで処理するbatch_sizeとlambdaのreserved_concurrent_executionsを調整することで メールを処理する速度を調整します

 

import os
from typing import Optional, TypedDict

from aws_cdk import (
    AssetHashType,
    BundlingOptions,
    Duration,
    RemovalPolicy,
    aws_lambda as lambda_,
    aws_logs as logs,
    aws_sqs as sqs,
    aws_iam as iam,
)

env = os.getenv("ENVIRONMENT", "dev")
stage_name = env.capitalize()
if env == "feat":
    # COMMIT_REF_NAME is required here: appending None raises TypeError.
    stage_name += os.getenv("COMMIT_REF_NAME")


class RolePropsType(TypedDict, total=False):
    """Optional permission switches consumed by LambdaModule.create_role_lambda."""

    # Queues the function may send messages to.
    send_queues: list[sqs.Queue]
    # Queues the function may consume messages from.
    consume_queues: list[sqs.Queue]
    # When truthy, allow ses:SendEmail / ses:SendRawEmail.
    is_ses: bool


class LambdaModule:
    """Build a Lambda function, grant its permissions and publish a "Live" alias.

    Call ``create_lambda_alias()`` after construction; it sets
    ``self.lambda_function`` and ``self.lambda_function_alias``.
    """

    def __init__(
        self,
        scope,
        name: str,
        path: str,
        handler: str,
        environment: dict,
        timeout: int,
        memory_size: int,
        reserved_concurrent_executions: Optional[int],
        layers: list,
        layer_hashes=None,
        role_props: Optional[RolePropsType] = None,
        metric_group_name: Optional[str] = None,
    ):
        """Store the function settings; no resources are created here.

        Args:
            scope: Construct that will own the created resources.
            name: Logical name; the function id becomes ``f"{name}Function"``.
            path: Directory of the function source used as the code asset.
            handler: Handler string passed to the Lambda function.
            environment: Environment variables. Must contain "STAGE" and
                "COMMIT_REF_NAME", which are used in the description.
            timeout: Function timeout in seconds.
            memory_size: Memory size passed to the Lambda function.
            reserved_concurrent_executions: Reserved concurrency, or None.
            layers: Lambda layers attached to the function.
            layer_hashes: Optional list of strings joined by ``joined_hash``.
            role_props: Permission switches; treated as empty when None.
            metric_group_name: Stored on the instance only; accepted because
                callers pass it by keyword.
        """
        self.scope = scope
        self.name = name
        self.path = path
        self.handler = handler
        self.environment = environment
        self.timeout = timeout
        self.memory_size = memory_size
        self.reserved_concurrent_executions = reserved_concurrent_executions
        self.layers = layers
        self.layer_hashes = layer_hashes
        # Avoid a shared mutable default dict.
        self.role_props = {} if role_props is None else role_props
        self.metric_group_name = metric_group_name
        self.service_name = scope.node.try_get_context("service_name")

    def joined_hash(self):
        """Return the layer hashes joined with ", ", or "" when there are none."""
        if self.layer_hashes is None:
            return ""
        return ", ".join(self.layer_hashes)

    def create_lambda_function(self):
        """Create the Lambda function and store it as ``self.lambda_function``.

        Raises:
            KeyError: If "STAGE" or "COMMIT_REF_NAME" is missing from the
                environment.
        """
        # Looked up outside the f-string: reusing double quotes inside an
        # f-string replacement field is a SyntaxError before Python 3.12.
        stage = self.environment["STAGE"]
        branch = self.environment["COMMIT_REF_NAME"]

        # Install dependencies into the asset output, then copy the sources.
        bundling_command = " && ".join(
            [
                "pip install -r requirements.txt -t /asset-output",
                "cp -au . /asset-output",
            ]
        )

        lambda_function = lambda_.Function(
            self.scope,
            f"{self.name}Function",
            code=lambda_.Code.from_asset(
                self.path,
                asset_hash_type=AssetHashType.SOURCE,
                bundling=BundlingOptions(
                    image=lambda_.Runtime.PYTHON_3_10.bundling_image,
                    command=["bash", "-c", bundling_command],
                    user="root:root",
                ),
            ),
            handler=self.handler,
            runtime=lambda_.Runtime.PYTHON_3_10,
            environment=self.environment,
            description=f"stage: {stage}, branch: {branch}, name: {self.name}",
            timeout=Duration.seconds(self.timeout),
            memory_size=self.memory_size,
            reserved_concurrent_executions=self.reserved_concurrent_executions,
            current_version_options={
                "removal_policy": RemovalPolicy.RETAIN
            },
            layers=self.layers,
            log_retention=logs.RetentionDays.TWO_MONTHS,
            architecture=lambda_.Architecture.X86_64,
        )
        self.lambda_function = lambda_function

    def create_role_lambda(self):
        """Grant the permissions requested in ``role_props`` to the function."""
        role_props = self.role_props
        lambda_function = self.lambda_function

        for send_queue in role_props.get("send_queues") or []:
            send_queue.grant_send_messages(lambda_function)

        for consume_queue in role_props.get("consume_queues") or []:
            consume_queue.grant_consume_messages(lambda_function)

        if role_props.get("is_ses"):
            lambda_function.add_to_role_policy(
                statement=iam.PolicyStatement(
                    actions=["ses:SendEmail", "ses:SendRawEmail"],
                    resources=["*"],
                )
            )

    def create_lambda_alias(self):
        """Create the function, grant permissions and publish the "Live" alias."""
        self.create_lambda_function()
        self.create_role_lambda()
        # NOTE(review): assign_metrics is not defined in this excerpt; it is
        # presumably provided elsewhere in the real module - confirm.
        self.assign_metrics()
        self.lambda_function_alias: lambda_.Alias = self.lambda_function.add_alias(
            "Live")

lambdaの定義はモジュール化して1箇所にまとめると個人的には便利です

grant_xxメソッドを使用すると、簡単にIAM権限をlambdaへ付与することができます

descriptionを設定することでAWSコンソールからlambdaを探す際に楽になります

 

{ "service_name": "XXXXXXXXXXXXXXXXXXXXX", "ses_region": "ap-northeast-1", "feat": { "hosted_zone_id": "XXXXXXXXXXXXXX", "domain_name": "feat.XXXXXXXXXXXXXX.dev.XXXXXXX.com", "log_level": "DEBUG", "sender_email": "XXXXXXX@dev.XXXXXXX.com" }, "dev": { "hosted_zone_id": "XXXXXXXXXXXXXX", "domain_name": "XXXXXXXXXXXXXX.dev.XXXXXXX.com", "log_level": "INFO", "sender_email": "XXXXXXX@dev.XXXXXXX.com" }, "prod": { "hosted_zone_id": "XXXXXXX", "domain_name": "XXXXXXXXXXXXXX.XXXXXXX.com", "log_level": "WARNING", "sender_email": "XXXXXXX@XXXXXXX.com" } }

ステージごとの変数はcdk.context.jsonに定義します

import os

import boto3
from aws_lambda_powertools import Logger

from email_send_sqs_modules.email_send_sqs_processor import EmaiSendSqsProcessor

logger = Logger()

# Created once at import time so every invocation reuses the same client.
ses_client = boto3.client("ses", region_name=os.environ["SES_REGION"])
email_send_sqs_processor = EmaiSendSqsProcessor(ses_client=ses_client)


def _process_record(record):
    """Handle one SQS record; log the record body if processing raises."""
    try:
        email_send_sqs_processor.email_send_sqs_processor(record=record)
    except Exception:
        # Not re-raised: one failing record must not stop the rest of the batch.
        logger.exception(record["body"])


@logger.inject_lambda_context(log_event=True)
def lambda_handler(event, context):
    """Process every record in ``event["Records"]`` one at a time.

    Errors are logged per record and swallowed, so the handler returns
    normally (None) even when some records fail.
    """
    for record in event["Records"]:
        _process_record(record)

SQSのeventはRecordsにリスト型で入ってくるのでループで一件ずつ処理を行います

 

import os

from aws_lambda_powertools import Logger

from common_modules.ses.ses_module import SesModule
from email_send_sqs_modules.type_1_sender import Type1Sender
from email_send_sqs_modules.type_2_sender import Type2Sender

dirname = os.path.dirname(__file__)
logger = Logger()


class EmaiSendSqsProcessor:
    """Route an SQS record to the sender class that matches its MessageGroupId."""

    # MessageGroupId -> sender class that handles that kind of mail.
    _SENDER_BY_GROUP_ID = {
        "message_type_1": Type1Sender,
        "message_type_2": Type2Sender,
    }

    def __init__(self, ses_client) -> None:
        """Wrap ``ses_client`` in a SesModule shared by all senders."""
        self.ses_module = SesModule(ses_client=ses_client)

    def email_send_sqs_processor(self, record):
        """Send the mail described by ``record``, or warn if its group is unknown.

        Args:
            record: One SQS record; ``record["attributes"]["MessageGroupId"]``
                selects the sender.
        """
        message_group_id = record["attributes"]["MessageGroupId"]
        sender_class = self._SENDER_BY_GROUP_ID.get(message_group_id)

        if sender_class is None:
            logger.warning(
                f"message_group_id: {message_group_id} is not process target")
            return

        sender = sender_class(ses_module=self.ses_module)
        sender.sender_processor(record=record)

message_group_idによって、送信内容のカテゴリーを分けて処理します

 

import html
import json
import os

from aws_lambda_powertools import Logger

from common_modules.common.util import Common

dirname = os.path.dirname(__file__)
logger = Logger()


class Type1Sender:
    """Build and send the "type 1" notification mail for one SQS record."""

    def __init__(self, ses_module) -> None:
        """Keep the SES wrapper used to send the mail."""
        self.ses_module = ses_module

    def sender_processor(self, record):
        """Parse the record body as JSON and send the resulting mail.

        Args:
            record: One SQS record whose ``body`` is a JSON object with
                ``email``, ``organization_name`` and ``user_name``.
        """
        body = json.loads(record["body"])
        recipient = body.get("email")
        sender = os.environ["SENDER_EMAIL"]
        body_contents = self.get_body_contents(body)

        logger.info(
            f"recipient: {recipient}, sender: {sender}, body_contents: {body_contents}"
        )

        self.ses_module.send_email(
            recipient=recipient,
            sender=sender,
            charset="UTF-8",
            body_text=body_contents["BODY_TEXT"],
            body_html=body_contents["BODY_HTML"],
            subject=body_contents["SUBJECT"],
        )

    @classmethod
    def get_body_contents(cls, body):
        """Return the subject, plain-text body and HTML body for the mail.

        Args:
            body: Parsed message body; must contain ``organization_name``
                and ``user_name``.

        Returns:
            dict with the keys "SUBJECT", "BODY_TEXT" and "BODY_HTML".

        Raises:
            KeyError: If a required key or the STAGE environment variable
                is missing.
        """
        organization_name = body["organization_name"]
        user_name = body["user_name"]

        # Outside production the subject is prefixed so test mail is obvious.
        test_label = "" if os.environ["STAGE"] == "prod" else "【テスト】"

        # Values come from the queue message, so escape them before they are
        # placed into HTML markup; the plain-text body keeps the raw values.
        organization_name_html = html.escape(str(organization_name))
        user_name_html = html.escape(str(user_name))

        body_content = {}
        body_content["SUBJECT"] = test_label + \
            "【XXXサービス】お知らせ"
        body_content["BODY_TEXT"] = (
            f"{organization_name} {user_name}様\r\n\r\n"
            "XXXXXのお知らせします\r\n"
        )
        body_content["BODY_HTML"] = (
            "<html>"
            "<head></head>"
            "<body>"
            f"{organization_name_html} {user_name_html}様<br>"
            "XXXXXのお知らせします<br>"
            "</body>"
            "</html>"
        )
        return body_content

ここではHTML用とTEXT用の2種類のメール本文を作成しています

recipientに送信先emailアドレス、senderに送信元emailアドレスを設定します

 

class SesModule:
    """Small wrapper that sends one mail through the given SES client."""

    def __init__(self, ses_client) -> None:
        """Keep the client whose ``send_email`` method will be called."""
        self.ses_client = ses_client

    def send_email(
        self,
        recipient,
        sender,
        charset,
        body_text,
        subject,
        body_html=None,
    ):
        """Send a mail to a single recipient and return the client's response.

        Args:
            recipient: Destination address (sent as the only "ToAddresses" entry).
            sender: Value passed as ``Source``.
            charset: Charset applied to the subject and to every body part.
            body_text: Plain-text body.
            subject: Subject line.
            body_html: Optional HTML body; omitted from the message when None.
        """
        body_parts = {"Text": {"Charset": charset, "Data": body_text}}
        if body_html is not None:
            body_parts["Html"] = {"Charset": charset, "Data": body_html}

        return self.ses_client.send_email(
            Destination={"ToAddresses": [recipient]},
            Message={
                "Body": body_parts,
                "Subject": {"Charset": charset, "Data": subject},
            },
            Source=sender,
        )

SES処理の共通部分はモジュール化しています

ここでは、boto3.clientから取得したses_clientを使い、send_emailでメールを送信します

 

CDKはモジュール化すると使い回しに便利だと思います

コンソールでSESの初期設定を行うところだけCDKで一括立ち上げできないので、そこが手間でした