Raspberry Pi와 AWS Service를 활용한 실시간 얼굴 인식 시스템 구축하기 - Part 2: Rekognition 기반 얼굴 인식 구현

이전 Part 1에서는 실시간 비디오 스트리밍을 위한 기본 인프라를 구축했다. 이번 파트에서는 Amazon Rekognition을 활용한 실시간 얼굴 인식과 IoT 디바이스 제어 구현에 초점을 맞춘다.

 

Architecture

 

이번 파트의 핵심

  • Rekognition을 통한 얼굴 인식 기능 구현
  • Lambda를 통한 인식 결과 처리
  • SNS와 IoT Core를 활용한 알림 및 디바이스 제어

 

이전 Part 1에서 구축한 스트리밍 기반 위에, 위 아키텍처의 빨간 박스 부분을 구현해보자.

 

Kinesis Video Stream으로 받는 영상으로 Rekognition Collection에 등록된 얼굴인지 Rekognition이 판단 후 Kinesis Data Stream에 전송

1. Faces Collection(Rekognition Collection)만들기

1-1. Rekognition Collection 생성

# Rekognition Collection 생성
aws rekognition create-collection \
    --collection-id "face-detect" \
    --profile default

# 생성된 Collection 확인
aws rekognition list-collections --profile default

 

2. Rekognition Collection에 S3 에서 업로드한 얼굴 이미지 등록하기

# S3의 이미지를 Collection에 등록
aws rekognition index-faces \
    --image '{"S3Object":{"Bucket":"rekognition-test-img3","Name":"yoo1.jpg"}}' \
    --collection-id "face-detect" \
    --max-faces 1 \
    --quality-filter "AUTO" \
    --detection-attributes "ALL" \
    --external-image-id "yoo1.jpg" \
    --profile default

 

 

3. Rekognition Stream Processor 구성

  • video stream arn과 kinesis data stream arn 넣기
aws rekognition create-stream-processor \
    --name rekognition-video-stream-processor \
    --role-arn arn:aws:iam::[YOUR-ACCOUNT-ID]:role/Rekognition-kinesis \
    --settings '{"FaceSearch":{"CollectionId":"face-detect"}}' \
    --input '{"KinesisVideoStream":{"Arn":"[YOUR-VIDEO-STREAM-ARN]"}}' \
    --stream-processor-output '{"KinesisDataStream":{"Arn":"[YOUR-DATA-STREAM-ARN]"}}'

 

 

4.  트러블 슈팅

  • Rekognirion stream processor를 실행시키던 중 다음과 같은 에러를 만나게 되었다. 

4-1. Stream Processor 생성 시 IAM 권한 오류

An error occurred (AccessDeniedException) when calling the CreateStreamProcessor operation: 
User: [IAM USER ARN] is not authorized to perform: iam:PassRole on resource: 
[IAM ROLE ARN] because no identity-based policy allows the iam:PassRole action

4-2. 문제 원인

  • iam:PassRole 권한 누락

4-3. 해결 방법

  • IAM 정책에 다음 권한을 추가:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": [
                "[IAM ROLE ARN]"
            ]
        }
    ]
}

 

5. Rekognition Stream Processor 실행 및 모니터링

5-1. 스트림 프로세서 시작

aws rekognition start-stream-processor \
    --name rekognition-video-stream-processor

스트림 프로세서 결과값 예

{
	"SessionId": "0f394373a-43f5-0e3c-932e-345c6c5dgfad"
}

참고: 스트림 프로세서 종료 및 프로세서 세부정보 확인

# 스트림 프로세서 종료
aws rekognition stop-stream-processor \
    --name rekognition-video-stream-processor

# 프로세서 상태 확인
aws rekognition describe-stream-processor \
    --name rekognition-video-stream-processor

 

 

6. Kinesis Data Stream 모니터링

  • 다음 Python 스크립트로 실시간 데이터 모니터링이 가능하다.
import boto3
import time

def monitor_stream():
    # Kinesis 클라이언트 초기화
    kinesis_client = boto3.client('kinesis', region_name='[YOUR_REGION]')
    stream_name = '[YOUR_STREAM_NAME]'

    # 샤드 ID 조회
    response = kinesis_client.describe_stream(StreamName=stream_name)
    shard_id = response['StreamDescription']['Shards'][0]['ShardId']

    # 초기 샤드 이터레이터 획득
    shard_iterator = kinesis_client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType='LATEST'
    )['ShardIterator']

    while True:
        records_response = kinesis_client.get_records(
            ShardIterator=shard_iterator, 
            Limit=10
        )
        
        for record in records_response['Records']:
            print(record['Data'].decode('utf-8'))
            
        shard_iterator = records_response['NextShardIterator']
        time.sleep(1)

if __name__ == "__main__":
    monitor_stream()

6번 실행의 결과 값

6-1. 비디오에 얼굴이 감지되지 않은 경우

{
    "InputInformation": {
        "KinesisVideo": {
            "StreamArn": "[KINESIS_VIDEO_STREAM_ARN]",
            "FragmentNumber": "[FRAGMENT_NUMBER]",
            "ServerTimestamp": 1.725433098685E9,
            "ProducerTimestamp": 1.725433095115E9,
            "FrameOffsetInSeconds": 0.0
        }
    },
    "StreamProcessorInformation": {"Status": "RUNNING"},
    "FaceSearchResponse": []
}

 

6-2. 비디오에 사전 등록된 얼굴을 감지했을 경우

{
    "InputInformation": {"KinesisVideo": {...}},
    "StreamProcessorInformation": {"Status": "RUNNING"},
    "FaceSearchResponse": [{
        "DetectedFace": {
            "BoundingBox": {...},
            "Confidence": 99.99053,
            "Landmarks": [...],
            "Pose": {...},
            "Quality": {...}
        },
        "MatchedFaces": [{
            "Similarity": 99.74969,
            "Face": {
                "FaceId": "[FACE_ID]",
                "ExternalImageId": "[IMAGE_NAME]",
                "Confidence": 99.9997
            }
        }]
    }]
}

6-3. 비디오에 등록되지 않은 얼굴을 감지했을 경우

{
    "InputInformation": {
        "KinesisVideo": {
            "StreamArn": "[KINESIS_VIDEO_STREAM_ARN]",
            "FragmentNumber": "[FRAGMENT_NUMBER]",
            "ServerTimestamp": 1.725509264843E9,
            "ProducerTimestamp": 1.72550926229E9,
            "FrameOffsetInSeconds": 0.9990000128746033
        }
    },
    "StreamProcessorInformation": {"Status": "RUNNING"},
    "FaceSearchResponse": [{
        "DetectedFace": {
            "BoundingBox": {
                "Height": 0.6725677,
                "Width": 0.31205362,
                "Left": 0.1554201,
                "Top": 0.2967734
            },
            "Confidence": 99.999344,
            "Landmarks": [...],
            "Pose": {
                "Pitch": 0.22625916,
                "Roll": -3.3147352,
                "Yaw": -0.2569844
            },
            "Quality": {
                "Brightness": 98.61881,
                "Sharpness": 96.61495
            }
        },
        "MatchedFaces": []
    }]
}

 

알림 시스템 구축 - Kinesis Data Stream에서 SNS로 알림을 보내는 lambda함수 생성

1. SNS 생성 및 (이메일) 구독 생성 후 이메일 확인하기

1-1. SNS 주제 생성

  • 주제 이름: sns-noti
  • 주제 세부정보는 "표준"으로 선택한 뒤 "주제생성"을 클릭

1-2. SNS 구독 생성

  • 생성한 주제(sns-noti)의 세부정보 페이지에서 "구독생성"을 클릭
  • 구독생성 페이지에서 프로토콜은 "이메일"로, 엔드포인트는 "알림을 받을 이메일 주소"를 입력

1-3. 이메일 확인하기

  • 구독 확인 이메일에서 "Confirm subscription"을 클릭.

 

2. Lambda 함수 설정

2-1. Lambda 함수 생성하기

  • 함수 이름: lambda-to-iotcore2
  • 런타임: python

2-2. IAM 역할 생성 및 권한 추가

  • iot-core를 생성하면 자동으로 생성되는 IAM 역할이 있는데 해당 역할을 찾아서 권한추가의 인라인 정책 생성을 클릭
  • 정책 이름: KinesisStreamAccessForLambda

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards",
                "kinesis:ListStreams"
            ],
            "Resource": "[KINESIS_STREAM_ARN]"
        }
    ]
}

 

2-3. Lambda 트리거 추가

  • 트리거 구성으로 kinesis를 클릭
  • kinesis 스트림으로 face_detect를 선택

2-4. Lambda 대상 추가

  • 소스: "비동기식 호출"
  • 조건: "성공 시"
  • 대상유형: "SNS 주제"
  • 대상: "sns-noti"를 선택

2-5. Lambda에 코드 작성 후 Deploy

import json
import base64
import boto3

def lambda_handler(event, context):
    sns_client = boto3.client('sns')
    
    for record in event['Records']:
        raw_data = record['kinesis']['data']
        decode_data = base64.b64decode(raw_data).decode('utf-8')
        json_record = json.loads(decode_data)
        
        face_search_response = json_record.get('FaceSearchResponse', [])
        for face_response in face_search_response:
            matched_faces = face_response.get('MatchedFaces', [])
            for face in matched_faces:
                external_image_id = face.get('Face', {}).get('ExternalImageId', "")
                
                if external_image_id == "yoo1.jpg":
                    print("Sending to SNS topic")
                    response = sns_client.publish(
                        TargetArn="[SNS_TOPIC_ARN]",
                        Message=f"ExternalImageId: {external_image_id} detected",
                        Subject="Alert: Registered Person Detected"
                    )
                    print("SNS response:", response)
                    return  # 감지된 후 함수를 완전히 종료

 

3. Email 확인하기

  • 설정된 이메일 주소로 알림이 전송되는지 확인

 

 

앞에서 생성한 Lambda 함수에 Data Stream와 IoT Core를 연결해서 라즈베리 파이로 MQTT 통신을 보내는 기능 추가하기

1. AWS IoT Core 사물 생성

  • 사물 이름: kinesis-iot-core-sensor로 설정하여 생성

 

2. AWS IoT Core 정책 생성 및 인증서에 정책 연결

  • 정책 이름: kinesis-sensor-connect-iot-core
  • 정책 작업: iot:Connect, iot:Receive, iot:Subscribe 권한 추가

  • 인증서 생성 및 연결 후 인증서와 키 다운로드

 

3. 라즈베리파이 로컬에서 Python 코드 작성

  • IoT 연결, MQTT 메시지 수신, Light Sensor Blink 기능이 포함된 코드:
import json
import time
import os
import sys
import threading
import RPi.GPIO as GPIO
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT

# LED 제어 스레드를 위한 전역 변수
light_thread = None
stop_signal = False

# GPIO 설정
GPIO.setmode(GPIO.BCM)
GPIO.setup(17, GPIO.OUT)

def turn_on_light():
    global stop_signal
    stop_signal = False
    for i in range(1, 5):
        if stop_signal:
            print("Light turn on loop interrupted")
            break
        GPIO.output(17, GPIO.HIGH)
        print("LED ON")
        time.sleep(1)
        GPIO.output(17, GPIO.LOW)
        print("LED OFF")
        time.sleep(1)

def turn_off_light():
    global stop_signal
    stop_signal = True
    GPIO.output(17, GPIO.LOW)
    print("Light turned off")

def callback(client, userdata, message):
    global light_thread

    print("MQTT message received, starting to process...")
    
    payload_str = message.payload.decode('utf-8')
    print(f"Message from IoT Core (Raw Payload): {payload_str}")
    
    try:
        message_json = json.loads(payload_str)
    except json.JSONDecodeError as e:
        print(f"Failed to decode JSON: {e}")
        return
    
    command_message = message_json.get('message', None)
    print(f"Extracted Message: {command_message}")

    if command_message == "execute_python_script":
        print("Execute command received.")
        if light_thread and light_thread.is_alive():
            print("Light is already on, waiting for previous command to finish...")
        else:
            light_thread = threading.Thread(target=turn_on_light)
            light_thread.start()

    elif command_message == "stop_python_script":
        print("Stop command received.")
        turn_off_light()

def connect():
    ENDPOINT = "[IOT_ENDPOINT]"
    CLIENT_ID = "kinesis-iot-core-sensor"
    CERTIFICATE = "[CERTIFICATE_PATH]"
    PRIVATE_KEY = "[PRIVATE_KEY_PATH]"
    ROOT_CA = "[ROOT_CA_PATH]"
    TOPIC = "cloudwatch/alarm"

    IoTClient = AWSIoTPyMQTT.AWSIoTMQTTClient(CLIENT_ID)
    IoTClient.configureEndpoint(ENDPOINT, 8883)
    IoTClient.configureCredentials(ROOT_CA, PRIVATE_KEY, CERTIFICATE)
    IoTClient.configureOfflinePublishQueueing(-1)
    IoTClient.configureDrainingFrequency(2)
    IoTClient.configureConnectDisconnectTimeout(50)
    IoTClient.configureMQTTOperationTimeout(50)

    try:
        IoTClient.connect()
        print('Begin Subscription to Topic :' + TOPIC)
    except:
        raise Exception("Failed to connect to Topic " + TOPIC)

    while True:
        IoTClient.subscribe(TOPIC, 1, callback)
        time.sleep(1)

    print('Subscription End')
    IoTClient.disconnect()

if __name__ == "__main__":
    try:
        connect()
    finally:
        GPIO.cleanup() # 프로그램 종료 시 GPIO 리소스 해제

 

4. 앞서 생성한 Lambda 함수('lambda-to-iotcore2')에 IAM 권한 추가 및  lambda 코드 수정

4-1. IAM 정책 연결하기

  • lambda-connect-iot-core를 생성하면 자동으로 생성되는 IAM 역할이 있는데 해당 역할을 찾아서 인라인 정책 권한을 클릭한 후 권한을 추가해준다.
  • 정책 이름: iot-publish-role
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iot:Publish"
            ],
            "Resource": "[IOT_THING_ARN]"
        }
    ]
}

 

4-2. Lambda 코드 수정하기

import json
import base64
import boto3

# AWS 클라이언트 설정
sns_client = boto3.client('sns')
iot_client = boto3.client('iot-data', region_name='[YOUR_REGION]')

def lambda_handler(event, context):
    face_detected = False  # 등록된 사람의 얼굴 감지 여부를 추적

    for record in event['Records']:
        # Kinesis에서 수신한 데이터를 디코딩
        raw_data = record['kinesis']['data']
        decode_data = base64.b64decode(raw_data).decode('utf-8')
        json_record = json.loads(decode_data)
        
        face_search_response = json_record.get('FaceSearchResponse', [])
        
        for face_response in face_search_response:
            matched_faces = face_response.get('MatchedFaces', [])
            
            if matched_faces:
                for face in matched_faces:
                    external_image_id = face.get('Face', {}).get('ExternalImageId', "")
                    
                    # 등록된 사람을 인식한 경우
                    if external_image_id == "[REGISTERED_IMAGE_NAME]":
                        face_detected = True
                        
                        try:
                            # SNS로 알림 전송
                            sns_response = sns_client.publish(
                                TargetArn="[SNS_TOPIC_ARN]",
                                Message=f"ExternalImageId: {external_image_id} detected",
                                Subject="Alert: Registered Person Detected"
                            )
                            print(f"SNS Alert for registered person '{external_image_id}' sent.")
                            
                            # IoT Core로 MQTT 메시지 전송 (등록된 사람 -> light 켜기)
                            iot_client.publish(
                                topic='cloudwatch/alarm',
                                qos=0,
                                payload=json.dumps({"message": "execute_python_script"})
                            )
                            print(f"Registered person '{external_image_id}' detected. Light ON message sent.")
                        except Exception as e:
                            print(f"Failed to send execute_python_script message or SNS alert: {e}")
                        
                        break
            else:
                try:
                    # 등록되지 않은 사람 감지 -> stop_python_script 메시지 전송
                    iot_client.publish(
                        topic='cloudwatch/alarm',
                        qos=0,
                        payload=json.dumps({"message": "stop_python_script"})
                    )
                    print("Unregistered person detected. Light OFF message sent.")
                except Exception as e:
                    print(f"Failed to send stop_python_script message: {e}")
    
    if not face_detected:
        try:
            # 등록된 사람이 없을 경우 -> stop_python_script 메시지 전송
            iot_client.publish(
                topic='cloudwatch/alarm',
                qos=0,
                payload=json.dumps({"message": "stop_python_script"})
            )
            print("No registered person detected. Light OFF message sent.")
        except Exception as e:
            print(f"Failed to send stop_python_script message: {e}")
    
    return {
        'statusCode': 200,
        'body': json.dumps('Success')
    }

4-3. LED 센서 작동 확인하기

  • 라즈베리파이에서 LED가 정상적으로 켜지고 꺼지는지 확인

 

마무리하며

이번 글에서는 AWS의 Rekognition과 IoT Core를 활용하여 실시간 얼굴 인식 시스템을 구현해보았다. 라즈베리파이는 단독으로 큰 AI 모델을 돌리기에는 CPU의 성능과 메모리 제약으로 인해 힘들 수 있지만, AWS의 클라우드 서비스와 연동하면 안정적이고 확장성 있는 시스템을 구축할 수 있다는 것을 확인할 수 있었다. 즉, 라즈베리 파이는 영상 촬영과 전송, 그리고 간단한 IoT 기능을 담당하고, 복잡한 AI 처리는 AWS가 담당하는 방식으로 시스템을 구성하니 훨씬 안정적으로 동작했다.

이런 방식은 개인 프로젝트나 학습용으로 좋은 실습 예제가 될 수 있다. AWS 서비스들을 실제로 연동해보면서 클라우드와 엣지 디바이스의 장단점을 이해하고, 두 기술을 적절히 조합하는 방법을 배울 수 있기 때문이다. 

다음에는 또 다른 재미있는 주제로 찾아뵐 것이다. 관심 있는 부분이나 궁금한 점이 있다면 댓글로 남겨주기 바란다. 😊