Skip to content

cosicosilife.com

ライフハックについていろいろ書きます

Primary Menu
  • サイトマップ
  • クラウド技術
  • 安全資産投資
  • 旅行
  • ライフハック
  • プライバシーポリシー
  • 運営者情報とご連絡先
  • Home
  • クラウド技術
  • 【初心者向け】AWS Kinesis + Lambda + S3 + Athenaによるリアルタイムデータ分析の構築手順(CloudFormationテンプレート付き)
  • クラウド技術

【初心者向け】AWS Kinesis + Lambda + S3 + Athenaによるリアルタイムデータ分析の構築手順(CloudFormationテンプレート付き)

david 2025年5月20日
aws

こんにちは!今回は、AWSを使ってリアルタイムにデータを収集・保存・分析するシステムを、初心者の方でも理解できるように丁寧に解説します。

この記事では、以下のような構成を作ります:

この記事の目次(クリックしたらジャンプ)

Toggle
  • 🔧 システム全体構成図
  • ✅ 用語の解説
  • 🧱 CloudFormationテンプレート
  • 各サービスの説明と連携
    • AWS Lambda
    • Amazon Kinesis Data Streams
    • Amazon Kinesis Data Firehose
    • Amazon S3
    • AWS Glue(データカタログとクローラー)
    • Amazon Athena
  • Lambda関数の実装例(Python)
  • Glue と Athena による自動スキーマ認識とクエリ
  • 🗃️ S3に保存されたデータをAthenaで分析する簡単な実行例
    • ステップ1:Glue Crawlerを作成してS3のデータをカタログ化する
    • ステップ2:AthenaでSQLクエリを実行して分析する
  • 🔚 まとめ

🔧 システム全体構成図

[送信スクリプト] → [Kinesis Data Stream] → [Lambda関数] → [S3] → [Athenaでクエリ分析]

これらの構成を、AWS CloudFormation(以下CFn)という仕組みを使って、一括で構築できるようにしていきます。


✅ 用語の解説

AWS Lambda:ログ生成・取得や前処理を担うサーバレス関数。イベントを受け取るとデータを整形して Kinesis Data Streams に送信します。

Amazon Kinesis Data Streams:高スループットなストリーミングデータサービス。Lambda から送られてきたログレコードをシャードに蓄積し、Firehose のソースとして利用します。

Amazon Kinesis Data Firehose:ストリーミングデータを取り込み、指定したストレージ(ここでは S3)に配信するサービス。Kinesis Stream からのデータを受け取り、バッファリングしつつファイルに書き出しますまた Firehose ではデータ変換機能があり、JSON を Parquet/ORC に変換して効率を高めることも可能です

Amazon S3:オブジェクトストレージ。Firehose により出力されたログデータを格納しますS3 上では日付やサービス名などでプレフィックス(パーティション)分けすることで、Athena クエリの性能向上やコスト削減が期待できます。

AWS Glue:サーバレスなデータ準備・カタログサービス。Glue クローラーを使い S3 上の新規データを走査してスキーマを自動推定し、Glue Data Catalog にテーブル定義を作成しますこうして作られたテーブルを Athena が参照してクエリを実行できます。

Amazon Athena:サーバレスな対話型クエリサービス。Glue Data Catalog 上のテーブルを使い、S3 上のデータに対して標準的なSQLで分析します。CloudFormation では Athena のワークグループや NamedQuery を定義しておくことも可能です


🧱 CloudFormationテンプレート

以下は、Kinesis + Lambda + S3 を構成するCloudFormationテンプレートの一例です。

AWSTemplateFormatVersion: '2010-09-09'
Description: >
  Kinesis + Firehose + S3 + Glue + Lambda 構成
  - Lambda で Kinesis にサンプルデータ送信
  - Firehose 経由で GZIP 圧縮し S3 保存
  - Glue で Crawler 実行 → Athena クエリ可能

Resources:

  # S3バケット(GZIPログ保存用)
  MyS3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      VersioningConfiguration:
        Status: Enabled

  # Kinesisストリーム(1シャード)
  KinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1

  # Firehose用IAMロール
  FirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: FirehosePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:PutObjectAcl
                  - s3:GetBucketLocation
                Resource:
                  - !GetAtt MyS3Bucket.Arn
                  - !Sub "${MyS3Bucket.Arn}/*"
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                Resource: !GetAtt KinesisStream.Arn

  # Firehose Delivery Stream
  FirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !GetAtt KinesisStream.Arn
        RoleARN: !GetAtt FirehoseRole.Arn
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt MyS3Bucket.Arn
        RoleARN: !GetAtt FirehoseRole.Arn
        Prefix: logs/web_access/
        CompressionFormat: GZIP
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 5

  # Lambda実行ロール
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: LambdaKinesisPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:PutRecord
                  - kinesis:PutRecords
                Resource: !GetAtt KinesisStream.Arn
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: "*"

  # Lambda関数(サンプルデータをKinesisへ送信)
  SampleDataLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Runtime: python3.12
      Role: !GetAtt LambdaExecutionRole.Arn
      Timeout: 10
      Code:
        ZipFile: |
          import boto3
          import json
          import os
          import random
          import time

          def lambda_handler(event, context):
              kinesis = boto3.client('kinesis')
              data = {
                  "ip": f"192.168.1.{random.randint(1, 254)}",
                  "timestamp": int(time.time()),
                  "user_agent": "Mozilla/5.0"
              }
              kinesis.put_record(
                  StreamName=os.environ["KINESIS_STREAM_NAME"],
                  Data=json.dumps(data),
                  PartitionKey="partitionKey"
              )
              return {"statusCode": 200, "body": "Data sent"}

      Environment:
        Variables:
          KINESIS_STREAM_NAME: !Ref KinesisStream

  # EventBridgeルール(Lambdaを1分ごとに実行)
  SampleDataScheduleRule:
    Type: AWS::Events::Rule
    Properties:
      ScheduleExpression: rate(1 minute)
      Targets:
        - Arn: !GetAtt SampleDataLambda.Arn
          Id: "TargetLambdaFunction"

  # Lambda に EventBridge 実行権限付与
  LambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref SampleDataLambda
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt SampleDataScheduleRule.Arn

  # Glueのロール(S3 + Glue用)
  GlueServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: glue.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: GlueAccess
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                  - s3:ListBucket
                Resource:
                  - !GetAtt MyS3Bucket.Arn
                  - !Sub "${MyS3Bucket.Arn}/*"
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: "*"
              - Effect: Allow
                Action:
                  - glue:*
                Resource: "*"

  # Glue Database
  GlueDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: web_access_db

  # Glue Crawler
  GlueCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Name: web-access-crawler
      Role: !GetAtt GlueServiceRole.Arn
      DatabaseName: !Ref GlueDatabase
      Targets:
        S3Targets:
          - Path: !Sub "s3://${MyS3Bucket}/logs/web_access/"
      SchemaChangePolicy:
        UpdateBehavior: UPDATE_IN_DATABASE
        DeleteBehavior: LOG

  # Glue Trigger(毎時 Crawler 実行)
  GlueCrawlerScheduleTrigger:
    Type: AWS::Glue::Trigger
    Properties:
      Name: web-access-trigger
      Type: SCHEDULED
      Schedule: "cron(0 * * * ? *)"  # 毎時0分
      Actions:
        - CrawlerName: !Ref GlueCrawler
      StartOnCreation: true

Outputs:

  S3BucketName:
    Description: ログ保存用S3バケット名
    Value: !Ref MyS3Bucket

  KinesisStreamName:
    Description: Kinesis Stream名
    Value: !Ref KinesisStream

  FirehoseName:
    Description: Firehose Delivery Stream名
    Value: !Ref FirehoseDeliveryStream

  GlueDatabaseName:
    Description: Glue データベース名
    Value: !Ref GlueDatabase

  AthenaQueryExample:
    Description: Athena で実行可能なクエリ例
    Value: |
      SELECT * FROM "web_access_db"."logs_web_access" LIMIT 10;


各サービスの説明と連携

AWS Lambda

AWS Lambda はイベント駆動型のサーバレス関数で、ここではログを受け取って処理し、Kinesis Data Streams に送信する役割を担います。Lambda 関数内では AWS SDK (boto3) を使い、kinesis.put_record でストリームにデータを送信できます。例えば、JSON化したログメッセージを put_record の Data パラメータに渡し、PartitionKey でシャード分散を指定します。CloudFormation では AWS::Lambda::Function リソースで関数を定義し、IAM ロールに対して Kinesis への書き込み権限(kinesis:PutRecord)を付与します。

Amazon Kinesis Data Streams

Kinesis Data Streams はスケーラブルなストリーミングデータサービスで、ストリーム名・シャード数を指定して作成します。Lambda 関数から送られた各レコードは JSON 文字列などで put_record メソッドを用いてストリームに書き込まれます.

これにより、データは指定したシャードに蓄積され、後続処理で並列に取り出すことが可能です。CloudFormation では AWS::Kinesis::Stream リソースを使い、ShardCount や保持期間などを設定します。Kinesis ストリームにデータが入ると、次段の Firehose がこのストリームをソース(DeliveryStreamType: KinesisStreamAsSource)として読み込みます

Amazon Kinesis Data Firehose

Kinesis Data Firehose(配信ストリーム)は、ストリーミングデータを自動的に指定先へ書き出すサービスです。CloudFormation では AWS::KinesisFirehose::DeliveryStream で定義し、KinesisStreamSourceConfiguration にストリームの ARN を指定して Kinesis ストリームをソースとします。Firehose は受信したレコードを一定量または一定時間ごとにまとめてバッファリングし、ExtendedS3DestinationConfiguration で指定した S3 バケットにファイルとして出力します。Firehose にはデータ変換機能もあり、JSONデータを列指向フォーマット(Parquet/ORC)に変換して Athena クエリを高速化できます。例として、JSON レコードを Parquet に変換するよう設定すれば、保存容量とクエリコストを削減できます。Firehose の S3 出力設定では、バッファサイズや圧縮方式、ファイル名のプレフィックスなどを細かく指定します。

Amazon S3

Amazon S3 は長期的なオブジェクトストレージであり、Firehose から受け取ったログデータを蓄積する役割を担います。CloudFormation では AWS::S3::Bucket でバケットを作成し、Firehose の出力先にその ARN を指定します。S3 では、Firehose が吐き出すJSONファイルやParquetファイルを保存します。一般にログ分析では、/year=YYYY/month=MM/day=DD/ のように日付別のプレフィックスを付けることで、Athena のクエリ効率を上げることができます。また、Firehose の「S3 バックアップ」モードを有効化しておくと、配信失敗したレコードを別バケットに保存するなどの冗長化も可能です。

AWS Glue(データカタログとクローラー)

AWS Glue はサーバレスの ETL およびデータカタログサービスです。ここでは主に Glue データカタログを利用し、S3 に保存されたログのスキーマを管理します。CloudFormation では AWS::Glue::Database でデータベースを作成し、AWS::Glue::Crawler でクローラーを設定します。クローラーの Targets.S3Targets.Path にログ保存先の S3 パスを指定し、実行すると S3 上のファイルを走査してテーブルとカラムを自動生成します。また、TablePrefix でテーブル名のプレフィックスを設定し、SchemaChangePolicy で列追加時の動作(既存テーブルへの更新許可など)を指定できます。例えば以下のように設定します:

# Glue Crawler 定義例 (CloudFormation YAML)
GlueCrawler:
  Type: AWS::Glue::Crawler
  Properties:
    Name: !Sub Crawler-${AWS::AccountId}-${AWS::Region}
    Role: <GlueCrawler実行ロールARN>
    DatabaseName: !Ref GlueDatabase
    Targets:
      S3Targets:
        - Path: !Ref DataS3BucketPath    # ログデータのS3パス
    TablePrefix: 'my_logs_'
    SchemaChangePolicy:
      UpdateBehavior: UPDATE_IN_DATABASE
      DeleteBehavior: LOG

この例では、S3 の指定パスをターゲットにし、my_logs_ というプレフィックスでテーブルを作成します。SchemaChangePolicy で UPDATE_IN_DATABASE を設定すると、クローラー実行時にスキーマが更新されてもテーブル定義に自動的に反映されます。さらに詳細には、クローラー出力設定で "AddOrUpdateBehavior": "MergeNewColumns" を指定しておけば、新しい列が追加されてもマージされます

Amazon Athena

Amazon Athena は S3 のデータを SQL でクエリするサーバレスサービスです。Glue Data Catalog 上のテーブルを元に、Presto SQL ライクなクエリを実行できます。Athena は内部で Glue のメタデータを参照してデータを処理するため、ユーザーは通常のデータベースと同様にテーブルに対して SELECT 文を発行できます CloudFormation では AWS::Athena::WorkGroup でワークグループを作成し、出力場所(S3)や暗号化を設定できます。また、AWS::Athena::NamedQuery を使ってよく使うクエリを事前に定義することも可能です。例えば以下のように定義すると、Glue で作成されたデータベース・テーブルに対して SQL を実行できます

# Athena NamedQuery 定義例 (CloudFormation YAML)
SampleQuery:
  Type: AWS::Athena::NamedQuery
  Properties:
    Name: "Sample-Query"
    Database: !Ref GlueDatabase
    QueryString: >-
      SELECT * 
      FROM "mylogsdb"."my_logs_sample"
      ORDER BY event_time DESC;
    WorkGroup: !Ref AthenaWorkGroup

この例では、Glueデータベース mylogsdb のテーブル my_logs_sample に対して全件取得クエリを定義しています。こうして定義したクエリは Athena コンソールや API から実行できます。

Lambda関数の実装例(Python)

Lambda 関数内では Python を使い、boto3 ライブラリで Kinesis Data Streams にデータを送信します 以下はサンプルコードです:

import boto3
import json
import datetime

kinesis = boto3.client('kinesis')

def lambda_handler(event, context):
    # イベントからログデータを生成または整形
    data = {
        'message': 'サンプルログデータ',
        'timestamp': datetime.datetime.utcnow().isoformat()
    }
    # Kinesis Data Streams に JSON 文字列を送信
    response = kinesis.put_record(
        StreamName='MyKinesisStream',
        Data=json.dumps(data),         # JSONにシリアライズ
        PartitionKey='partition_key'   # シャード分散のキー
    )
    print("PutRecord Response:", response)

このコードでは、boto3.client('kinesis') でクライアントを作成し、put_record メソッドにストリーム名、JSON文字列、パーティションキーを渡しています。PartitionKey は同じ値のレコードを同じシャードに送るキーで、例では固定値ですが日時やユーザーIDなど動的にすることもできます。レスポンスには書き込み先シャードIDやシーケンス番号が含まれます。CloudFormation ではこの Lambda に対し、Kinesis ストリームへの PutRecord 権限を持つ IAM ロールを割り当てます。

Glue と Athena による自動スキーマ認識とクエリ

Glue クローラーを実行すると、S3 バケット内の新規データを自動的に走査してスキーマを推定し、Glue Data Catalog にテーブル定義が作成されます。これにより、データのスキーマを手動で作成せずに済むため、素早く分析を始められます。Glue ではディレクトリ構成に基づくパーティションも自動生成でき、例えば年月日フォルダ毎にパーティションを分けていた場合はクローラーがそれを検出してテーブルに組み込みます。さらに、CloudFormation の Crawler 設定でパーティション更新ポリシーを有効化すれば、新たなパーティションが現れた際にも自動でテーブルに反映されます

Athena では、Glue カタログのテーブルを「テーブル」として SQL クエリが可能です たとえば、SELECT 文でグループ集計を行ったり、特定条件でフィルタリングできます。Athena のクエリ実行結果は指定した S3 バケットにファイル出力され、コンソール上にテーブル形式で表示されます。また、Athena は Glue カタログを参照してデータを処理するため、スキーマ変更があっても Glue テーブルを最新化することでシームレスに分析が行えます。以下のように、Glue で作成されたデータベースとテーブルに対して SQL を記述できます(前節の NamedQuery 定義例も参照)

🗃️ S3に保存されたデータをAthenaで分析する簡単な実行例

ステップ1:Glue Crawlerを作成してS3のデータをカタログ化する

  1. AWSコンソールから「Glue」を開きます。
  2. 左側のナビゲーションで「クローラー」をクリック → 「クローラーの追加」を選択します。

Run Crowlerを実行すれば、テーブルが作成、もしくは既存のテーブルがアップデートされます

問題なければ成功するはずです

ステップ2:AthenaでSQLクエリを実行して分析する

  1. AWSコンソールで「Athena」を開きます。
  2. クエリエディタで、先ほどGlueクローラーで作成されたデータベースを選択します。
  1. 以下のようなSQLクエリを実行します:
SELECT * FROM web_access LIMIT 100;

無事クエリできたら下記のようになります


🔚 まとめ

このブログでは、以下のステップを通じてAWSによるリアルタイムデータ処理基盤を構築しました:

  1. CloudFormationでKinesis + Lambda + S3を自動構築
  2. Lambdaで処理しS3へ保存
  3. Glue + AthenaでSQL分析

最初は難しく感じるかもしれませんが、CloudFormationテンプレートを活用することで、構成の再現性が高まり、学習・運用の効率が大きく向上します。

ご希望があれば、この構成に API Gateway や リアルタイム可視化(QuickSight) を追加したり、 Parquet形式 に変換する例などもご紹介できます。

AWS初心者の方も、ぜひステップバイステップで挑戦してみてください!

Continue Reading

Previous: オンプレのGrafana で DyanmoDBの内容を可視化する
Next: Argo CD に LAN 内の IP アドレスとポートでアクセスする方法(初心者向け)

Related Stories

A realistic whale with the word Docker in a manga style, without making the eyes too cute
  • クラウド技術

david 2025年6月2日
A realistic whale with the word Docker in a manga style, without making the eyes too cute
  • クラウド技術

Ubuntu上にArgo CDをインストールし、ブラウザアクセスできるようにする手順(k8s.cosicosilife.com対応)

david 2025年6月1日
A realistic whale with the word Docker in a manga style, without making the eyes too cute
  • クラウド技術

【初心者向け】Argo CD に初めてのアプリケーションをデプロイしてみよう!

david 2025年6月1日
  • argocdの導入
  • (タイトルなし)
  • オンプレUbuntuでHelm中心にKubernetes運用・CI/CD構築
  • Ubuntu上にArgo CDをインストールし、ブラウザアクセスできるようにする手順(k8s.cosicosilife.com対応)
  • 【初心者向け】Argo CD に初めてのアプリケーションをデプロイしてみよう!
  • サイトマップ
  • クラウド技術
  • 安全資産投資
  • 旅行
  • ライフハック
  • プライバシーポリシー
  • 運営者情報とご連絡先
  • サイトマップ
  • クラウド技術
  • 安全資産投資
  • 旅行
  • ライフハック
  • プライバシーポリシー
  • 運営者情報とご連絡先
Copyright © Cosicosilife | MoreNews by AF themes.