RabbitMQ 와 client 의 통신 구조 전반
- RabbitMQ와 Connection 1개당 메인 쓰레드를 생성하고 read()를 동해서 메인 쓰레드는 Socket에 데이터가 들어올때까지 Block 상태로 대기한다.
- Socket에 데이터가 들어오면 메인쓰레드는 Woker Thread에게 callback 함수를 전달하여 후속 로직을 실행한다.
- 이벤트 드리븐 아키텍쳐지만 non blocking event loop구조는 아니다 왜냐면 MQ client 는 보통 많은 connection 을 열 필요가 없음 Kafka, RabbitMQ, NATS, Redis Streams 등 MQ client 들은 한 application 이 broker 와 보통 1~5개 connection 만 유지합니다. epoll, kqueue 는 수천~수만 socket fd 를 하나의 event loop 로 처리하기 위한 기술인데, MQ client 에선 그 정도 규모가 아님. thread 몇 개로도 충분히 커버 가능.
Connection 생성 (TCP + AMQP 프로토콜)
- RabbitMQ 는 브로커 서버(rabbitmq-server)가 있고, Java Client (예: com.rabbitmq.client.*) 는 브로커와 TCP 소켓 연결을 맺음. 이 위에서 AMQP 프로토콜로 데이터(메시지, ack, heartbeat)를 주고받음.
- TCP 연결(AMQP connection) 개수는 당신이 newConnection() 몇 번 호출하느냐로 결정 돼요. (보통 하나만 열고, 그 위에 여러 channel 을 씁니다.)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel 생성 (AMQP virtual connection)
- broker 는“connection 이 열렸다고 해서 거기로 메시지를 보내지 않음. connection 위에 channel (AMQP virtual connection)을 열고, 그 channel 로 basicConsume 을 선언해서 “queue A" 를 구독해야만 메세지를 받음
- connection (TCP) 위에 만드는 논리적 경로 (AMQP virtual connection)로 하나의 connection 위에 여러 channel을 열 수 있음.
채널(Channel) 마다 독립적 설정
기능 | 설명 |
queue consume | 이 채널에서 어떤 queue 를 구독할지 (basicConsume), |
publish | 이 채널로 메시지를 publish (queue/exchange로) |
ack/nack | 이 채널에서 consume 한 메시지에 대해 ack/nack |
flow control | basicQos (prefetch) 로 flow control, 즉 몇 개까지 동시에 처리할지 결정 |
Channel channelA = conn.createChannel();
channelA.basicQos(5); // 채널 A는 동시에 최대 5개까지만 ack 대기
channelA.basicConsume("queue1", false, callbackA);
Channel channelB = conn.createChannel();
channelB.basicQos(20); // 채널 B는 동시에 20개까지 ack 대기
channelB.basicConsume("queue2", false, callbackB);
ack / nack
- ack : 나 이 메시지 성공적으로 처리했어, 삭제해도 돼" 라고 RabbitMQ에게 알림. 그러면 RabbitMQ가 큐에서 이 메시지를 지워버림.
- nack : "이 메시지 처리 실패했어" requeue = true 이면 → 다시 Queue에 넣어서 나중에 재시도.
Flow Control prefetch (basicQos)
- prefetch count (QoS, Quality of Service) 는
- "이 채널이 동시에 'ack되지 않은' 메시지를 최대 몇 개까지 받을지" 를 제한
- RabbitMQ 기본은 unlimited (prefetch 무제한). consumer가 메시지를 계속 받고, ack가 늦어지면 메모리에 메시지가 잔뜩 쌓임. basicQos(5) 를 주면 이 채널(consumer)는 동시에 최대 5개의 ack 대기 메시지만 받음 나머지는 ack 보내기 전까지 서버 큐에 그대로 대기.
channel consumtipion tuning 이 필요한이유
- 메시지 분산(공정한 dispatch) : 아직 처리 안 된 메시지가 channelA의 unacked 상태로 몰려있어서 다시 requeue → 재분배까지 시간이 걸림. prefetch = 5 이면 unack 메세지가 금새 다른 채널B/C로 넘어감
- backpressure : 내 worker pool은 10개밖에 없는데, 채널에 이미 1000개를 받은 상태 (unacked)JVM heap 메모리에서 큐처럼 쌓여서 → GC 부담, OOM 리스크 증가.
Channel channel = connection.createChannel();
// 이 채널에서 queueA 구독, handleDelivery 로 처리
channel.basicConsume("queueA", (consumerTag, message) -> {
System.out.println("QueueA: " + new String(message.getBody()));
}, consumerTag -> {});
// 같은 connection 위에서 다른 채널로
Channel channel2 = connection.createChannel();
// 이 채널에서는 queueB 구독, 다른 callback
channel2.basicConsume("queueB", (consumerTag, message) -> {
System.out.println("QueueB: " + new String(message.getBody()));
}, consumerTag -> {});
- 채널하나의 channel 만 쓰면 prefetch(=동시 처리 가능 메시지 수)가 1~10 정도로 제한. channel 여러 개를 쓰면 prefetch 를 각각 먹여서 소비 속도 확장.
Socket read thread (MainLoop)
- connection 당 하나씩 있는 thread가 blocking read 로 socket 에서 계속 데이터를 읽음.
- read() 로 broker 에서 오는 데이터가 socket에 올때까지 blocking 해서 기다림.
- 읽은 데이터를 AMQP frame 으로 해석 → channel id로 분리 → 각 channel에 전달.
- consumer callback 으로 dispatch
while (true) {
Frame frame = readFromSocket(); // blocking
int channelId = frame.getChannelId();
ChannelN channel = connection.getChannel(channelId);
channel.handleFrame(frame);
}
Consumer
- channel.basicConsume() 을 하면,
- 해당 queue 의 메시지를 broker 가 push.
- socket read thread 가 이를 받아서,
- channel 에 등록된 consumer callback 으로 넘김.
worker thread (consumer callback thread pool)
- ConsumerWorkService 라는 내부 thread pool 이 있어서
- socket read thread (MainLoop) 가 메시지를 받으면
- 이 pool 에 task 를 제출해서 handleDelivery() callback 을 실행
- 실제 DeliverCallback.handleDelivery() 같은 비즈니스 로직은
- RabbitMQ Java Client 내부의 ConsumerWorkService (thread pool) 에서 실행됨.
ExecutorService executorService = Executors.newFixedThreadPool(10);
Connection connection = factory.newConnection(executorService);
Spring AMQP (Spring Boot)에서는?
- @RabbitListener 를 사용하면,
- Spring 이 알아서 connection / channel / socket read thread / worker thread 를 모두 관리.
- SimpleMessageListenerContainer 가 RabbitMQ Client 위에 동작하면서,
- 메시지를 받아 Spring 의 TaskExecutor (thread pool) 에서 callback 을 실행.
'개발기술 > 통신 인터페이스, 프로토콜' 카테고리의 다른 글
전송계층 : TCP와 UDP (0) | 2025.04.30 |
---|---|
MessageBroker 구현 : RabbitMQ, Mosquitto (0) | 2025.02.17 |
메시지 큐, RabbitMQ, Kafka 개념 (0) | 2025.02.14 |
AI 영상분석과 websocket + message broker (0) | 2025.02.14 |
실시간 통신 기술의 개발 역사 (0) | 2025.02.14 |