実務でIoT Eventsを触ったので活用例を記事にしてみた
概要
IoT Eventsを使う機会があったので活用例として紹介したいと思います。
要件
- データのタイプは複数である
- 異常が起きた時間、異常が終わった時間が分かるようにデータ化する
- リアルタイムに処理する
- ストリームで送られてくるデータは識別コードと0,1のデータのみ(0 => 正常、1 => 異常)
やりたいこと
識別コードごとにデータを保持し、それぞれ異常が発生した時間、異常が終わった時間をひとつのレコードとして保存し、蓄積していきたい。
例) typeが「hoge」のデータが
0 -> 0 -> 1 -> 1 -> 0 -> 0 -> 1 -> 1 -> 0
のように送られてきたら、
{ id: 1, type: "hoge", start_timestamp: 1650287719, end_timestamp: 1650291319, }, { id: 2, type: "hoge", start_timestamp: 1650301300, end_timestamp: 1650301500, }
のようなレコードを生成する
このような時、typeなどのキーに対して状態を保持する処理を担うものにIot Eventsを活用することができました。
アーキテクチャ例
Kinesis Data Streamからリアルタイムで送られてくるデータを前処理Lambdaで加工してIoT Eventsへpushし、IoT Eventsでステート管理し、後続のLambdaへと渡し、DBへ保存しています
IoT Eventsの検知器イメージ
AWS IoT Eventsでは以下図のようにGUIで検知器を設定することができます。 また設定した検知器の設定はテンプレートとして出力することができ、CDKなどの設定に役立てることもできます。 検知器モデルではStart(開始位置)、State(状態)、Transition event(遷移条件)の3つで構成されます。 以下スクショでは、Normal, Errorと書かれているものがState、to_error, to_normalはTransition eventです。
以下のようなJSONオブジェクトをAWS CLIかAWSコンソールから直接入力することができ、まずはStartに入りtypeの値をkeyとして状態が保持されます。
{ "messages": [ { "inputName": "events_input", "messageId": "e3cf47d2-94ef-4d7a-b5c8-ce6709f77503", "payload": "{\"status\":1,\"type\":\"1-1-events-0\",\"id\":\"test0001\",\"timestamp\":1639560046000}" } ] }
NormalからErrorへ遷移のルールは、statusが 0->1 ErrorからNormalへは、statusが 1->0 として設定しています。 なので、上記のレコードがinputされた場合、statusが1なのでto_errorを経由してErrorに遷移することになります。
また、to_errorやto_normalを経由することでSNSへデータをpublishするなど様々なeventを起こすことができます。直接Lambdaに送信することも可能です。
Normalへの遷移は、
{ "messages": [ { "inputName": "events_input", "messageId": "b4cf45d2-04ee-4d7a-5sdc5-c56302f47608", "payload": "{\"status\":0,\"type\":\"1-1-events-0\",\"id\":\"test0002\",\"timestamp\":1639560047000}" } ] }
のように同じtypeかつstatusが1のレコードが送られてくるとStateがNormalに遷移されます。
ひとまずここまで、次はCDK使って一括で構成する方法を掲載したいと思います。