AWS SageMaker 기반 저조도 환경에서 유재석 감지 시 자동 LED 제어 시스템 구축 part2: 데이터 처리

 

이전 part1에서는 프로젝트 개요 및 데이터 수집을 진행했다. 이번 파트에서는 Lambda 함수와 Glue를 사용하여 데이터 처리를 구현하는 것에 초점을 맞춘다. 

 

Architecture

 

이번 파트의 핵심

  • Lambda 함수 개발 및 데이터 병합
  • AWS Glue ETL을 통한 데이터 통합 및 전처리

 

3. Lambda 함수 개발 및 데이터 병합

  • 함수 생성
    • 새로작성
    • 함수 이름 지정
    • 런타임 : Python 3.9
  • 트리거 추가
    • S3 버킷에 새 이미지 업로드 시 Lambda 함수 자동 호출
  • Lambda 코드 작성
import boto3
import csv
import json
import urllib.parse
import os
from datetime import datetime, timedelta
from io import BytesIO, StringIO
from PIL import Image
import decimal

# AWS 클라이언트 설정
s3_client = boto3.client("s3")
rekognition_client = boto3.client("rekognition")
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('LightSensorData')

# Rekognition Collection ID
COLLECTION_ID = "face-detect"

# S3 저장소 정보
S3_BUCKET_NAME = "[Your S3 Bucket Name]"
S3_PROCESSED_PREFIX = "processed-images/"  # 최종 CSV 저장 위치

# Helper class for DynamoDB decimal conversion
class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            return float(o) if o % 1 else int(o)
        return super(DecimalEncoder, self).default(o)


def get_closest_light_data(target_time):
    """ DynamoDB에서 대상 시간과 가장 가까운 조도 데이터 조회 """
    try:
        # 타임스탬프가 문자열이 아니면 ISO 포맷으로 변환
        if not isinstance(target_time, str):
            target_time = target_time.isoformat()

        # 대상 시간 전후 30초 범위의 데이터 조회를 위한 시간 계산
        # UTC 타임존 정보가 있는 경우 제거하여 naive datetime으로 변환
        if '+00:00' in target_time or 'Z' in target_time:
            target_dt = datetime.fromisoformat(target_time.replace('Z', '+00:00'))
            # offset-aware를 offset-naive로 변환
            target_dt = target_dt.replace(tzinfo=None)
        else:
            target_dt = datetime.fromisoformat(target_time)

        start_time = (target_dt - timedelta(seconds=30)).isoformat()
        end_time = (target_dt + timedelta(seconds=30)).isoformat()

        print(f"🔍 다음 시간 범위의 조도 데이터 검색: {start_time} ~ {end_time}")

        # DynamoDB 쿼리 - 대상 시간 범위의 데이터 조회
        response = table.scan(
            FilterExpression="#ts BETWEEN :start_time AND :end_time",
            ExpressionAttributeNames={
                '#ts': 'timestamp'  # 예약어 timestamp에 대한 별칭 사용
            },
            ExpressionAttributeValues={
                ':start_time': start_time,
                ':end_time': end_time
            }
        )

        items = response.get('Items', [])

        # 결과가 없으면 가장 최근 데이터 1개 조회
        if not items:
            print("⚠ 대상 시간 범위 내 데이터 없음, 최근 데이터 조회")
            # 최근 10개 항목 스캔하여 가장 최신 항목 선택
            response = table.scan(
                Limit=10,
                ProjectionExpression="lightValue, #ts",
                ExpressionAttributeNames={'#ts': 'timestamp'}
            )

            items = sorted(response.get('Items', []),
                           key=lambda x: x['timestamp'],
                           reverse=True)[:1]

            if not items:
                print("❌ 조도 데이터를 찾을 수 없음, 기본값 사용")
                return 0, "N/A"

            print(f"⚠ 가장 최근 데이터 사용 ({items[0]['timestamp']})")
            return items[0]['lightValue'], items[0]['timestamp']

        # 타임스탬프와 가장 가까운 항목 찾기
        closest_item = min(items,
                           key=lambda x: abs(datetime.fromisoformat(x['timestamp']) - target_dt))

        time_diff = abs(datetime.fromisoformat(closest_item['timestamp']) - target_dt).total_seconds()
        print(f"✅ 가장 가까운 조도 데이터 찾음: {closest_item['timestamp']} (시간차: {time_diff:.1f}초)")

        return closest_item['lightValue'], closest_item['timestamp']

    except Exception as e:
        print(f"❌ DynamoDB에서 조도 데이터 조회 중 오류: {e}")
        return 0, "N/A"


def convert_to_baseline_jpeg(image_bytes):
    """ 이미지를 Rekognition에서 지원하는 Baseline JPEG로 변환 """
    try:
        img = Image.open(BytesIO(image_bytes))
        if img.format != "JPEG":
            print("⚠ 이미지가 JPEG 형식이 아님 → 변환 중...")
            output = BytesIO()
            img.convert("RGB").save(output, format="JPEG", quality=95)
            return output.getvalue()
        return image_bytes
    except Exception as e:
        print(f"❌ 이미지 변환 실패: {e}")
        return None


def save_results_to_s3(image_key, label, light_value, light_timestamp):
    """ Rekognition & 조도 데이터 병합 후 CSV 파일로 만들어 S3에 저장 """
    try:
        csv_buffer = StringIO()
        csv_writer = csv.writer(csv_buffer)

        # CSV 헤더 추가
        csv_writer.writerow(["image name", "s3 path", "yoo", "light-value", "light-timestamp"])

        # S3 경로 포함하여 결과 추가
        image_s3_path = f"s3://{S3_BUCKET_NAME}/{image_key}"
        image_name = os.path.splitext(image_key.split("/")[-1])[0]  # 확장자 제거

        # Decimal 타입인 경우 변환
        if isinstance(light_value, decimal.Decimal):
            light_value = float(light_value)

        csv_writer.writerow([image_name, image_s3_path, label, light_value, light_timestamp])

        # S3 저장 경로
        csv_filename = f"{image_name}.csv"  # 확장자 제거된 파일명 사용
        csv_s3_key = f"{S3_PROCESSED_PREFIX}{csv_filename}"

        # CSV 파일을 S3에 업로드
        s3_client.put_object(
            Bucket=S3_BUCKET_NAME,
            Key=csv_s3_key,
            Body=csv_buffer.getvalue(),
            ContentType="text/csv",
        )
        print(f"✅ CSV 저장 완료: {csv_s3_key}")

    except Exception as e:
        print(f"❌ CSV 저장 실패: {e}")


def lambda_handler(event, context):
    """S3에 업로드된 이미지를 Rekognition으로 분석 + 이미지 타임스탬프와 가장 가까운 조도 데이터를 찾아 CSV로 저장"""
    print(f"📥 Lambda Received Event: {json.dumps(event, indent=4)}")

    # 'Records' 키가 없는 경우 처리
    if 'Records' not in event:
        print("⚠ 'Records' 키가 없는 이벤트입니다. 테스트 또는 직접 호출된 이벤트일 수 있습니다.")
        return {
            "statusCode": 400,
            "body": "Invalid event structure. Expected S3 event with 'Records' key."
        }

    for record in event["Records"]:
        bucket_name = record["s3"]["bucket"]["name"]
        image_key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

        # 이미지 파일만 처리하도록 필터 추가
        if not image_key.lower().endswith((".jpg", ".jpeg", ".png")):
            print(f"⚠ 이미지 파일이 아님, Rekognition 분석 생략: {image_key}")
            return {"statusCode": 200, "body": "Skipped non-image file"}

        print(f"📂 감지된 파일: s3://{bucket_name}/{image_key}")

        try:
            # S3 객체 메타데이터에서 마지막 수정 시간 가져오기
            response = s3_client.head_object(Bucket=bucket_name, Key=image_key)
            last_modified = response['LastModified']

            print(f"📅 이미지 업로드 시간: {last_modified}")

            # 이미지 업로드 시간과 가장 가까운 조도 데이터 가져오기
            light_value, light_timestamp = get_closest_light_data(last_modified)
            print(f"💡 조도 센서 값 확인: {light_value}, 기록 시간: {light_timestamp}")

            # S3에서 이미지 다운로드
            response = s3_client.get_object(Bucket=bucket_name, Key=image_key)
            image_bytes = response["Body"].read()

            # 이미지가 JPEG가 아니라면 변환
            processed_image = convert_to_baseline_jpeg(image_bytes)
            if processed_image is None:
                return {"statusCode": 500, "body": "Image conversion failed"}

            # Rekognition으로 얼굴 검색
            rekognition_response = rekognition_client.search_faces_by_image(
                CollectionId=COLLECTION_ID,
                Image={"Bytes": processed_image},
                MaxFaces=1,
                FaceMatchThreshold=95,
            )

            is_yoo_detected = len(rekognition_response["FaceMatches"]) > 0
            label = "Y" if is_yoo_detected else "N"
            print(f"✅ Rekognition 결과: {label}")

            # 결과를 CSV로 저장
            save_results_to_s3(image_key, label, light_value, light_timestamp)

        except rekognition_client.exceptions.InvalidParameterException as e:
            # 얼굴 검색 실패 (얼굴이 이미지에 없는 경우)
            print(f"⚠ Rekognition 얼굴 검색 실패 (얼굴 없음): {e}")

            # 가장 최근 조도 데이터 가져오기
            light_value, light_timestamp = get_closest_light_data(datetime.utcnow())

            # 얼굴이 없으므로 "N" 라벨로 저장
            save_results_to_s3(image_key, "N", light_value, light_timestamp)

        except Exception as e:
            print(f"❌ 처리 중 오류 발생: {e}")
            return {"statusCode": 500, "body": json.dumps({"error": str(e)})}

    return {"statusCode": 200, "body": json.dumps({"message": "Processing complete"})}

 

    • 시간 기반 조도 데이터 조회 함수
      • 이미지 업로드 시간(LastModified) 전후 30초 범위 내 DynamoDB에서 가장 가까운 조도 데이터 검색
      • timestamp 예약어 처리를 위한 ExpressionAttributeNames 사용
      • 데이터가 없을 경우 가장 최근 조도 데이터 사용
        • 속성: lightValue (Decimal 타입), ttl (24시간 자동 만료)
        • 예약어 처리를 위한 ExpressionAttributeNames 사용
    • 이미지가 JPEG 형식이 아닌 경우 JPEG로 변환 함수
    • 데이터 병합 및 CSV 형식으로 S3에 저장하는 함수
      • 데이터 병합 결과를 CSV 파일로 변환하고
      • S3의 processed-images/ 경로에 저장
    • Lambda 핸들러 함수 
      • S3에서 이미지 가져오기
      • JPEG 형식의 이미지가 아니라면 변환함수 실행
      • Rekognition으로 얼굴 인식
        • Lambda 함수에서 Rekognition의 search_faces_by_image API 호출
        • 사전에 등록된 유재석 얼굴 컬렉션(face-detect)과 비교 95% 이상의 일치도를 기준으로 유재석 여부 판단 검색 결과에 FaceMatches가 있으면 "Y", 없으면 "N"으로 레이블
      • 데이터 병합 및 CSV 형식으로 S3에 저장하는 함수 실행
  • 결과: Lambda 함수를 통해 총 405개의 데이터 수집 완료
    • 저조도 + 유재석 감지 케이스
    • 저조도 + 유재석 미감지 케이스
    • 정상 조도 + 유재석 감지 케이스
    • 정상 조도 + 유재석 미감지 케이스

 

4. AWS Glue ETL을 통한 데이터 통합 및 전처리

  • ETL 작업 개발
    • Script Editor를 통해 Script 생성
    • Spark 스크립트 작성:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import lit, current_timestamp, col, when

# 초기화
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# 소스 데이터 경로와 대상 경로 설정
source_path = "[Your S3 Bucket Name]/processed-images/"
target_path = "[Your S3 Bucket Name]/merged-data/"

# 모든 CSV 파일을 읽어 Dynamic Frame으로 변환
datasource = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [source_path],
        "recurse": True,
        "groupFiles": "inPartition",
        "groupSize": 1048576  # 1MB 기준으로 그룹화
    },
    format="csv",
    format_options={
        "withHeader": True,
        "separator": ","
    }
)

print(f"원본 데이터 건수: {datasource.count()}")
print("스키마 정보:")
datasource.printSchema()

# DataFrame으로 변환
df = datasource.toDF()

# 중복 제거 (동일한 이미지가 여러 번 처리된 경우)
distinct_data = df.dropDuplicates(["image name"])

# 저조도 판별 열 추가 (예: 50% 미만을 저조도로 정의)
df_with_flags = distinct_data.withColumn(
    "is_low_light",
    when(col("`light-value`") < 50, True).otherwise(False)
)

# 저조도+유재석 감지 열 추가
df_with_target = df_with_flags.withColumn(
    "is_target_in_low_light",
    when((col("is_low_light") == True) & (col("yoo") == "Y"), True).otherwise(False)
)

# 처리 시간 추가
df_final = df_with_target.withColumn("processed_at", current_timestamp())

# 다시 DynamicFrame으로 변환
processed_data = DynamicFrame.fromDF(df_final, glueContext, "processed_data")

# 현재 시간 기반 파일명 생성
current_time = current_timestamp().cast("string")
timestamp = current_time._jc.toString().replace(" ", "_").replace(":", "-").replace(".", "-")
output_filename = f"merged_light_sensor_data_{timestamp}"

# 단일 파일로 저장
processed_data_repartitioned = processed_data.toDF().repartition(1)
processed_data_single = DynamicFrame.fromDF(processed_data_repartitioned, glueContext, "processed_data_single")

# 데이터 저장
glueContext.write_dynamic_frame.from_options(
    frame=processed_data_single,
    connection_type="s3",
    connection_options={
        "path": target_path,
        "partitionKeys": []
    },
    format="csv",
    format_options={
        "writeHeader": True,
        "separator": ","
    }
)

print(f"처리 완료: {processed_data.count()} 건의 데이터가 {target_path} 경로에 저장되었습니다.")

job.commit()
  • 핵심 데이터 변환 작업:
    • 중복 데이터 제거: dropDuplicates() 함수로 동일 이미지 처리 기록 중복 제거
    • 저조도 판별 변수 생성: 조도 값 50 미만을 기준으로 'is_low_light' 파생 변수 생성
    • 목표 변수 생성: 저조도 + 유재석 감지 조합으로 'is_target_in_low_light' 변수 생성
    • 처리 시간 기록: 데이터 처리 시점을 'processed_at' 열에 저장
    • 총 405개의 CSV 파일을 하나의 데이터셋으로 병합하여 [Your S3 Bucket Name]/merged-data/ 에 저장
  • IAM Role 연결
    • 다음 권한 정책이 들어가 있는 IAM Role 연결
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::[Your S3 Bucket Name]/*",
                "arn:aws:s3:::[Your S3 Bucket Name]"
            ]
        }
    ]
}