Pythonの並列処理でDynamoDBをリクエストしてみる

#Python
#AWS DynamoDB
#concurrent.futures
#ThreadPoolExecutor

バッチ処理で数種類の大量件数のデータを取得したい、Lambdaを使うので時間制限以内に完了させたい、ということがありました

Pythonの並列処理を調べてみるとマルチプロセス、マルチスレッドのように色々方法があるみたいで、

DBへリクエストするような外部ネットワークアクセスのようなCPUに負荷のかからない処理の場合はマルチスレッドが適しているようでした

import concurrent.futures import boto3 from boto3.dynamodb.conditions import Key, Attr dynamodb = boto3.resource('dynamodb') table = dynamodb.Table(os.environ['TABLE_NAME']) def worker_func(prefix, date): query_input = { { 'IndexName': 'meta-id-index', 'KeyConditionExpression': Key('date').eq( meta) & Key('id').begins_with(f'{prefix}:') } } query_output = table.query(**query_input) accumed_response = [] accumed_response += query_output['Items'] while 'LastEvaluatedKey' in query_output: query_input["ExclusiveStartKey"] = query_output['LastEvaluatedKey'] query_output = table.query(**query_input) accumed_response += query_output['Items'] return accumed_response def create_animal_dictionary(event, context): with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: date = "2022-04-11" f1 = executor.submit( worker_func, date, "Dog") f2 = executor.submit( worker_func, date, "Cat") f3 = executor.submit( worker_func, date, "Bird") dogs = f1.result() cats = f2.result() birds = f3.result() animal_dict = { "dogs": dogs, "cats": cats, "birds": birds, } return animal_dict

まず、DynamoDBには1度のQuery操作で取得できるサイズ上限は1MBでそれ以上データがあると、レスポンスに"LastEvaluatedKey"が入り、リクエストの際、"ExclusiveStartKey"に"LastEvaluatedKey"を代入して再度queryすると続きを取得できます

 

また、マルチスレッドで並列処理をするライブラリにconcurrent.futuresのThreadPoolExecutorを使用します

executor.submit()の第一引数に処理を行う関数、第二引数移行に関数で使う引数を代入します

その返り値で.result()を実行すると各関数が処理を終えるまで待機します

 

あとは取得したデータをCSVファイルを作るのに加工したり、データを集計してランキングを作ったりで活用していきます

今回は以上、