본문 바로가기

개발기술/통신 인터페이스, 프로토콜

RabbitMQ : Java Client 통신구조

 RabbitMQ 와 client 의 통신 구조 전반

  • RabbitMQ와 Connection 1개당 메인 쓰레드를 생성하고 read()를 동해서 메인 쓰레드는 Socket에 데이터가 들어올때까지 Block 상태로 대기한다. 
  • Socket에 데이터가 들어오면 메인쓰레드는 Woker Thread에게 callback 함수를 전달하여 후속 로직을 실행한다.
  • 이벤트 드리븐 아키텍쳐지만 non blocking event loop구조는 아니다 왜냐면  MQ client 는 보통 많은 connection 을 열 필요가 없음 Kafka, RabbitMQ, NATS, Redis Streams 등 MQ client 들은 한 application 이 broker 와 보통 1~5개 connection 만 유지합니다. epoll, kqueue 는 수천~수만 socket fd 를 하나의 event loop 로 처리하기 위한 기술인데, MQ client 에선 그 정도 규모가 아님. thread 몇 개로도 충분히 커버 가능.

 

Connection 생성 (TCP + AMQP 프로토콜)

  • RabbitMQ 는 브로커 서버(rabbitmq-server)가 있고, Java Client (예: com.rabbitmq.client.*) 는 브로커와 TCP 소켓 연결을 맺음. 이 위에서 AMQP 프로토콜로 데이터(메시지, ack, heartbeat)를 주고받음.
  • TCP 연결(AMQP connection) 개수는 당신이 newConnection() 몇 번 호출하느냐로 결정 돼요. (보통 하나만 열고, 그 위에 여러 channel 을 씁니다.)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");

Connection connection = factory.newConnection();

 

Channel 생성 (AMQP virtual connection)

  •  broker 는“connection 이 열렸다고 해서 거기로 메시지를 보내지 않음. connection 위에 channel (AMQP virtual connection)을 열고, 그 channel 로 basicConsume 을 선언해서 “queue A" 를 구독해야만 메세지를 받음
  • connection (TCP) 위에 만드는 논리적 경로 (AMQP virtual connection)로 하나의 connection 위에 여러 channel을 열 수 있음.

채널(Channel) 마다 독립적 설정


기능 설명
queue consume 이 채널에서 어떤 queue 를 구독할지 (basicConsume),
publish 이 채널로 메시지를 publish (queue/exchange로)
ack/nack 이 채널에서 consume 한 메시지에 대해 ack/nack
flow control basicQos (prefetch) 로 flow control, 즉 몇 개까지 동시에 처리할지 결정

 

Channel channelA = conn.createChannel();
channelA.basicQos(5); // 채널 A는 동시에 최대 5개까지만 ack 대기
channelA.basicConsume("queue1", false, callbackA);

Channel channelB = conn.createChannel();
channelB.basicQos(20); // 채널 B는 동시에 20개까지 ack 대기
channelB.basicConsume("queue2", false, callbackB);

 

 ack / nack

  • ack : 나 이 메시지 성공적으로 처리했어, 삭제해도 돼" 라고 RabbitMQ에게 알림. 그러면 RabbitMQ가 큐에서 이 메시지를 지워버림.
  • nack : "이 메시지 처리 실패했어" requeue = true 이면 → 다시 Queue에 넣어서 나중에 재시도.

   

Flow Control prefetch (basicQos)

  • prefetch count (QoS, Quality of Service) 는
  • "이 채널이 동시에 'ack되지 않은' 메시지를 최대 몇 개까지 받을지" 를 제한
  • RabbitMQ 기본은 unlimited (prefetch 무제한). consumer가 메시지를 계속 받고, ack가 늦어지면 메모리에 메시지가 잔뜩 쌓임. basicQos(5) 를 주면 이 채널(consumer)는 동시에 최대 5개의 ack 대기 메시지만 받음 나머지는 ack 보내기 전까지 서버 큐에 그대로 대기.

 channel consumtipion tuning 이 필요한이유

 

  • 메시지 분산(공정한 dispatch) : 아직 처리 안 된 메시지가 channelA의 unacked 상태로 몰려있어서 다시 requeue → 재분배까지 시간이 걸림. prefetch = 5 이면 unack 메세지가 금새 다른 채널B/C로 넘어감
  • backpressure : 내 worker pool은 10개밖에 없는데, 채널에 이미 1000개를 받은 상태 (unacked)JVM heap 메모리에서 큐처럼 쌓여서 → GC 부담, OOM 리스크 증가.

 

 

Channel channel = connection.createChannel();

// 이 채널에서 queueA 구독, handleDelivery 로 처리
channel.basicConsume("queueA", (consumerTag, message) -> {
        System.out.println("QueueA: " + new String(message.getBody()));
        }, consumerTag -> {});

// 같은 connection 위에서 다른 채널로
Channel channel2 = connection.createChannel();

// 이 채널에서는 queueB 구독, 다른 callback
channel2.basicConsume("queueB", (consumerTag, message) -> {
        System.out.println("QueueB: " + new String(message.getBody()));
        }, consumerTag -> {});

 

  

 

  • 채널하나의 channel 만 쓰면 prefetch(=동시 처리 가능 메시지 수)가 1~10 정도로 제한. channel 여러 개를 쓰면 prefetch 를 각각 먹여서 소비 속도 확장.
    •  

 

 

 

 

Socket read thread (MainLoop)

  • connection 당 하나씩 있는 thread가 blocking read 로 socket 에서 계속 데이터를 읽음.
    • read() 로 broker 에서 오는 데이터가 socket에 올때까지 blocking 해서 기다림.
  • 읽은 데이터를 AMQP frame 으로 해석 → channel id로 분리 → 각 channel에 전달.
  • consumer callback 으로 dispatch

 

while (true) {
    Frame frame = readFromSocket(); // blocking
    int channelId = frame.getChannelId();
    ChannelN channel = connection.getChannel(channelId);
    channel.handleFrame(frame);
}

 

 

Consumer

  • channel.basicConsume() 을 하면,
  • 해당 queue 의 메시지를 broker 가 push.
  • socket read thread 가 이를 받아서,
  • channel 에 등록된 consumer callback 으로 넘김.

worker thread (consumer callback thread pool)

 

 

  • ConsumerWorkService 라는 내부 thread pool 이 있어서
  • socket read thread (MainLoop) 가 메시지를 받으면
  • 이 pool 에 task 를 제출해서 handleDelivery() callback 을 실행
  • 실제 DeliverCallback.handleDelivery() 같은 비즈니스 로직은
  • RabbitMQ Java Client 내부의 ConsumerWorkService (thread pool) 에서 실행됨.

 

ExecutorService executorService = Executors.newFixedThreadPool(10);

Connection connection = factory.newConnection(executorService);

 

 

 

 

Spring AMQP (Spring Boot)에서는?

 

  • @RabbitListener 를 사용하면,
  • Spring 이 알아서 connection / channel / socket read thread / worker thread 를 모두 관리.
  • SimpleMessageListenerContainer 가 RabbitMQ Client 위에 동작하면서,
  • 메시지를 받아 Spring 의 TaskExecutor (thread pool) 에서 callback 을 실행.