본문 바로가기

개발기술/통신 인터페이스, 프로토콜

RabbitMQ 구현

RabbitMQ 핵심 요소

요소설명

Producer (생산자) 메시지를 RabbitMQ에 전송하는 역할 (IP와 Port로 rabbitMQ 식별)
Exchange (교환기) 메시지를 큐에 전달하는 역할 (Producer가 보낸 routing key로 Queue 맵핑)
Queue (큐) 메시지가 저장되는 공간 ( Consumer가 ip,port, id, pw, virtualHost로 consumer로 등록)
Consumer (소비자) 큐에서 메시지를 가져와 처리하는 역할 (Consume역할은 queue의 이름으로 맵핑하여 consume)

 

 

. Exchange 유형

Direct
자주사용
특정 Routing Key가 일치하는 Queue에만 메시지를 전달 / 특정 로그 레벨 (error, warning, info)을 필터링하여 전
-> 일반적으로 많이 사용
Fanout
사용됨
모든 바인딩된 Queue에 메시지를 브로드캐스트 / 실시간 알림, 공지사항 전송
Topic
자주사용
패턴 매칭을 이용해 특정 조건의 Queue로 메시지를 전달 /  특정 카테고리의 로그만 받기 (logs.error.#)
-> 특정 패턴
Headers 메시지의 Header 값을 기반으로 라우팅 / 특정 속성(type: json, format: utf-8)을 가진 메시지만 전달

 

RabbitMQ 메시지 흐름

[Producer][RabbitMQ 서버(Exchange -> Queue)][Consumer]
  • Producer: 메시지를 RabbitMQ로 보냄
  • RabbitMQ : 메시지를 받아서, Exchange를 사용해서 적절한 Queue로 라우팅
  • Consumer: 메시지를 받아서 비즈니스 로직 처리

 RabbitMQ 서버가 모든 것을 관리하는 중앙 브로커 역할을 합니다. 즉, Exchange와 Queue는 RabbitMQ 서버 내부의 논리적인 개체이며 클러스터 환경에서는 여러 노드가 Queue(메타데이터와 메시지)를 공유·미러링할 수 있습니다.

 

 Producer (메시지 발행)

  • RabbitTemplate을 사용하여 메시지를 Exchange에 전송
  • convertAndSend(Exchange, Routing Key, Message) 호출
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp' // RabbitMQ 의존성
}
@Service
@RequiredArgsConstructor
public class ReservationService {
    private final ReservationRepository reservationRepository;
    private final NotificationProducer notificationProducer;

    @Transactional
    public ReservationDto rejectReservation(String partnerId, String reservationId) {
        log.info("Rejecting reservation {}", reservationId);

        ReservationEntity reservationEntity = reservationRepository.findByReservationId(reservationId)
            .orElseThrow(() -> new CustomException(ErrorCode.RESERVATION_ID_NONEXISTENT));

        if (!reservationEntity.getStoreEntity().getPartnerEntity().getPartnerId().equals(partnerId)) {
            throw new CustomException(ErrorCode.MEMBERID_STOREOWNER_UNMATCHED);
        }

        if (!reservationEntity.getReservationStatus().equals(ReservationStatus.REQUESTED)) {
            throw new CustomException(ErrorCode.RESERVATION_STATUS_ERROR);
        }

        log.info("Updating reservation status to REJECTED for reservation {}", reservationId);
        reservationEntity.setReservationStatus(ReservationStatus.REJECTED);
        reservationRepository.save(reservationEntity); // ✅ 동기 처리 (트랜잭션 보장)

        // ✅ 비동기적으로 알림 처리 (RabbitMQ에 메시지 전송)
        notificationProducer.sendNotificationMessage(reservationEntity);

        return ReservationDto.fromEntity(reservationEntity);
    }
}

 

RabbitTemplate을 Spring Boot에서 자동으로 설정할 수도 있고, 커스텀 설정도 가능합니다.Spring Boot에서는 spring-boot-starter-amqp를 추가하면 기본적으로 RabbitTemplate이 자동으로 빈(Bean)으로 등록됩니다.

 

@Service
@RequiredArgsConstructor
public class NotificationProducer {
    private final RabbitTemplate rabbitTemplate;

    public void sendNotificationMessage(NotificationMessage message) {
        log.info("Sending notification message: {}", message);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message);
    }
}

 

메시지는 NotificationMessage DTO 형태로 RabbitMQ를 통해 전달됨.

@Data
@AllArgsConstructor
@NoArgsConstructor
public class NotificationMessage {
    private String userId;
    private String message;
}

 

RabbitMQ에서 Producer가 메시지를 직접 Queue로 보내지 않고, Exchange로 보내므로 Exchange 서버 정보를 설정해야 합니다.

 

Spring Boot 프로젝트에 RabbitMQ 설정

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp' // RabbitMQ 의존성
}

 

RabbitMQ Configuration 설정

  간단한 개발코드에서는 Producer에서 RabbitMQ 설정을 담당하고, Consumer는 단순히 Queue를 리스닝(Listen)하는 구조를 갖을 수 있음. 실제 개발에서는 설정이 동적으로 변경되어서는 안되기때문에  DevOps/Admin이 RabbitMQ Management UI 또는 설정 파일 (rabbitmq.conf) 로 설정을 관리한다.

 

RabbitMQ의 핵심 개념

 Exchange(교환기)  : 메시지를 적절한 Queue로 라우팅(Routing) 하는 역할. Exchange Type에 따라 메시지 분배 방식이 달라짐

  • Exchange는 이벤트(Event) 개념으로 묶인다

 Queue(큐) : 메시지를 임시로 저장하는 공간, Consumer가 메시지를 가져가 처리하면 제거됨

  • 큐는 이벤트에 맞는 액션(Action)으로 묶인다

 Binding(바인딩) : Exchange와 Queue를 연결하는 설정, 어떤 Queue가 어떤 Exchange의 메시지를 받을지 정의

 Routing Key(라우팅 키) : 특정 Queue로 메시지를 보내기 위한 라우팅 정보. Exchange에서 Routing Key를 기준으로 메시지를 배분



Direct Exchange (직접 라우팅)

설정코드

@Configuration
public class DirectExchangeConfig {

    public static final String DIRECT_EXCHANGE_NAME = "direct.exchange";
    public static final String QUEUE1_NAME = "queue.order";
    public static final String QUEUE2_NAME = "queue.payment";
    public static final String ROUTING_KEY_ORDER = "order.created";
    public static final String ROUTING_KEY_PAYMENT = "payment.processed";

    @Bean
    public Queue orderQueue() {
        return new Queue(QUEUE1_NAME, true);
    }

    @Bean
    public Queue paymentQueue() {
        return new Queue(QUEUE2_NAME, true);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE_NAME);
    }

    @Bean
    public Binding orderBinding(Queue orderQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(orderQueue).to(directExchange).with(ROUTING_KEY_ORDER);
    }

    @Bean
    public Binding paymentBinding(Queue paymentQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(paymentQueue).to(directExchange).with(ROUTING_KEY_PAYMENT);
    }
}

 

Producer Code

@Service
@RequiredArgsConstructor
public class DirectProducer {
    private final RabbitTemplate rabbitTemplate;

    public void sendOrderCreatedMessage(String message) {
        rabbitTemplate.convertAndSend("direct.exchange", "order.created", message);
    }

    public void sendPaymentProcessedMessage(String message) {
        rabbitTemplate.convertAndSend("direct.exchange", "payment.processed", message);
    }
}

 

Consumer Code

@Component
@RabbitListener(queues = "queue.order")
public class OrderConsumer {
    @RabbitHandler
    public void receiveOrderMessage(String message) {
        System.out.println("Received order message: " + message);
    }
}

@Component
@RabbitListener(queues = "queue.payment")
public class PaymentConsumer {
    @RabbitHandler
    public void receivePaymentMessage(String message) {
        System.out.println("Received payment message: " + message);
    }
}

Fanout Exchange (브로드캐스트)

@Configuration
public class FanoutExchangeConfig {

    public static final String FANOUT_EXCHANGE_NAME = "fanout.exchange";

    @Bean
    public Queue queue1() {
        return new Queue("queue.email", true);
    }

    @Bean
    public Queue queue2() {
        return new Queue("queue.sms", true);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE_NAME);
    }

    @Bean
    public Binding binding1(Queue queue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }

    @Bean
    public Binding binding2(Queue queue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}

 

Producer 코드

@Service
@RequiredArgsConstructor
public class FanoutProducer {
    private final RabbitTemplate rabbitTemplate;

    public void sendBroadcastMessage(String message) {
        rabbitTemplate.convertAndSend("fanout.exchange", "", message);
    }
}

 

Consumer 코드

@Component
@RabbitListener(queues = "queue.email")
public class EmailConsumer {
    @RabbitHandler
    public void receiveEmailMessage(String message) {
        System.out.println("Received email notification: " + message);
    }
}

@Component
@RabbitListener(queues = "queue.sms")
public class SmsConsumer {
    @RabbitHandler
    public void receiveSmsMessage(String message) {
        System.out.println("Received SMS notification: " + message);
    }
}

Topic Exchange (패턴 기반 라우팅)

@Configuration
public class TopicExchangeConfig {

    public static final String TOPIC_EXCHANGE_NAME = "topic.exchange";

    @Bean
    public Queue errorQueue() {
        return new Queue("queue.error", true);
    }

    @Bean
    public Queue infoQueue() {
        return new Queue("queue.info", true);
    }

    @Bean
    public Queue allQueue() {
        return new Queue("queue.all", true);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE_NAME);
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(errorQueue).to(topicExchange).with("log.error.#");
    }

    @Bean
    public Binding infoBinding(Queue infoQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(infoQueue).to(topicExchange).with("log.info.#");
    }

    @Bean
    public Binding allBinding(Queue allQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(allQueue).to(topicExchange).with("log.#");
    }
}

Producer 코드

@Service
@RequiredArgsConstructor
public class TopicProducer {
    private final RabbitTemplate rabbitTemplate;

    public void sendErrorLog(String message) {
        rabbitTemplate.convertAndSend("topic.exchange", "log.error.db", message);
    }

    public void sendInfoLog(String message) {
        rabbitTemplate.convertAndSend("topic.exchange", "log.info.server", message);
    }

    public void sendAllLog(String message) {
        rabbitTemplate.convertAndSend("topic.exchange", "log.system", message);
    }
}

 

consumer 코드

@Component
@RabbitListener(queues = "queue.error")
public class ErrorConsumer {
    @RabbitHandler
    public void receiveErrorMessage(String message) {
        System.out.println("Received error log: " + message);
    }
}

@Component
@RabbitListener(queues = "queue.info")
public class InfoConsumer {
    @RabbitHandler
    public void receiveInfoMessage(String message) {
        System.out.println("Received info log: " + message);
    }
}

@Component
@RabbitListener(queues = "queue.all")
public class AllConsumer {
    @RabbitHandler
    public void receiveAllMessage(String message) {
        System.out.println("Received all log: " + message);
    }
}

 

그리고 바인딩(Binding) 은 RabbitMQ 내부에서 Exchange와 Queue를 연결하는 라우팅 규칙이며, RabbitMQ 서버 내부에서 관리되는 설정 정보입니다.

 

개념설명

  • Queue (큐) → 메시지를 저장하는 공간
    • notification.queue라는 이름의 Queue를 생성.
    • durable=true → RabbitMQ 서버가 재시작되어도 Queue가 유지됨.
  • DirectExchange (교환기) → 특정 Routing Key를 가진 메시지만 전달
    • notification.exchange라는 이름의 DirectExchange를 생성.
    • DirectExchange는 특정 Routing Key를 가진 메시지만 해당 Queue로 전달.
  • Binding (바인딩) → 큐와 교환기를 연결 (라우팅 키를 통해 매칭)
    • notification.exchange → notification.queue 간의 바인딩 설정.
    • notification.send라는 Routing Key를 가진 메시지만 notification.queue에 전달됨.
    •  바인딩(Binding)은 RabbitMQ 클러스터 내부에서 관리되므로, 특정 서버에서만 설정해야 하는 개념이 아님.

 

Consumer (메시지 수신)

1. 하나의 Queue에서 처리하는 방식 (비효율적)

  • DB 저장, 웹소켓, 푸시 알림을 하나의 Queue에서 처리 → 하나가 지연되면 전체가 느려짐.
  • 모든 Consumer가 같은 작업을 수행해야 하므로 최적화하기 어려움.
  • @RabbitListener(queues = "notification.queue"): 특정 큐의 메시지를 소비
  • @RabbitHandler: 메시지를 받아서 처리하는 메서드
  • 메시지를 수신하면 DB 저장 → 웹소켓 전송 → 푸시 알림 전송
@Component
@RabbitListener(queues = "notification.queue")
public class NotificationConsumer {
    private final NotificationRepository notificationRepository;
    private final WebSocketService webSocketService;
    private final FirebasePushService firebasePushService;

    @RabbitHandler
    public void receiveNotification(NotificationMessage message) {
        log.info("Received notification message: {}", message);

        // ✅ 비동기적으로 알림 DB 저장
        NotificationEntity notification = new NotificationEntity(
            message.getUserId(),
            message.getMessage()
        );
        notificationRepository.save(notification);

        // ✅ 웹소켓 알림 전송
        webSocketService.sendNotificationWebSocket(message.getUserId(), message.getMessage());

        // ✅ Firebase 푸시 알림 전송
        firebasePushService.sendPushNotification(message.getUserId(), "Reservation Rejected", message.getMessage());
    }
}

 

2. 작업별로 Queue를 분리하는 방식 (권장되는 설계)

// ✅ DB 저장 전용 Consumer
@RabbitListener(queues = "notification.db.queue", concurrency = "5")
public void saveNotification(NotificationMessage message) {
    notificationRepository.save(new NotificationEntity(message.getUserId(), message.getMessage()));
}

// ✅ 웹소켓 전송 전용 Consumer
@RabbitListener(queues = "notification.websocket.queue", concurrency = "3")
public void sendWebSocketNotification(NotificationMessage message) {
    webSocketService.sendNotificationWebSocket(message.getUserId(), message.getMessage());
}

// ✅ 푸시 알림 전송 전용 Consumer
@RabbitListener(queues = "notification.push.queue", concurrency = "10")
public void sendPushNotification(NotificationMessage message) {
    firebasePushService.sendPushNotification(message.getUserId(), "Reservation Rejected", message.getMessage());
}

 

Consumer도 Spring Boot에서 @RabbitListener를 사용하면 자동으로 설정됨!
✔ 별도의 설정 없이 @RabbitListener만 붙이면 Consumer로 동작할 수 있음.

@Component
@RabbitListener(queues = "notification.queue")
public class NotificationConsumer {
    @RabbitHandler
    public void receiveNotification(NotificationMessage message) {
        log.info("Received notification message: {}", message);
    }
}

 

 

RabbitMQ  Broker설정 

대부분의 RabbitMQ 시스템에서는 Exchange, Queue, Binding을 Producer가 직접 정의하지 않음.
이 작업은 운영 환경에서 Infrastructure 관리 주체 (DevOps, 관리자, 별도 설정 시스템) 가 수행하는 것이 일반적.

 

 

 

 

RabbitMQ Plugin

 

 

 


 

목적 사용하는 플러그인
웹 UI로 상태 확인 rabbitmq_management
Prometheus/Grafana로 모니터링 rabbitmq_prometheus
MQTT 장치 연동 rabbitmq_mqtt, rabbitmq_web_mqtt
다른 브로커로 메시지 복사 rabbitmq_shovel, rabbitmq_federation
딜레이 큐 기능 rabbitmq_delayed_message_exchange
LDAP/OAuth 인증 연동 rabbitmq_auth_backend_ldap, rabbitmq_auth_backend_oauth2

 

  • RabbitMQ Management UI (HTML Web Interface) :  웹 기반 관리 UI (Management Plugin) 가 있어서, GUI로 설정을 직접 변경 가능.
    • UI에서 할 수 있는 것
      • Exchange, Queue 생성 및 삭제
      • Binding 설정
      • 현재 메시지 상태 확인
      • Connected Clients 확인
      • Message Rate 모니터링

📌 RabbitMQ Management UI 활성화

  1. RabbitMQ Management Plugin 활성화:
rabbitmq-plugins enable rabbitmq_management


 2. 웹 브라우저에서 접속:

http://localhost:15672
 
3. 기본 로그인 정보
  • Username: guest
  • Password: guest
  • (운영 환경에서는 보안상 변경해야 함)

 

RabbitMQ 설정 파일 (rabbitmq.conf)

  운영 환경에서는 자동화 & 유지보수를 위해 UI보다는 rabbitmq.conf 또는 Docker 환경변수를 사용해서 CI/CD를  것이 더 좋음.

# 메시지 큐 설정
        default_user = myuser
default_pass = mypassword
listeners.tcp.default = 5672
management.tcp.port = 15672

# 클러스터링 설정
cluster_formation.peer_discovery_backend = rabbit

 

Virtual Host (vhost)

 

in RabbitMQ is a logical namespace that isolates resources like queues, exchanges, and bindings. It allows multiple applications or tenants to use the same RabbitMQ instance without interfering with each othe

 

Why Use Virtual Hosts?

  • Isolation → Different applications or teams can have separate RabbitMQ environments.
  • Security → Each vhost can have different access controls (permissions for users).
  • Multi-Tenancy → Multiple applications can share a single RabbitMQ instance without conflict.