본문 바로가기

개발기술/설계|소프트웨어패턴

Consumer-Producer 패턴

핵심 아이디어

Consumer-Producer 패턴은 "만드는 사람"과 "사용하는 사람"을 분리하는 개념입니다.

 

구성 요소

Producer (생산자)

  • 데이터나 작업을 만들어내는 역할
  • 예: 파일을 읽어서 데이터를 생성하는 프로그램

Consumer (소비자)

  • Producer가 만든 데이터를 가져다 사용하는 역할
  • 예: 데이터를 받아서 화면에 출력하는 프로그램

Buffer (버퍼)

  • Producer와 Consumer 사이의 임시 저장소
  • 예: 대기열(Queue)처럼 데이터를 순서대로 보관

왜 필요한가?

  • 속도 차이 해결: 만드는 속도와 사용하는 속도가 다를 때 한쪽이 바쁠 때 다른 쪽이 기다리지 않아도 됨
  • 코드 독립성: 각 모듈이 서로의 내부 구현이나 존재를 몰라도 동작하도록 해서 변경 시 코드 수정 범위를 최소화하는 것을 의미
Producer: 1초마다 숫자 1개씩 생성
Buffer: 최대 5개까지 저장 가능
Consumer: 2초마다 숫자 1개씩 처리

시간 0초: Producer가 1 생성 → Buffer [1]
시간 1초: Producer가 2 생성 → Buffer [1,2]
시간 2초: Producer가 3 생성, Consumer가 1 처리 → Buffer [2,3]

스레드 풀 분리

성격이 다른 작업을 서로 간섭하지 않도록 격리하기 위함 여기서 '성격'은 단지 I/O와 CPU의 차이뿐만 아니라, 작업의 우선순위, 예상 실행 시간, 안정성 등 다양한 기준을 포함합니다.

 

  • 가장 기본적인 분리 기준 (I/O vs. CPU):
    • CPU 바운드 작업은 CPU 코어 수에 맞춰 스레드 수를 제한하여 컨텍스트 스위칭 오버헤드를 줄여야 효율적입니다.
    • I/O 바운드 작업은 대부분의 시간을 대기하므로, CPU 코어 수보다 훨씬 많은 스레드를 두어 CPU를 놀리지 않고 효율적으로 사용해야 합니다.
    • 이 두 가지를 한 풀에 섞어두면 최적화가 불가능해 비효율적이 됩니다.
  • 더 확장된 분리 기준:
    • 작업의 중요도: 응답 속도가 중요한 실시간 요청과, 시간이 좀 걸려도 되는 배치 작업 등을 분리합니다.
    • 리소스 의존성: 데이터베이스 커넥션 풀을 사용하는 작업과, 외부 API를 호출하는 작업 등을 분리하여 특정 리소스의 병목이 전체에 영향을 주지 않도록 합니다.
    • 안정성: 안정성이 보장되지 않는 외부 시스템 호출 같은 위험한 작업을 별도의 풀에 격리하여, 해당 작업이 실패해도 핵심 기능에는 영향이 없도록 합니다.

 

구현예시

  • Controller는 빠른 응답, 외부 Event 수신을 위해서 항시대기 해야하기 때문에 controller 스레드로는 가벼운 작업만 처리
  • 무거운 작업은 별도의 스레드 풀을 사용하여 진행
[Controller]  →  [Queue]  →  [Consumer(Worker)]
        │                 │              │
        │                 │              └─ DB 저장, Redis 업데이트 등 무거운 작업
        │                 └─ 버퍼(Backpressure)
        └─ 빠른 응답

 

Controller : 외부 Event 수신

@PostMapping(path = "/fa/count")
public ResponseEntity<ApiResponseDto<Void>> addFaCountEvent(@io.swagger.v3.oas.annotations.parameters.RequestBody(description = "FA 카운트 이벤트 등록 DTO") @RequestBody @Valid FaEventDto<FaEventDto.CountDto> dto) {
    eventService.addFaCountEvent(dto);

 

Queue : Event 적재

public void addQueue(FaEventDto<FaEventDto.CountDto> countDto) {
    this.countQueue.add(countDto);
}

 

Worker : Event 소비

@Async("taskExecutor")
public void startFaEventProcessing() {
    log.info("[FaCountProcessorService][startFaEventProcessing] start FA Count Thread : {}", Thread.currentThread().getName());
    while (true) {
        try {
            FaEventDto<FaEventDto.CountDto> countDto = countQueue.take();
            processData(countDto);
        }catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
}

 

BlockingQueue

Blocking(블로킹)" 기능이 추가된 큐(Queue)입니다. 여기서 '블로킹'은 특정 상황에서 스레드를 멈춰 세워 효율적으로 자원을 관리한다는 의미입니다. Producer-Consumer 패턴을 구현하는 데 있어 가장 중요한 핵심 요소.

 

 

1. Producer 측의 블로킹 (생산자의 대기)

  • put(E e) 메서드: 큐에 데이터를 넣는 메서드입니다.
  • 기능: 만약 큐가 가득 차 있다면, put()을 호출한 Producer 스레드는 큐에 공간이 생길 때까지 wait 상태로 멈춥니다.
  • 이유: 큐가 가득 차면 생산자는 잠시 멈추고 기다리면서, Consumer의 처리 속도에 맞춰 생산 속도를 조절하여  메모리 과부하(OOM, Out-Of-Memory)를 방지하는 백프레셔(Backpressure) 역할을 자동으로 수행합니다.

2. Consumer 측의 블로킹 (소비자의 대기)

  • take() 메서드: 큐에서 데이터를 가져오는 메서드입니다.
  • 기능: 만약 큐가 비어 있다면, take()을 호출한 Consumer 스레드는 큐에 새로운 데이터가 들어올 때까지 wait 상태로 멈춥니다.
  • 이유:  데이터가 들어올 때까지 CPU를 소모하지 않고 효율적으로 대기합니다. 이 방식은 계속해서 if(queue.isEmpty())를 체크하는 비효율적인 Busy Waiting을 방지합니다.