
はじめに
前回はLineの投稿内容をDYNAMODBに入れてみましたが、今回はそれをストリームにして、可視化しようと思います。
DynamoDB Strema とは
DynamoDB Streamsは、DynamoDBテーブル内でのデータの変更(追加、更新、削除)をリアルタイムで検出し、その変更内容をストリームとして提供する機能です。これにより、他のサービスがDynamoDBの変更をトリガーとして処理を開始することができます。
例:
- LINEからの新しい投稿がDynamoDBに追加されると、その変更がDynamoDB Streamsに記録されます。
Kinesis Data Firehose:データの輸送と変換
Kinesis Data Firehoseは、リアルタイムでデータを収集し、指定した保存先(例:Amazon S3)に自動的に配信するサービスです。また、データを保存する前に変換処理(例:JSONからCSVへの変換)を行うことも可能です。
Amazon Athena:保存されたデータの分析
Amazon Athenaは、Amazon S3に保存されたデータに対して、SQLを使用して直接クエリを実行できるサービスです。これにより、データを移動させることなく、迅速に分析を行うことができます。
例:
- S3に保存されたLINEの投稿データをAthenaでクエリし、特定のキーワードを含む投稿の数を集計するなどの分析が可能です。
🔗 サービス連携の全体像
データの配信:Kinesis Data FirehoseがDynamoDB Streamsからのデータを受け取り、必要に応じて変換し、Amazon S3に保存します。
データの追加:ユーザーがLINEに投稿すると、そのデータがDynamoDBに保存されます。
変更の検出:DynamoDB Streamsが新しいデータの追加を検出します。
- DynamoDB Streamsからのデータ変更通知を受け取り、そのデータを整形してAmazon S3に保存します。
DynamoDBでストリームをONにする
DynamoDBの「エクスポートおよびストリーム」のタブをクリック

「オン」にするボタンをおします

新旧イメージを選択してオンにします

Kinesis Data Stream
Firehoseストリームを作成。ソースはDirect PUT 送信先はS3 、ストリーム名は適当につけてください

これらはチェック不要

データストリームを作成します
S3は今回は新規作成、汎用で十分

ここは有効にしなくて問題ない

この設定で作成

チェックしたら上記の既存IAMロールにはFirehouseがアタッチされていなかったのでアタッチします

正常に作成できるはずです

Lambda関数の作成
Lambdaで関数を作成します。もらったデータをS3に渡すためのコードとなります

適当な関数名をつけて、ランタイムを指定します

環境変数に 先ほど作成したFirehose ストリーム名を入れる

コードを書きます
import boto3 # AWSの各種サービスを使うためのライブラリ
import base64 # Base64形式のデータを扱うため(今回は使っていないが、他の用途で使う可能性あり)
import json # JSON形式のデータを扱うためのライブラリ
import os # 環境変数(Lambdaの設定など)を使うためのライブラリ
# Kinesis Data Firehoseのクライアントを作成
firehose = boto3.client('firehose')
# 環境変数からFirehoseの配信ストリーム名を取得(Lambdaの設定画面で指定しておく必要があります)
DELIVERY_STREAM_NAME = os.environ['FIREHOSE_STREAM']
def lambda_handler(event, context):
# DynamoDB Streams から受け取ったレコードを1つずつ処理する
for record in event['Records']:
# 必要な情報だけを抽出して、辞書型の payload を作成
payload = {
'eventID': record['eventID'], # 変更イベントの一意なID
'eventName': record['eventName'], # 追加(INSERT), 更新(MODIFY), 削除(REMOVE)などのイベント種別
'dynamodb': record['dynamodb'], # 実際に変更されたDynamoDBのデータ(新旧両方含む)
'eventSource': record['eventSource'] # イベントの発生元(通常は "aws:dynamodb")
}
# 作成したpayloadをJSON形式の文字列に変換してFirehoseに送信
firehose.put_record(
DeliveryStreamName=DELIVERY_STREAM_NAME, # 送信先のFirehose配信ストリーム名
Record={
'Data': json.dumps(payload) + "\n" # 改行を入れて複数レコードを区切れるようにする
}
)
# 最後に、ステータスコード200(成功)を返す
return {'statusCode': 200}
次にトリガーを設定します

トリガーの追加をクリック、DynamoDBを選択します。DBテーブルは作成済みのものを選択

DynamoDB Updte を使ってテストしてみます

テストは成功するはずです

成功したらS3にJSONデータが入ってくるはずです

テストに成功していれば下記のようなデータが入ってくるはずです

Athena でクエリ作成
次にAthanaでクエリを作成します

クエリ結果の場所としてS3をさします

S3を選択

下記のクエリを実行します
CREATE EXTERNAL TABLE IF NOT EXISTS default.line_bot2_events (
eventid string,
eventname string,
eventsource string,
dynamodb struct<
Keys: map<string, struct<S:string>>,
NewImage: map<string, struct<S:string>>,
StreamViewType: string,
SequenceNumber: string,
SizeBytes: int
>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'ignore.malformed.json' = 'true'
)
LOCATION 's3://linetofirehose/'
TBLPROPERTIES ('has_encrypted_data'='false');
上記を実行後にすべてクエリしてみて、成功すればAthana に関しては成功です

オンプレでGrafanaを動かす
次にDocker でgrafanaを動かします。docker-compose では下記のようなyamlを作成
version: "3.8"
services:
grafana:
image: grafana/grafana:latest
restart: always
volumes:
- grafana-data:/var/lib/grafana
ports:
- "3015:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=us-east-1
networks:
- default
volumes:
grafana-data:
下記のように同一ディレクトリに、.env を入れておいてください。AWSのキーの生成方法については割愛します。下記のように.envを作ります
AWS_ACCESS_KEY_ID=AKIAYS2NWVGTHGBPOQEQ
AWS_SECRET_ACCESS_KEY=UMzt7Rg6V8M/V69PFp5UWDIDIsdfsdfsoINoaBAkE
docker compose up -d で起動します
Grafana Athenaプラグインで可視化
Administration >>> Plugins and Data からAthenaプラグインを導入します

インストールボタンを押してインストールします

Add new data source でデータソースを追加

AWSユーザのAccess key とsecret を入力します

テストはpassするはずです

試しに下記のようなSQLを発行すると
WITH words AS (
SELECT
word
FROM line_bot2_events
CROSS JOIN UNNEST(
regexp_split(LOWER(dynamodb.newimage['message'].s), '\\s+')
) AS t(word)
WHERE dynamodb.newimage['message'].s IS NOT NULL
)
SELECT
word,
COUNT(*) AS count
FROM words
WHERE word <> ''
GROUP BY word
ORDER BY count DESC
LIMIT 50;
Run query で下記のようなクエリができます

結構めんどくさい、もっと簡単なやり方はないだろうか?とは思いましたが、AWS Insight はちょっと個人で使うにはお値段が高いので、Grafana で試してみました