Work Queues : Competing Consumers Pattern 이란?
메시지를 여러 Consumer 에게 분배하여 작업을 분산처리하는 구조.
작업 부하를 효율적으로 분산하고, 병렬 처리를 가능하게 만들어 처리 속도를 향상시킵니다.
- Round Ronin 방식과 Fair Dispatch 방식을 사용하여 메시지를 Consumer에게 분배
- Fair Dispatch 방식은 메시지 수동 확인 모드로 개발하고 메시지 처리 비중 설정등을 통해 조정 가능함
특징
1. 경쟁적 메시지 소비
- 여러 Consumer가 동일한 메시지 큐에서 메시지를 가져가서 처리합니다.
- 특정 메시지는 한 번에 하나의 Consumer에 의해 처리되므로 메시지 중복 처리를 방지합니다.
2. 작업 분산
- 메시지가 여러 Consumer 간에 분배되어 병렬로 처리되므로 작업 부하를 효율적으로 분산합니다.
3. 확장성
- Consumer를 추가하거나 제거함으로써 작업 처리 능력을 동적으로 확장하거나 축소합니다.
4. 내결함성
- Consumer 중 하나가 실패하더라도 다른 Consumer가 작업을 이어받아 처리할 수 있어 시스템이 중단되지 않고 작동합니다.
구현
구현은 4개의 class를 통해 구현합니다.
1. 설정파일
package com.example.HelloMessageQueue.step2;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "WorkQueue";
@Bean
public Queue queue(){
/**
* Queue 생성자로 name과 durable(Boolean) 받음
* durable : 휘발성이냐 아니냐 (volatile, persistent)
* false로 주면 휘발성 - volatile -> 서버가 종료되거나 시작될 때 큐의 메시지가 사라짐
* true로 주면 영속성 - persistent
*/
return new Queue(QUEUE_NAME, true);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
return new RabbitTemplate(connectionFactory);
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(QUEUE_NAME);
container.setMessageListener(listenerAdapter);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter(WorkQueueConsumer workQueueTask){
return new MessageListenerAdapter(workQueueTask, "workQueueTask");
}
}
전에 했던 메시지 송수신 코드랑 달라진게 거의 없습니다.
이번에는 Queue를 만들때durable 을 true로 주어서 시스템이 중단되더라도 Queue에 메시지가 남아있도록 합니다.
그리고
container를 생성할때
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
이 부분이 추가됐습니다. 사실 위 코드는 작성하지 않더라도 기본값이 AUTO이기 때문에 쓰나 안쓰나 똑같지만 명시적으로 추가합니다.
2. Consumer
package com.example.HelloMessageQueue.step2;
import org.springframework.stereotype.Component;
@Component
public class WorkQueueConsumer {
public void workQueueTask(String message){
String[] messageParts = message.split("\\|");
String originMessage = messageParts[0];
int duration = Integer.parseInt(messageParts[1]);
System.out.println("# Received: " + originMessage + " (duration: " + duration + "ms)");
try{
int seconds = duration / 1000;
for (int i = 0; i < seconds; i++) {
Thread.sleep(1000);
System.out.print(".");
}
}catch (InterruptedException e){
Thread.currentThread().interrupt();
}
System.out.println("\n[X] Completed: " + originMessage);
}
}
| 를 기준으로 앞에있는건 메시지, 뒤에있는 숫자는 duration으로 설정해서 duration만큼 시간을 두고 메시지를 처리하도록 했습니다.
시간차를 줌으로써 메시지를 받는것을 확인할 수 있도록 했습니다.
3. Producer
package com.example.HelloMessageQueue.step2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
public class WorkQueueProducer {
private final RabbitTemplate rabbitTemplate;
public WorkQueueProducer(RabbitTemplate rabbitTemplate){
this.rabbitTemplate = rabbitTemplate;
}
public void sendWorkQueue(String workQueueMessage, int duration){
String message = workQueueMessage + "| " + duration;
rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, message);
System.out.println("Sent WorkQueue " + message);
}
}
message를 | 로 이어서 통으로 보내게 합니다.
4. Controller
package com.example.HelloMessageQueue.step2;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api")
public class WorkQueueController {
private final WorkQueueProducer workQueueProducer;
public WorkQueueController(WorkQueueProducer workQueueProducer){
this.workQueueProducer = workQueueProducer;
}
@PostMapping("/workqueue")
public String workQueue(@RequestParam(name = "message") String message, @RequestParam(name = "duration") int duration){
workQueueProducer.sendWorkQueue(message, duration);
return "Work queue sent = " + message + ", (" + duration + ")";
}
}
이렇게 해주고 서버를 시작해줍니다.
그리고 나서
curl -X "POST" "http://localhost:8081/api/workqueue?message=Task1&duration=2000"
curl -X "POST" "http://localhost:8081/api/workqueue?message=Task2&duration=4000"
curl -X "POST" "http://localhost:8081/api/workqueue?message=Task3&duration=5000"
명령프롬프트에서 위 명령어를 이용해 메시지를 쏴주면?
에러가 납니다.
한 번 RabbitMQ Management에 들어가보죵
Ready상태에 2건이 있는걸 볼 수 있는데 이건 Queue에 들어가긴 했는데, Consumer가 어떤 이유때문에 처리를 못 한 경우입니다. 큐를 생성할때 durable을 true로 설정했기 때문에 다시 서버를 시작하더라도 Queue에는 처리되지 않은 메시지들이 남아있을거에요. 그래서 처리될 때까지 기다릴 겁니다.
그래서 에러가 나면 빠르게 프로그램을 수정해서 재처리가 되게 하거나, Purge를 통해 메시지를 날려버려야 합니다..
에러가 난 이유를 보면
NumberFormatException이 났습니다.
Producer가 메시지를 만들 때
public void sendWorkQueue(String workQueueMessage, int duration){
String message = workQueueMessage + "| " + duration;
rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, message);
System.out.println("Sent WorkQueue " + message);
}
"| " 이런식으로 | 뒤에 공백을 하나 줬고,
Consmer가 처리할 때
int duration = Integer.parseInt(messageParts[1]);
냅다 int형으로 바꾸려고 했기 때문에 처리를 하지 못한 것입니다.
int duration = Integer.parseInt(messageParts[1].trim());
으로 바꿔주고 서버를 재시작하면 서버를 켜자마자 메시지를 소비할거에요.
위처럼 바꾸고 서버를 켜보면?
이렇게 서버를 켜자마자 쌓여있던 2개의 메시지를 처리하고
래빗엠큐 매니지 페이지를 가보면
이렇게 메시지가 처리되어 0개로 나타게 되쥬
Ready 상태가 많아지게 되면 Consumer 수를 늘리거나 Consumer의 메시지 처리 속도를 튜닝하거나 해야한다고 합니다.
unacked 상태가 많은 경우에는 Consumer가 메시지를 처리를 못하는 것이기 때문에 빠르게 디버깅을 해서 ack를 보내거나, consumer 연결상태 확인, restart, 재연결, 프로세스 재시작 이런것들을 해야한다고 합니다.
라운드 로빈으로 처리되는지 확인해보기
인텔리제이 터미널에서 아래 명령어를 통해 빌드를 해줍니다.
./gradlew clean build
빌드가 완료되면
저 경로에 jar파일이 생성될거에요.
그러고 나서 저는 windows 환경이니까
cmd 창을 지금부터 3개를 띄울겁니다.
1, 2번창에서 각각 포트번호를 다르게 하여 서버를 띄울 거고 3번 창에서는 요청을 날릴거에요.
우리가 라운드 로빈 형태로 동작하길 바라니까
메시지 A,B,C를 보낸다고 치면, 요청을 처리하는 형태가
첫 번째 서버가 A, C를 처리하고 두 번째 서버가 B를 처리하는 형태가 되어야 합니다.
이걸 눈으로 확인하기 위해 각각 서버를 실행해보겠습니다.
먼저 위 사진에서 jar파일이 있는 경로까지 들어가줍니다.
그러고 난 뒤에 아래 명령어를 통해 각각 다른 터미널에서 서버를 실행해줍니다.
java -jar HelloMessageQueue-0.0.1-SNAPSHOT.jar --server.port=8081
java -jar HelloMessageQueue-0.0.1-SNAPSHOT.jar --server.port=8082
저는 8080을 오라클이 쓰고있어서 8081이랑 8082로 했습니다.
그러면 각각 위 사진처럼 포트번호만 다른채로 실행이 될거에요
이제 또 다른 프롬프트(터미널)에서 curl 요청을 보내보겠습니다.
왼쪽 위(8081) 왼쪽아래(8082) 오른쪽(요청보내는곳)
상태에서
curl -X "POST" "http://localhost:8081/api/workqueue?message=Task1&duration=2000"
curl -X "POST" "http://localhost:8081/api/workqueue?message=Task2&duration=4000"
curl -X "POST" "http://localhost:8081/api/workqueue?message=Task3&duration=5000"
요청을 보내면??
위(8081) 에서 2번째 메시지를 처리하고, 아래(8082)에서 1, 3번쨰 요청을 가져가서 처리했네요
여기서 또 한번 날려보면??
이번에는 8081쪽에서 먼저 1, 3 번을 처리하고, 8082에서 2번을 처리했네요
이렇게 번갈아가면서 라운드로빈으로 처리하는걸 확인할 수 있었습니다.
관리페이지에서도 호출시점에 잘 처리된 걸 볼 수 있습니다.
이것으로 Work Queues 방식에 대해 알아보았습니다.
코드 출처 : https://github.com/villainscode/HelloMessageQueue/tree/tutorial-step2
내용 : 인프런 코드빌런 강사님의 "RabbitMQ를 이용한 비동기 아키텍처 한방에 해결하기" 강의 내용 정리
'개발지식 정리' 카테고리의 다른 글
페이징 쿼리 최적화: Covering Index 활용하기 (2) | 2025.01.30 |
---|---|
비동기 아키텍처 이해 2 - 메시지 전송 예제 (0) | 2025.01.24 |
비동기 아키텍처 이해 1 - RabbitMQ, AMQP (0) | 2025.01.23 |