이전 Part 1에서는 실시간 비디오 스트리밍을 위한 기본 인프라를 구축했다. 이번 파트에서는 Amazon Rekognition을 활용한 실시간 얼굴 인식과 IoT 디바이스 제어 구현에 초점을 맞춘다.
Architecture
이번 파트의 핵심
- Rekognition을 통한 얼굴 인식 기능 구현
- Lambda를 통한 인식 결과 처리
- SNS와 IoT Core를 활용한 알림 및 디바이스 제어
이전 Part 1에서 구축한 스트리밍 기반 위에, 위 아키텍처의 빨간 박스 부분을 구현해보자.
Kinesis Video Stream으로 받는 영상으로 Rekognition Collection에 등록된 얼굴인지 Rekognition이 판단 후 Kinesis Data Stream에 전송
1. Faces Collection(Rekognition Collection)만들기
1-1. Rekognition Collection 생성
# Rekognition Collection 생성
aws rekognition create-collection \
--collection-id "face-detect" \
--profile default
# 생성된 Collection 확인
aws rekognition list-collections --profile default
2. Rekognition Collection에 S3 에서 업로드한 얼굴 이미지 등록하기
# S3의 이미지를 Collection에 등록
aws rekognition index-faces \
--image '{"S3Object":{"Bucket":"rekognition-test-img3","Name":"yoo1.jpg"}}' \
--collection-id "face-detect" \
--max-faces 1 \
--quality-filter "AUTO" \
--detection-attributes "ALL" \
--external-image-id "yoo1.jpg" \
--profile default
3. Rekognition Stream Processor 구성
- video stream arn과 kinesis data stream arn 넣기
aws rekognition create-stream-processor \
--name rekognition-video-stream-processor \
--role-arn arn:aws:iam::[YOUR-ACCOUNT-ID]:role/Rekognition-kinesis \
--settings '{"FaceSearch":{"CollectionId":"face-detect"}}' \
--input '{"KinesisVideoStream":{"Arn":"[YOUR-VIDEO-STREAM-ARN]"}}' \
--stream-processor-output '{"KinesisDataStream":{"Arn":"[YOUR-DATA-STREAM-ARN]"}}'
4. 트러블 슈팅
- Rekognirion stream processor를 실행시키던 중 다음과 같은 에러를 만나게 되었다.
4-1. Stream Processor 생성 시 IAM 권한 오류
An error occurred (AccessDeniedException) when calling the CreateStreamProcessor operation:
User: [IAM USER ARN] is not authorized to perform: iam:PassRole on resource:
[IAM ROLE ARN] because no identity-based policy allows the iam:PassRole action
4-2. 문제 원인
- iam:PassRole 권한 누락
4-3. 해결 방법
- IAM 정책에 다음 권한을 추가:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:PassRole"
],
"Resource": [
"[IAM ROLE ARN]"
]
}
]
}
5. Rekognition Stream Processor 실행 및 모니터링
5-1. 스트림 프로세서 시작
aws rekognition start-stream-processor \
--name rekognition-video-stream-processor
스트림 프로세서 결과값 예
{
"SessionId": "0f394373a-43f5-0e3c-932e-345c6c5dgfad"
}
참고: 스트림 프로세서 종료 및 프로세서 세부정보 확인
# 스트림 프로세서 종료
aws rekognition stop-stream-processor \
--name rekognition-video-stream-processor
# 프로세서 상태 확인
aws rekognition describe-stream-processor \
--name rekognition-video-stream-processor
6. Kinesis Data Stream 모니터링
- 다음 Python 스크립트로 실시간 데이터 모니터링이 가능하다.
import boto3
import time
def monitor_stream():
# Kinesis 클라이언트 초기화
kinesis_client = boto3.client('kinesis', region_name='[YOUR_REGION]')
stream_name = '[YOUR_STREAM_NAME]'
# 샤드 ID 조회
response = kinesis_client.describe_stream(StreamName=stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
# 초기 샤드 이터레이터 획득
shard_iterator = kinesis_client.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType='LATEST'
)['ShardIterator']
while True:
records_response = kinesis_client.get_records(
ShardIterator=shard_iterator,
Limit=10
)
for record in records_response['Records']:
print(record['Data'].decode('utf-8'))
shard_iterator = records_response['NextShardIterator']
time.sleep(1)
if __name__ == "__main__":
monitor_stream()
6번 실행의 결과 값
6-1. 비디오에 얼굴이 감지되지 않은 경우
{
"InputInformation": {
"KinesisVideo": {
"StreamArn": "[KINESIS_VIDEO_STREAM_ARN]",
"FragmentNumber": "[FRAGMENT_NUMBER]",
"ServerTimestamp": 1.725433098685E9,
"ProducerTimestamp": 1.725433095115E9,
"FrameOffsetInSeconds": 0.0
}
},
"StreamProcessorInformation": {"Status": "RUNNING"},
"FaceSearchResponse": []
}
6-2. 비디오에 사전 등록된 얼굴을 감지했을 경우
{
"InputInformation": {"KinesisVideo": {...}},
"StreamProcessorInformation": {"Status": "RUNNING"},
"FaceSearchResponse": [{
"DetectedFace": {
"BoundingBox": {...},
"Confidence": 99.99053,
"Landmarks": [...],
"Pose": {...},
"Quality": {...}
},
"MatchedFaces": [{
"Similarity": 99.74969,
"Face": {
"FaceId": "[FACE_ID]",
"ExternalImageId": "[IMAGE_NAME]",
"Confidence": 99.9997
}
}]
}]
}
6-3. 비디오에 등록되지 않은 얼굴을 감지했을 경우
{
"InputInformation": {
"KinesisVideo": {
"StreamArn": "[KINESIS_VIDEO_STREAM_ARN]",
"FragmentNumber": "[FRAGMENT_NUMBER]",
"ServerTimestamp": 1.725509264843E9,
"ProducerTimestamp": 1.72550926229E9,
"FrameOffsetInSeconds": 0.9990000128746033
}
},
"StreamProcessorInformation": {"Status": "RUNNING"},
"FaceSearchResponse": [{
"DetectedFace": {
"BoundingBox": {
"Height": 0.6725677,
"Width": 0.31205362,
"Left": 0.1554201,
"Top": 0.2967734
},
"Confidence": 99.999344,
"Landmarks": [...],
"Pose": {
"Pitch": 0.22625916,
"Roll": -3.3147352,
"Yaw": -0.2569844
},
"Quality": {
"Brightness": 98.61881,
"Sharpness": 96.61495
}
},
"MatchedFaces": []
}]
}
알림 시스템 구축 - Kinesis Data Stream에서 SNS로 알림을 보내는 lambda함수 생성
1. SNS 생성 및 (이메일) 구독 생성 후 이메일 확인하기
1-1. SNS 주제 생성
- 주제 이름: sns-noti
- 주제 세부정보는 "표준"으로 선택한 뒤 "주제생성"을 클릭
1-2. SNS 구독 생성
- 생성한 주제(sns-noti)의 세부정보 페이지에서 "구독생성"을 클릭
- 구독생성 페이지에서 프로토콜은 "이메일"로, 엔드포인트는 "알림을 받을 이메일 주소"를 입력
1-3. 이메일 확인하기
- 구독 확인 이메일에서 "Confirm subscription"을 클릭.
2. Lambda 함수 설정
2-1. Lambda 함수 생성하기
- 함수 이름: lambda-to-iotcore2
- 런타임: python
2-2. IAM 역할 생성 및 권한 추가
- iot-core를 생성하면 자동으로 생성되는 IAM 역할이 있는데 해당 역할을 찾아서 권한추가의 인라인 정책 생성을 클릭
- 정책 이름: KinesisStreamAccessForLambda
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:DescribeStreamSummary",
"kinesis:ListShards",
"kinesis:ListStreams"
],
"Resource": "[KINESIS_STREAM_ARN]"
}
]
}
2-3. Lambda 트리거 추가
- 트리거 구성으로 kinesis를 클릭
- kinesis 스트림으로 face_detect를 선택
2-4. Lambda 대상 추가
- 소스: "비동기식 호출"
- 조건: "성공 시"
- 대상유형: "SNS 주제"
- 대상: "sns-noti"를 선택
2-5. Lambda에 코드 작성 후 Deploy
import json
import base64
import boto3
def lambda_handler(event, context):
sns_client = boto3.client('sns')
for record in event['Records']:
raw_data = record['kinesis']['data']
decode_data = base64.b64decode(raw_data).decode('utf-8')
json_record = json.loads(decode_data)
face_search_response = json_record.get('FaceSearchResponse', [])
for face_response in face_search_response:
matched_faces = face_response.get('MatchedFaces', [])
for face in matched_faces:
external_image_id = face.get('Face', {}).get('ExternalImageId', "")
if external_image_id == "yoo1.jpg":
print("Sending to SNS topic")
response = sns_client.publish(
TargetArn="[SNS_TOPIC_ARN]",
Message=f"ExternalImageId: {external_image_id} detected",
Subject="Alert: Registered Person Detected"
)
print("SNS response:", response)
return # 감지된 후 함수를 완전히 종료
3. Email 확인하기
- 설정된 이메일 주소로 알림이 전송되는지 확인
앞에서 생성한 Lambda 함수에 Data Stream와 IoT Core를 연결해서 라즈베리 파이로 MQTT 통신을 보내는 기능 추가하기
1. AWS IoT Core 사물 생성
- 사물 이름: kinesis-iot-core-sensor로 설정하여 생성
2. AWS IoT Core 정책 생성 및 인증서에 정책 연결
- 정책 이름: kinesis-sensor-connect-iot-core
- 정책 작업: iot:Connect, iot:Receive, iot:Subscribe 권한 추가
- 인증서 생성 및 연결 후 인증서와 키 다운로드
3. 라즈베리파이 로컬에서 Python 코드 작성
- IoT 연결, MQTT 메시지 수신, Light Sensor Blink 기능이 포함된 코드:
import json
import time
import os
import sys
import threading
import RPi.GPIO as GPIO
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
# LED 제어 스레드를 위한 전역 변수
light_thread = None
stop_signal = False
# GPIO 설정
GPIO.setmode(GPIO.BCM)
GPIO.setup(17, GPIO.OUT)
def turn_on_light():
global stop_signal
stop_signal = False
for i in range(1, 5):
if stop_signal:
print("Light turn on loop interrupted")
break
GPIO.output(17, GPIO.HIGH)
print("LED ON")
time.sleep(1)
GPIO.output(17, GPIO.LOW)
print("LED OFF")
time.sleep(1)
def turn_off_light():
global stop_signal
stop_signal = True
GPIO.output(17, GPIO.LOW)
print("Light turned off")
def callback(client, userdata, message):
global light_thread
print("MQTT message received, starting to process...")
payload_str = message.payload.decode('utf-8')
print(f"Message from IoT Core (Raw Payload): {payload_str}")
try:
message_json = json.loads(payload_str)
except json.JSONDecodeError as e:
print(f"Failed to decode JSON: {e}")
return
command_message = message_json.get('message', None)
print(f"Extracted Message: {command_message}")
if command_message == "execute_python_script":
print("Execute command received.")
if light_thread and light_thread.is_alive():
print("Light is already on, waiting for previous command to finish...")
else:
light_thread = threading.Thread(target=turn_on_light)
light_thread.start()
elif command_message == "stop_python_script":
print("Stop command received.")
turn_off_light()
def connect():
ENDPOINT = "[IOT_ENDPOINT]"
CLIENT_ID = "kinesis-iot-core-sensor"
CERTIFICATE = "[CERTIFICATE_PATH]"
PRIVATE_KEY = "[PRIVATE_KEY_PATH]"
ROOT_CA = "[ROOT_CA_PATH]"
TOPIC = "cloudwatch/alarm"
IoTClient = AWSIoTPyMQTT.AWSIoTMQTTClient(CLIENT_ID)
IoTClient.configureEndpoint(ENDPOINT, 8883)
IoTClient.configureCredentials(ROOT_CA, PRIVATE_KEY, CERTIFICATE)
IoTClient.configureOfflinePublishQueueing(-1)
IoTClient.configureDrainingFrequency(2)
IoTClient.configureConnectDisconnectTimeout(50)
IoTClient.configureMQTTOperationTimeout(50)
try:
IoTClient.connect()
print('Begin Subscription to Topic :' + TOPIC)
except:
raise Exception("Failed to connect to Topic " + TOPIC)
while True:
IoTClient.subscribe(TOPIC, 1, callback)
time.sleep(1)
print('Subscription End')
IoTClient.disconnect()
if __name__ == "__main__":
try:
connect()
finally:
GPIO.cleanup() # 프로그램 종료 시 GPIO 리소스 해제
4. 앞서 생성한 Lambda 함수('lambda-to-iotcore2')에 IAM 권한 추가 및 lambda 코드 수정
4-1. IAM 정책 연결하기
- lambda-connect-iot-core를 생성하면 자동으로 생성되는 IAM 역할이 있는데 해당 역할을 찾아서 인라인 정책 권한을 클릭한 후 권한을 추가해준다.
- 정책 이름: iot-publish-role
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iot:Publish"
],
"Resource": "[IOT_THING_ARN]"
}
]
}
4-2. Lambda 코드 수정하기
import json
import base64
import boto3
# AWS 클라이언트 설정
sns_client = boto3.client('sns')
iot_client = boto3.client('iot-data', region_name='[YOUR_REGION]')
def lambda_handler(event, context):
face_detected = False # 등록된 사람의 얼굴 감지 여부를 추적
for record in event['Records']:
# Kinesis에서 수신한 데이터를 디코딩
raw_data = record['kinesis']['data']
decode_data = base64.b64decode(raw_data).decode('utf-8')
json_record = json.loads(decode_data)
face_search_response = json_record.get('FaceSearchResponse', [])
for face_response in face_search_response:
matched_faces = face_response.get('MatchedFaces', [])
if matched_faces:
for face in matched_faces:
external_image_id = face.get('Face', {}).get('ExternalImageId', "")
# 등록된 사람을 인식한 경우
if external_image_id == "[REGISTERED_IMAGE_NAME]":
face_detected = True
try:
# SNS로 알림 전송
sns_response = sns_client.publish(
TargetArn="[SNS_TOPIC_ARN]",
Message=f"ExternalImageId: {external_image_id} detected",
Subject="Alert: Registered Person Detected"
)
print(f"SNS Alert for registered person '{external_image_id}' sent.")
# IoT Core로 MQTT 메시지 전송 (등록된 사람 -> light 켜기)
iot_client.publish(
topic='cloudwatch/alarm',
qos=0,
payload=json.dumps({"message": "execute_python_script"})
)
print(f"Registered person '{external_image_id}' detected. Light ON message sent.")
except Exception as e:
print(f"Failed to send execute_python_script message or SNS alert: {e}")
break
else:
try:
# 등록되지 않은 사람 감지 -> stop_python_script 메시지 전송
iot_client.publish(
topic='cloudwatch/alarm',
qos=0,
payload=json.dumps({"message": "stop_python_script"})
)
print("Unregistered person detected. Light OFF message sent.")
except Exception as e:
print(f"Failed to send stop_python_script message: {e}")
if not face_detected:
try:
# 등록된 사람이 없을 경우 -> stop_python_script 메시지 전송
iot_client.publish(
topic='cloudwatch/alarm',
qos=0,
payload=json.dumps({"message": "stop_python_script"})
)
print("No registered person detected. Light OFF message sent.")
except Exception as e:
print(f"Failed to send stop_python_script message: {e}")
return {
'statusCode': 200,
'body': json.dumps('Success')
}
4-3. LED 센서 작동 확인하기
- 라즈베리파이에서 LED가 정상적으로 켜지고 꺼지는지 확인
마무리하며
이번 글에서는 AWS의 Rekognition과 IoT Core를 활용하여 실시간 얼굴 인식 시스템을 구현해보았다. 라즈베리파이는 단독으로 큰 AI 모델을 돌리기에는 CPU의 성능과 메모리 제약으로 인해 힘들 수 있지만, AWS의 클라우드 서비스와 연동하면 안정적이고 확장성 있는 시스템을 구축할 수 있다는 것을 확인할 수 있었다. 즉, 라즈베리 파이는 영상 촬영과 전송, 그리고 간단한 IoT 기능을 담당하고, 복잡한 AI 처리는 AWS가 담당하는 방식으로 시스템을 구성하니 훨씬 안정적으로 동작했다.
이런 방식은 개인 프로젝트나 학습용으로 좋은 실습 예제가 될 수 있다. AWS 서비스들을 실제로 연동해보면서 클라우드와 엣지 디바이스의 장단점을 이해하고, 두 기술을 적절히 조합하는 방법을 배울 수 있기 때문이다.
다음에는 또 다른 재미있는 주제로 찾아뵐 것이다. 관심 있는 부분이나 궁금한 점이 있다면 댓글로 남겨주기 바란다. 😊
'Cloud&Infra > AWS' 카테고리의 다른 글
Raspberry Pi와 AWS Service를 활용한 실시간 얼굴 인식 시스템 구축하기 - Part 1: 인프라 설정 (2) | 2024.11.09 |
---|---|
AWS SageMaker Neo와 AWS IoT Greengrass V2를 활용한 라즈베리파이 ML 모델 배포 가이드 (1) | 2024.10.27 |
Amazon Cognito 자격 증명 풀로 S3 파일 접근하기 (1) | 2024.03.13 |
S3 데이터를 RDS(Aurora MySQL) 클러스터로 로드하기 (1) | 2024.02.17 |
AWS QLDB와 Node.js 연결: 단계별 실습 (0) | 2023.05.14 |