AI 탐구노트

CCTV 영상에서의 대기자 수 및 대기 시간 측정 본문

DIY 테스트

CCTV 영상에서의 대기자 수 및 대기 시간 측정

42morrow 2025. 1. 19. 15:15

 

 

최근 들어 인공지능 기술은 일상생활 속 다양한 문제를 해결하기 위해 활발히 도입되고 있습니다. 그중 AI 영상 분석 기술은 높은 관심을 받고 있는데, 특히 CCTV를 활용한 분석은 기존의 데이터 활용 방식에 비해 더 직관적이고 실시간 대응이 가능하다는 점에서 주목받고 있습니다. 이런 기술은 단순히 감시 용도를 넘어, 매장의 운영 효율성을 높이거나 고객 경험을 향상시키는 데도 적용될 수 있습니다.

 

예를 들어 매장을 방문할 때 계산대에서 길게 줄을 서 기다린 경험은 누구나 한 번쯤 있을 것입니다. 이는 고객 만족도를 떨어뜨리는 주요 요인 중 하나로 꼽힙니다. 매장 운영자는 대기열을 줄이고 효율적으로 운영하기 위해 대기자 수와 대기시간을 분석할 수 있는 새로운 도구를 필요로 하고 있습니다. 바로 여기서 AI 영상 분석 기술이 큰 역할을 할 수 있죠.

 

이 글에서는 AI 영상 분석을 통해 마트 등에서 볼 수 있는 대기자 별 대기시간을 측정하는 코드를 하나 만들어 테스트 해 보도록 하겠습니다. 

 

테스트 이미지

테스트에 사용한 영상은 다음 링크에서 가져왔습니다. 이 글을 빌려 감사 드립니다!

사진 : 테스트 영상 속의 한 장면. 대형마트에서 줄 서 있는 사람들의 예시

 

 

코드 요구사항

이번 코드에서는 영상 속에서 줄 서 있는 대기자 별 ID 부여와 대기 시간 표시를 하는 python 프로그램을 만들어 볼 겁니다. 코드 생성을 위해 다음과 같은 요구사항을 이용했습니다.

  • python, opencv, yolov11 을 이용합니다.
  • 사람에 대한 감지를 하고 tracking을 통해 대기자 별 ID를 부여합니다.
  • 대기열(줄)의 지정은 사용자가 직접 화면 상에서 할 수 있도록 합니다. 
  • 프로그램이 시작되면 대기열(줄) 영역과 대기열의 방향을 설정한 파일(queue_data.json)을 찾고 있으면 이걸 로드하고, 없다면 영상의 첫 프레임 이미지에서 직접 설정하는 과정을 거칩니다. 
    • 대기열 영역 설정 : 마우스 클릭을 이용해 설정 후 'x' 키로 완료합니다.
    • 대기열 방향 설정 : 영역 설정 완료 후, 마우스 클릭 (맨 앞 - 중간(복수 가능)-맨 끝)을 하고 'w'로 저장합니다. 
    • 대기열은 복수로 설정 가능하도록 합니다.
  • 설정 파일은 다음과 같이 json 형태를 띄도록 합니다. (예시로 제시)
{
    "queues": [
        {
            "qid": 1,
            "area": [[2, 388], [1, 856], [1904, 860], [1902, 429], [4, 390]],
            "direction": [[1862, 606], [1027, 609], [53, 592]]
        },
    ]
}

 

 

테스트 코드

처음에는 이 작업을 ChatGPT에게 시켰었는데 대기열(줄)의 방향 부분에서 많이 헤매는 것 같았습니다. 제가 요구사항 전달을 잘못한 탓도 있겠지만 아무래도 이해도가 떨어진다고 생각이 되는 겁니다. 그래서, Claude에게 한번 시켜봤더니 역시 한번에 요구사항을 잘 알아듣고 해결책을 제시하더군요. 이럴 땐 유료 구독을 Claude로 바꿔야 하나 하는 생각도 하게 됩니다. ^^; 참고로 생성된 코드를 보면 아시겠지만, 감지 모델은 yolov11n을 이용했고 개체 트래킹을 위해 byterack을 이용했습니다.  

import cv2
import numpy as np
from ultralytics import YOLO
import json
import time
from collections import defaultdict
import os

class QueueMonitor:
    def __init__(self, video_source, model_path='yolov11n.pt', use_gpu=True, frame_interval=2):
        self.device = 'cuda' if use_gpu and cv2.cuda.getCudaEnabledDeviceCount() > 0 else 'cpu'
        try:
            self.model = YOLO(model_path)
            self.model.to('cuda')
        except Exception as e:
            print(self.device, e)
            raise FileNotFoundError(f"YOLO 모델 파일을 찾을 수 없습니다: {model_path}")

        self.queue_data = {"queues": []}
        self.tracking_history = defaultdict(lambda: {
            "entry_time": None, 
            "current_pos": None,
            "queue_position": None,
            "display_id": None
        })
        self.json_file = "queue_data.json"
        self.video_source = video_source
        self.frame_interval = frame_interval
        self.frame_count = 0
        self.current_queue_id = 1
        self.debug_mode = False

    def calculate_point_to_line_distance(self, point, line_start, line_end):
        """점과 선분 사이의 거리를 계산합니다."""
        if np.array_equal(line_start, line_end):
            return np.linalg.norm(point - line_start)
        
        line_vec = line_end - line_start
        point_vec = point - line_start
        line_len = np.linalg.norm(line_vec)
        line_unit_vec = line_vec / line_len
        projection_length = np.dot(point_vec, line_unit_vec)
        
        if projection_length < 0:
            return np.linalg.norm(point - line_start)
        elif projection_length > line_len:
            return np.linalg.norm(point - line_end)
        else:
            projection = line_start + line_unit_vec * projection_length
            return np.linalg.norm(point - projection)

    def calculate_position_along_direction(self, point, direction_points):
        """방향선을 따라 상대적 위치를 계산합니다."""
        min_dist = float('inf')
        position = 0
        total_length = 0
        
        for i in range(len(direction_points) - 1):
            total_length += np.linalg.norm(
                np.array(direction_points[i+1]) - np.array(direction_points[i]))
        
        current_length = 0
        for i in range(len(direction_points) - 1):
            start = np.array(direction_points[i])
            end = np.array(direction_points[i+1])
            segment_length = np.linalg.norm(end - start)
            
            dist = self.calculate_point_to_line_distance(np.array(point), start, end)
            if dist < min_dist:
                min_dist = dist
                point_vec = np.array(point) - start
                line_vec = end - start
                projection = np.dot(point_vec, line_vec) / np.dot(line_vec, line_vec)
                projection = max(0, min(1, projection))
                position = (current_length + projection * segment_length) / total_length
            
            current_length += segment_length
        
        return position

    def load_or_create_queue_areas(self):
        """JSON 파일에서 대기열 영역을 불러오거나 새로 정의합니다."""
        if os.path.exists(self.json_file):
            with open(self.json_file, 'r') as f:
                self.queue_data = json.load(f)
        else:
            print("대기열 영역 데이터가 없습니다. 새로 정의합니다.")
            self.define_queue_areas_from_first_frame()

    def define_queue_areas_from_first_frame(self):
        """첫 프레임을 사용하여 대기열 영역과 방향을 정의합니다."""
        cap = cv2.VideoCapture(self.video_source)
        ret, first_frame = cap.read()
        if not ret:
            raise ValueError("비디오에서 첫 번째 프레임을 로드할 수 없습니다.")

        print("대기열 영역을 정의합니다. 클릭으로 영역을 지정하고 'x'를 눌러 다음 단계로 이동하세요.")
        print("여러 대기열을 정의하려면 'n'을 누르세요. 모든 정의를 완료하려면 'w'를 누르세요.")

        cv2.namedWindow("Queue Area Definition")
        
        queue_points = []
        direction_points = []
        is_defining_direction = False

        def mouse_callback(event, x, y, flags, param):
            if event == cv2.EVENT_LBUTTONDOWN:
                if not is_defining_direction:
                    queue_points.append([x, y])
                else:
                    direction_points.append([x, y])

        cv2.setMouseCallback("Queue Area Definition", mouse_callback)

        while True:
            frame_copy = first_frame.copy()
            
            if queue_points:
                points_array = np.array(queue_points)
                cv2.polylines(frame_copy, [points_array], True, (0, 255, 0), 2)
                for point in queue_points:
                    cv2.circle(frame_copy, tuple(point), 5, (0, 255, 0), -1)

            if direction_points:
                for i in range(len(direction_points) - 1):
                    cv2.line(frame_copy, tuple(direction_points[i]), 
                            tuple(direction_points[i + 1]), (0, 0, 255), 2)
                for point in direction_points:
                    cv2.circle(frame_copy, tuple(point), 5, (0, 0, 255), -1)

            mode_text = "방향 정의 모드" if is_defining_direction else "영역 정의 모드"
            cv2.putText(frame_copy, mode_text, (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

            cv2.imshow("Queue Area Definition", frame_copy)
            key = cv2.waitKey(1) & 0xFF

            if key == ord('x'):
                if not is_defining_direction and len(queue_points) >= 3:
                    is_defining_direction = True
                    print("방향을 정의합니다. 클릭으로 방향점을 지정하세요.")

            elif key == ord('n'):
                if len(queue_points) >= 3 and len(direction_points) >= 2:
                    self.queue_data["queues"].append({
                        "qid": self.current_queue_id,
                        "area": queue_points,
                        "direction": direction_points
                    })
                    self.current_queue_id += 1
                    queue_points.clear()
                    direction_points.clear()
                    is_defining_direction = False
                    print("새로운 대기열 정의를 시작합니다.")

            elif key == ord('w'):
                if len(queue_points) >= 3 and len(direction_points) >= 2:
                    self.queue_data["queues"].append({
                        "qid": self.current_queue_id,
                        "area": queue_points,
                        "direction": direction_points
                    })
                    with open(self.json_file, 'w') as f:
                        json.dump(self.queue_data, f, indent=2)
                    break

        cap.release()
        cv2.destroyWindow("Queue Area Definition")

    def process_video(self):
        """영상에서 대기열 영역을 기준으로 객체를 감지하고 대기 시간을 계산합니다."""
        cap = cv2.VideoCapture(self.video_source)
        
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        out = cv2.VideoWriter('output.mp4', fourcc, fps, (width, height))

        # cv2.namedWindow("Queue Monitoring", cv2.WINDOW_NORMAL)
        cv2.namedWindow("Queue Monitoring")
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            self.frame_count += 1
            if self.frame_count % self.frame_interval != 0:
                continue

            output_frame = frame.copy()
            results = self.model.track(frame, persist=True, classes=[0])

            if results[0].boxes is not None and hasattr(results[0].boxes, 'id'):
                boxes = results[0].boxes.xywh.cpu()
                tracking_ids = results[0].boxes.id.int().cpu().tolist()

                if self.debug_mode:
                    for queue in self.queue_data["queues"]:
                        area_points = np.array(queue["area"])
                        direction_points = np.array(queue["direction"])
                        cv2.polylines(output_frame, [area_points], True, (0, 255, 0), 2)
                        for i in range(len(direction_points) - 1):
                            cv2.line(output_frame, tuple(direction_points[i]), 
                                    tuple(direction_points[i + 1]), (0, 0, 255), 2)

                queue_positions = []
                for queue in self.queue_data["queues"]:
                    area_points = np.array(queue["area"])
                    direction_points = queue["direction"]
                    
                    for box, track_id in zip(boxes, tracking_ids):
                        x, y, w, h = box
                        center_point = (int(x), int(y))
                        
                        if cv2.pointPolygonTest(area_points, center_point, False) >= 0:
                            position = self.calculate_position_along_direction(
                                center_point, direction_points)
                            queue_positions.append((track_id, position))
                            
                            if self.tracking_history[track_id]["entry_time"] is None:
                                self.tracking_history[track_id]["entry_time"] = time.time()

                queue_positions.sort(key=lambda x: x[1])
                for display_id, (track_id, _) in enumerate(queue_positions, 1):
                    self.tracking_history[track_id]["display_id"] = display_id

                for box, track_id in zip(boxes, tracking_ids):
                    x, y, w, h = box
                    center_point = (int(x), int(y))
                    
                    for queue in self.queue_data["queues"]:
                        area_points = np.array(queue["area"])
                        if cv2.pointPolygonTest(area_points, center_point, False) >= 0:
                            wait_time = int(time.time() - self.tracking_history[track_id]["entry_time"])
                            display_id = self.tracking_history[track_id]["display_id"]
                            if display_id is not None:
                                cv2.putText(output_frame, 
                                          f"ID:{display_id} {wait_time}s",
                                          (int(x - w/2), int(y - h/2 - 10)),
                                          cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            out.write(output_frame)
            cv2.imshow("Queue Monitoring", output_frame)
            
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
            elif key == ord('d'):
                self.debug_mode = not self.debug_mode

        cap.release()
        out.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    monitor = QueueMonitor('queue_01.mp4')
    monitor.load_or_create_queue_areas()
    monitor.process_video()

 

테스트 결과 확인

프로그램 실행 후  '영역 지정 모드'에서 초록색 점으로 연결된 박스 형태로 대기열(줄)로 감지할 영역을 설정합니다. 그리고나서 '방향 정의 모드'에서 대기열(줄)의 방향을 붉은 색 점으로 연결된 선 형태로 지정합니다. 아래는 두 가지 작업을 진행한 예시입니다. 오른쪽 끝에서부터 왼쪽 끝으로 갈 수록 기다리는 사람의 ID가 1씩 증가하게 될 겁니다. 

그림 : 대기줄의 영역 (초록색), 대기줄의 방향 (빨간색)을 설정하는 화면 예시

 

 

이렇게 설정이 완료되고 나서, 영상을 적용해 보면 실행 결과는 다음과 같이 나옵니다. 감지된 사람 별 ID와 대기 시간입니다. 맨 오른쪽이 1번, 맨 왼쪽이 5번 여기까지는 잘 나온 것 같습니다. 아쉽게도 샘플 영상에서는 처음부터 5명의 인원이 화면 상에 계속 있습니다. 그러다보니 기다렸다가 계산하고 나가서 순서가 바뀌는 등의 다양한 예시를 확인해 볼 수가 없었습니다. 제가 기대한 영상은 대형 마트의 계산대 윗쪽 CCTV 영상 같은 것이었는데 그런 것은 공개된 것을 찾기가 어려워서 포기했습니다. 

그림 : 대기열 상의 사람 별 대기 시간 표시

 

정리하며

이번 글에서는 마트 등에서 대기하는 사람들의 대기 시간을 영상 분석을 통해 측정하는 사례를 테스트 해 봤습니다. OpenCV의 비전 기술을 제외하면 결국은 객체감지와 트래킹 기법이 들어간 응용 사례인 셈입니다. 간단한 사례이긴 하지만, 공공기관에서 민원인의 대기, 대형마트에서 캐셔(계산원)의 업무 부하 측정, 혹은 점심시간 등에 고객의 대기 시간을 최소화하기 위해 추가인원을 투입해야 하는지 의사결정을 하는 과정이라든지 등등 실제 비즈니스에서는 이런 요건을 구현하는 것이 꼭 필요한 곳이 많이 있을 겁니다. 고객 만족이 결국 매출을 늘이고 수익을 극대화할 수 있는 기본이 되기 때문이죠.

 


 

참고자료

  • 영상) QUEUE DETECTION | HANWHA TECHWIN EUROPE (링크)

사진 : Queue Detection을 이용한 분석 예시

 

 

  • 영상) [Ambient AI Competition] Pedestrian Classification for Linear Queue Detection  (영상)
    • 서울대 학생들의 셔틀버스 줄 대기자를 AI 모델로 감지하고 분석한 사례로, 서울대 산공과 학생의 2023년 Winter Ambient AI Competition 1등 수상작이라고 합니다.
    • 선형 대기열 탐지를 위한 보행자 분류 최적화 알고리즘 적용했습니다. 

그림 : 셔틀버스 대기줄과 대기시간 측정에 활용한 사례 (서울대)

 

 

  • 영상) Queue Length Monitoring with Searchlight for Banking (영상)
    • 은행에서 대기자 및 대기시간 측정 사례 예시

그림 : 은행 창구에서 대기하는 사람 별로 대기 시간을 측정하는 사례

 

  • 영상) Demo | Intelligent Queue Management with YOLOv8 Object Detection & Counting Edge AI Reference Kit (링크)

그림 : 마트 계산대에서 대기 인원 카운트를 하는 사례