はじめに
前提条件
S3の設定



IAMの設定

「ポリシーをアタッチ」をクリック

許可ポリシーで下記の4つを選択
AmazonRekognitionFullAccess
AmazonS3FullAccess
AmazonDynamoDBFullAccess
- ComprehendFullAccess
許可を追加 で許可をクリック

Lambdaのコード修正
既存のコードを書き直します
import json
import boto3
from datetime import datetime
import os
import traceback
import re
from boto3.dynamodb.conditions import Key, Attr
from linebot import LineBotApi
from linebot.models import TextSendMessage
from linebot.exceptions import LineBotApiError
# 環境変数から設定値を取得
LINE_CHANNEL_ACCESS_TOKEN = os.environ.get('LINE_CHANNEL_ACCESS_TOKEN')
LINE_BOT_API = LineBotApi(LINE_CHANNEL_ACCESS_TOKEN)
IS_TEST_ENV = os.environ.get('IS_TEST_ENV', 'false').lower() == 'true'
# AWSリソースの初期化
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('LINE_BOT2')
s3 = boto3.client('s3')
rekognition = boto3.client('rekognition')
# S3バケット名(事前に作成)
S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME', 'your-s3-bucket-name')
def lambda_handler(event, context):
print(f"Received event: {json.dumps(event, indent=2)}")
try:
if 'body' not in event:
raise KeyError("'body' key is missing in the event")
try:
body = json.loads(event['body'])
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in event body: {str(e)}")
if 'events' not in body:
raise KeyError("'events' key is missing in the request body")
for line_event in body['events']:
handle_event(line_event)
return {
'statusCode': 200,
'body': json.dumps('OK')
}
except Exception as e:
print(f"Unexpected error: {str(e)}")
print(traceback.format_exc())
return error_response(500, "Internal Server Error")
def handle_event(event):
try:
user_id = event['source']['userId']
if event['type'] == 'message':
if event['message']['type'] == 'text':
message_text = event['message']['text']
# 「検索」形式のメッセージをチェック
search_match = re.match(r'検索\s+((\d{8}-\d{8})\s+)?(.+)', message_text)
if search_match:
date_range = search_match.group(2)
keyword = search_match.group(3)
search_messages(event.get('replyToken'), user_id, keyword, date_range)
else:
save_to_dynamodb(user_id, message_text)
send_response(event.get('replyToken'), "メッセージを保存しました。")
elif event['message']['type'] == 'image':
handle_image_message(event, user_id)
except Exception as e:
print(f"Unexpected error in handle_event: {str(e)}")
print(traceback.format_exc())
def handle_image_message(event, user_id):
try:
message_id = event['message']['id']
content = LINE_BOT_API.get_message_content(message_id)
# S3に画像アップロード
file_name = f"{user_id}/{message_id}.jpg"
s3.put_object(
Bucket=S3_BUCKET_NAME,
Key=file_name,
Body=content.content,
ContentType='image/jpeg'
)
# Rekognitionで画像解析
response = rekognition.detect_labels(
Image={'S3Object': {'Bucket': S3_BUCKET_NAME, 'Name': file_name}},
MaxLabels=10,
MinConfidence=70
)
labels = [label['Name'] for label in response['Labels']]
# DynamoDBに保存
save_to_dynamodb(user_id, f"Image analyzed: {', '.join(labels)}")
# 結果をLINEに送信
send_response(event.get('replyToken'), f"画像解析結果: {', '.join(labels)}")
except Exception as e:
print(f"Error handling image message: {str(e)}")
print(traceback.format_exc())
def save_to_dynamodb(user_id, message_text):
try:
current_time = datetime.now()
timestamp = int(current_time.timestamp() * 1000)
item = {
'userId': user_id,
'timestamp': timestamp,
'message': message_text,
'date': current_time.strftime('%Y-%m-%d'),
'time': current_time.strftime('%H:%M:%S')
}
table.put_item(Item=item)
except Exception as e:
print(f"Error saving to DynamoDB: {str(e)}")
print(traceback.format_exc())
def search_messages(reply_token, user_id, keyword, date_range=None):
try:
keyword = keyword.lower()
filter_expr = Attr('userId').eq(user_id)
if date_range:
start_date, end_date = date_range.split('-')
filter_expr &= Attr('date').between(
datetime.strptime(start_date, '%Y%m%d').strftime('%Y-%m-%d'),
datetime.strptime(end_date, '%Y%m%d').strftime('%Y-%m-%d')
)
response = table.scan(FilterExpression=filter_expr)
messages = [
m for m in response['Items']
if keyword in m['message'].lower()
]
if messages:
result_text = f"「{keyword}」を含むメッセージ:\n" + "\n".join(
[f"{m['date']} {m['time']}: {m['message']}" for m in messages]
)
else:
result_text = f"指定条件でメッセージが見つかりませんでした。"
send_response(reply_token, result_text)
except Exception as e:
print(f"Error searching messages: {str(e)}")
print(traceback.format_exc())
def send_response(reply_token, message):
if IS_TEST_ENV:
print(f"テスト環境: 以下のメッセージを送信します\n{message}")
else:
if reply_token:
try:
LINE_BOT_API.reply_message(
reply_token,
TextSendMessage(text=message)
)
except LineBotApiError as e:
print(f"Error sending message to LINE: {str(e)}")
def error_response(status_code, message):
return {
'statusCode': status_code,
'body': json.dumps({'error': message})
}
書き直したら Deploy ボタンを押します
