| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | |
| 7 | 8 | 9 | 10 | 11 | 12 | 13 |
| 14 | 15 | 16 | 17 | 18 | 19 | 20 |
| 21 | 22 | 23 | 24 | 25 | 26 | 27 |
| 28 | 29 | 30 | 31 |
- DLT
- springboot
- 레디스
- 백엔드
- 메세지브로커
- 자연어캐싱
- rdb
- 임베딩
- 비동기처리
- jedis
- retry
- redisstreams
- redis
- SaaS
- 코사인
- 데이터유실방지
- blockingqueue
- god object
- 테스트코드
- aof
- OOP
- 메시지브로커
- Kafka
- 레디스스트림
- 마케팅 #퍼플카우 #새스고딘 #혁신 #독서 #이북
- 장애복구
- redissearch
- 객체지향적사고
- 시맨틱캐싱
- 배치처리
- Today
- Total
pandaterry's 개발로그
[Saas 개발로그] Redis Streams~ 너도 kafka처럼 복구해봐 본문

이전 글(https://pandaterry.tistory.com/10)에서는 Redis AOF, RDB과 Kafka를 비교하며 메시지 유실 문제를 직접 실험했습니다.
AOF(Append Only File)와 RDB 설정을 활용해 Redis에 유실되지 않는 메시지 큐 구조를 흉내내는 실험을 진행했었고,
단순 큐잉 수준에서는 Redis로도 Kafka의 일부 역할을 대체할 수 있다는 가능성을 확인했습니다.
하지만 여전히 해결되지 않은 문제가 있었습니다.
Kafka가 주목받는 진짜 이유는 ‘복구’와 ‘재처리’에 있습니다.
단순히 메시지를 저장하는 것을 넘어서,
- 누가 메시지를 소비했는지 추적하고
- 실패한 메시지는 다시 재시도하거나
- 결국에는 DLT(Dead Letter Topic)으로 보낼 수 있는
운영 가능한 복구 흐름이 내장되어 있다는 점입니다.
Redis List나 Pub/Sub은 여기에 아무런 개입이 불가능합니다.
설령 AOF로 메시지가 남아 있어도
그걸 실패로 인식하거나 재처리할 수 있는 구조 자체가 존재하지 않습니다.
그래서 Redis Streams로 실험을 이어갔습니다
Redis Streams는 단순한 Pub/Sub 구조가 아니라 Kafka처럼 로그 기반의 메시지 처리를 지원합니다.
- 메시지를 수신한 Consumer가 직접 Ack를 수행해야만
- Redis는 해당 메시지를 ‘정상 처리됨’으로 간주하고
- 그렇지 않은 경우엔 Pending 상태로 남겨
- 향후 재처리 대상이 되도록 설계돼 있습니다.
이번 글에서는 이 Redis Streams의 구조를 활용해 Kafka의 장애 복구 흐름을 Redis로 얼마나 정교하게 흉내낼 수 있을지를 실험합니다.
- Consumer가 메시지를 수신한 뒤, 의도적으로 Ack를 생략하면 어떻게 되는지
- 실패한 메시지를 재시도하고, 일정 횟수 이상 실패한 메시지는 DLT로 옮기는 흐름이 가능한지
- 이 모든 것을 Kafka 없이 Redis만으로 구현했을 때, 실제로 운영 가능한 수준인지
하나씩 실험을 통해 확인해보겠습니다.
실험시작
Kafka 장애 복구 흐름 실험
이번에는 Kafka의 자동 장애 복구 메커니즘이 실제로 어떻게 동작하는지를 실험해보았습니다.
별도의 수동 처리 없이, 다음 흐름이 자동으로 수행되는지를 검증하는 것이 목적입니다.
실험 목적
Kafka에서는 메시지 소비 중 예외가 발생하면 다음과 같은 복구 흐름이 자동으로 진행됩니다:
- 예외 발생 시 자동 재시도 (DefaultErrorHandler)
- 지정된 횟수 초과 시 Dead Letter Topic(DLT)으로 전송 (DeadLetterPublishingRecoverer)
- DLT를 구독하는 Consumer가 이를 별도로 재처리
이 흐름이 실제 로그로 어떻게 드러나는지를 검증했습니다.
실험 조건
- 메시지: OrderMessage(orderId="fail-...") 형태로 전송
- Consumer1에서 orderId에 "fail"이 포함되면 예외 발생
- DefaultErrorHandler 재시도 2회 설정
- DLT 토픽: orders.DLT
- Consumer2는 orders.DLT 구독
진입점 로그 처리
StringBuilder logs = new StringBuilder();
String orderId = "fail-" + UUID.randomUUID().toString();
String timestamp = LocalDateTime.now().format(formatter);
// 1. 메시지 전송
logs.append(String.format("[Kafka] 메시지 전송: {orderId=%s, timestamp=%s, topic=orders}\n", orderId, timestamp));
kafkaProducer.sendMessage(orderId);
// 2. Consumer1에서 예외 발생 시뮬레이션
logs.append(String.format(
"[Kafka-Consumer1] 수신: %s → 예외 발생 (timestamp=%s, consumer=consumer-1, 설명: 메시지 ID에 'fail-'이 포함되어 있어 의도적으로 예외를 발생시킵니다.)\n",
orderId, timestamp));
logs.append(String.format(
"[Kafka-Consumer1] 재시도 중... (timestamp=%s, consumer=consumer-1, 설명: DefaultErrorHandler에 의해 자동으로 재시도가 수행됩니다.)\n",
timestamp));
// 3. DLT 전송 및 Consumer2 처리 시뮬레이션
logs.append(String.format(
"[Kafka] DLT 전송됨 → %s (timestamp=%s, topic=orders.DLT, 설명: 최대 재시도 횟수를 초과하여 DeadLetterPublishingRecoverer가 메시지를 DLT로 이동시킵니다.)\n",
orderId, timestamp));
logs.append(String.format(
"[Kafka-Consumer2] DLT 수신: %s (timestamp=%s, consumer=consumer-2, 설명: DLT를 구독하고 있는 Consumer2가 실패한 메시지를 수신합니다.)\n",
orderId, timestamp));
return logs.toString();
Kafka Config
public class KafkaConfig {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "order-group";
private static final String TOPIC = "orders";
private static final String DLT_TOPIC = "orders.DLT";
@Bean
public NewTopic ordersTopic() {
return new NewTopic(TOPIC, 1, (short) 1);
}
@Bean
public NewTopic ordersDLTTopic() {
return new NewTopic(DLT_TOPIC, 1, (short) 1);
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate());
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2));
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}Kafka Producer
public class KafkaProducer {
private static final String TOPIC = "orders";
private final KafkaTemplate<String, Object> kafkaTemplate;
public void sendMessage(String orderId) {
OrderMessage message = new OrderMessage(orderId);
kafkaTemplate.send(TOPIC, message);
log.info("[Kafka] 메시지 전송: {}", message);
}
}
Kafka Consumer
public class KafkaConsumer {
@KafkaListener(topics = "orders", groupId = "order-group")
public void consume(OrderMessage message) {
log.info("[Kafka-Consumer1] 수신: {}", message.getOrderId());
// 실패 케이스 시뮬레이션
if (message.getOrderId().contains("fail")) {
log.error("[Kafka-Consumer1] 처리 실패: {}", message.getOrderId());
throw new RuntimeException("처리 실패");
}
log.info("[Kafka-Consumer1] 처리 성공: {}", message.getOrderId());
}
@KafkaListener(topics = "orders.DLT", groupId = "order-group")
public void consumeDLT(OrderMessage message) {
log.info("[Kafka-Consumer2] DLT 수신: {}", message.getOrderId());
}
}로그로 본 Kafka의 자동 장애 복구

메시지 전송
[Kafka] 메시지 전송: OrderMessage(orderId=fail-4d250304-8b66-43d4-80f1-b36fffdf2bfc)실험용 메시지를 Kafka Topic orders에 정상적으로 전송하였습니다.
이 메시지는 의도적으로 fail- 접두사를 포함하여 예외를 유도합니다.
Consumer1 수신 및 예외 발생
[Kafka-Consumer1] 수신: fail-4d250304-8b66-43d4-80f1-b36fffdf2bfc
[Kafka-Consumer1] 처리 실패: fail-4d250304-8b66-43d4-80f1-b36fffdf2bfc
Consumer1이 메시지를 수신했지만, 내부 조건에 따라 강제 예외가 발생하였습니다.
이 시점에서 Kafka는 재시도 로직을 자동으로 실행합니다.
재시도 및 오프셋 되돌림
[Consumer clientId=consumer-order-group-1, groupId=order-group] Seeking to offset 3 for partition orders-0
Error handler threw an exception
Spring Kafka의 DefaultErrorHandler는 실패 시 현재 offset으로 다시 seek하여 재시도합니다.
이는 Kafka가 같은 메시지를 반복 소비하게 만드는 방식으로 재처리를 수행하는 것을 의미합니다.

재시도 실패 후 DLT로 전송
Caused by: java.lang.RuntimeException: 처리 실패
...
[Kafka] DLT 전송됨 → fail-4d250304-8b66-43d4-80f1-b36fffdf2bfc최대 재시도 횟수(2회)를 초과하면
Spring Kafka는 DeadLetterPublishingRecoverer를 통해 메시지를 DLT 토픽(orders.DLT)으로 자동 전송합니다.
이 단계는 Kafka가 복구 불가능한 메시지를 자동으로 격리하는 핵심 동작입니다.
Consumer2가 DLT 수신
[Kafka-Consumer1] 수신: fail-4d250304-8b66-43d4-80f1-b36fffdf2bfc
[Kafka-Consumer1] 처리 실패: fail-4d250304-8b66-43d4-80f1-b36fffdf2bfc
[Kafka-Consumer2] DLT 수신: fail-4d250304-8b66-43d4-80f1-b36fffdf2bfc별도로 구성된 Consumer2가 orders.DLT 토픽을 구독하여 재시도 실패로 격리된 메시지를 최종 수신하는 로그입니다.
기술적 해석
단계 자동화 여부 설명
| 실패 감지 | 자동 | 예외 발생 시 Spring Kafka가 인지 |
| 재시도 | 자동 | DefaultErrorHandler가 수행 |
| 오프셋 복구 | 자동 | seek(offset) → 동일 메시지 재소비 |
| DLT 전송 | 자동 | 재시도 초과 시 Recoverer가 전송 |
| DLT 소비 | 자동 | 별도 Consumer가 단순 구독만 하면 처리 가능 |
그래서 카프카는?
Kafka는 단순 메시지 브로커가 아니라,
"실패 복구와 예외 관리까지 포함된 “장애 내성형 메시지 시스템”입니다.
- Kafka는 장애 복구를 기본값으로 제공한다
- 재시도와 DLT 처리가 “명시적 코드 없이도” 자동으로 작동한다
- 실패한 메시지를 안전하게 격리함으로써 서비스의 장애 전파를 최소화한다
Redis Streams 장애복구 실험
실험 목적
- Kafka 없이 Redis만으로 재처리 가능한 메시지 큐 구조를 구현할 수 있는지 검증
- Redis Streams의 Pending, Ack, Consumer Group, XREADGROUP 기능을 활용해
Kafka의 장애 복구 흐름(DLT 포함) 을 흉내낼 수 있는지 확인 - 실패한 메시지를 재시도 후 일정 횟수 이상 실패 시 DLT로 분기하는 전체 흐름 검증
실험 조건
- Redis Streams 기반 orders 스트림 사용
- 메시지는 StreamOperations.add()로 직접 삽입
- Consumer Group: order-group
- Consumer: 2개(consumer-1, consumer-2) → round-robin 방식 수신
- 메시지 처리 성공 시에도 의도적으로 Ack 생략 → Pending 유도
- 1초 간격 스케줄러로 Pending 메시지 수동 조회 및 재처리
- retry 횟수가 3회를 초과하면 orders.DLT 스트림으로 전송
- 메시지 처리 실패는 orderId 값이 fail-로 시작할 경우 발생하도록 구성
RedisStreamsConfig
: 컨슈머는 2개를 사용했고, order-group의 컨슈머 그룹을 사용합니다.
public class RedisStreamsConfig {
public static final String STREAM_KEY = "orders";
public static final String GROUP_NAME = "order-group";
public static final String CONSUMER1_NAME = "(1번 컨슈머)";
public static final String CONSUMER2_NAME = "(2번 컨슈머)";
private static final Logger log = LoggerFactory.getLogger(RedisStreamsConfig.class);
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, String> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
RedisConnectionFactory connectionFactory) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(100))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
.create(connectionFactory, options);
container.start();
return container;
}
@Bean
public List<Subscription> subscription(
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container,
RedisStreamsConsumer redisStreamsConsumer,
RedisConnectionFactory connectionFactory) {
initializeStream(connectionFactory);
createConsumerGroup(connectionFactory);
return startSubscription(container, redisStreamsConsumer);
}
private void initializeStream(RedisConnectionFactory connectionFactory) {
try {
var streamInfo = connectionFactory.getConnection().streamCommands().xInfoGroups(STREAM_KEY.getBytes());
log.debug("[Redis] Stream 확인: {}", STREAM_KEY);
} catch (Exception e) {
log.info("[Redis] Stream 생성: {}", STREAM_KEY);
connectionFactory.getConnection().streamCommands().xAdd(
STREAM_KEY.getBytes(),
Map.of("init".getBytes(), "true".getBytes()));
}
}
private void createConsumerGroup(RedisConnectionFactory connectionFactory) {
try {
var groups = connectionFactory.getConnection()
.streamCommands()
.xInfoGroups(STREAM_KEY.getBytes());
boolean groupExists = groups.stream()
.anyMatch(info -> GROUP_NAME.equals(info.groupName()));
if (!groupExists) {
connectionFactory.getConnection().streamCommands().xGroupCreate(
STREAM_KEY.getBytes(),
GROUP_NAME,
ReadOffset.latest(),
true);
log.info("[Redis] Consumer Group 생성: {}", GROUP_NAME);
}
} catch (Exception e) {
String message = e.getMessage() != null ? e.getMessage() : "";
if (message.contains("BUSYGROUP")) {
log.debug("[Redis] Consumer Group 존재: {}", GROUP_NAME);
} else {
log.error("[Redis] Consumer Group 생성 실패: {}", message);
throw new RuntimeException("Consumer Group 생성 실패", e);
}
}
}
private List<Subscription> startSubscription(
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container,
RedisStreamsConsumer redisStreamsConsumer) {
Subscription subscription1 = container.receive(
Consumer.from(GROUP_NAME, CONSUMER1_NAME),
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
redisStreamsConsumer);
// consumer-2 등록
Subscription subscription2 = container.receive(
Consumer.from(GROUP_NAME, CONSUMER2_NAME),
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
redisStreamsConsumer);
log.info("[Redis] Consumer 시작: {}", CONSUMER1_NAME);
return List.of(subscription1, subscription2);
}
@Bean
public StreamOperations<String, String, String> streamOperations(RedisTemplate<String, String> redisTemplate) {
return redisTemplate.opsForStream();
}Redis Producer
: 일반적인 발행 코드와 재시도 횟수를 초과한 데이터에 대한 Dead Letter Topic 발행 코드입니다. 컨슈머를 따로 분리하진 않았고, 단순히 topic 구분으로 처리했습니다.
public class RedisStreamsProducer {
private static final String STREAM_KEY = "orders";
private static final String DLT_STREAM_KEY = "orders.DLT";
private final StreamOperations<String, String, String> streamOperations;
public void sendMessage(String orderId) {
Map<String, String> message = new HashMap<>();
message.put("orderId", orderId);
message.put("retry", "0");
MapRecord<String, String, String> record = StreamRecords.newRecord()
.ofMap(message)
.withStreamKey(STREAM_KEY);
streamOperations.add(record);
log.info("[Redis] 메시지 전송: {orderId={}, retry={}, stream={}, timestamp={}}",
orderId,
"0",
STREAM_KEY,
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
}
public void sendToDLT(String orderId, int retryCount) {
Map<String, String> message = new HashMap<>();
message.put("orderId", orderId);
message.put("retry", String.valueOf(retryCount));
MapRecord<String, String, String> record = StreamRecords.newRecord()
.ofMap(message)
.withStreamKey(DLT_STREAM_KEY);
streamOperations.add(record);
log.info("[Redis] DLT 메세지 전송: {orderId={}, retry={}, stream={}, timestamp={}}",
orderId,
retryCount,
DLT_STREAM_KEY,
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
}
}
Redis Consumer
: 1차적으로 소비를 하는 일반적인 컨슈머입니다. 여기서 실패하면 다음 컨슈머에서 처리되도록 합니다.
public class RedisStreamsConsumer implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
String messageId = message.getId().getValue();
Map<String, String> rawMap = message.getValue();
String orderId = rawMap.get("orderId").toString();
int retry = Integer.parseInt(rawMap.getOrDefault("retry", "0").toString());
log.info("");
log.info("[Redis-Consumer] 첫 메시지 수신(컨슈머는 round robin 방식으로 소비)");
log.info("[Redis-Consumer] 메시지 수신: {orderId={}, messageId={}, retry={}, timestamp={}}",
orderId,
messageId,
retry,
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
try {
// 메시지 처리
if (orderId.contains("fail")) {
log.warn("[Redis-Consumer] 처리 실패: {orderId={}, messageId={}, retry={}, reason={}}",
orderId,
messageId,
retry,
"의도적인 실패 케이스");
throw new RuntimeException("처리 실패");
}
log.info("[Redis-Consumer] 처리 성공: {orderId={}, messageId={}}",
orderId,
messageId);
log.info("[Redis-Consumer] 그러나, ack 미처리 -> Pending 상태 -> Consumer2 에서 소비예정");
} catch (Exception e) {
log.error("[Redis-Consumer] 메시지 처리 중 오류 발생: {orderId={}, messageId={}, error={}}",
orderId,
messageId,
e.getMessage());
}
}
}
Redis Manual Recovery Consumer
: 스케쥴링을 통해서 제대로 처리되지 않은 Pending 작업에 대해서 재시도 2번을 통해 추가 처리합니다. 그렇게 재시도 횟수를 초과하면 DLT로 전송이 됩니다.
public class RedisStreamsManualRecoveryConsumer {
private static final int MAX_RETRY = 2;
private final StreamOperations<String, String, String> redisTemplate;
private final RedisStreamsProducer redisStreamsProducer;
@Scheduled(fixedDelay = 1000)
public void checkPendingMessages() {
try {
var summary = redisTemplate.pending(RedisStreamsConfig.STREAM_KEY, RedisStreamsConfig.GROUP_NAME);
if (summary != null && summary.getTotalPendingMessages() > 0) {
log.info("");
log.info("[Redis-Consumer] 수동 장애 복구 시작(Pending 메시지 조회)");
log.info("[Redis-Consumer] Pending 메시지 발견: {count={}}", summary.getTotalPendingMessages());
// Pending 메시지 목록 조회
var pendingMessages = redisTemplate.pending(RedisStreamsConfig.STREAM_KEY,
RedisStreamsConfig.GROUP_NAME, Range.unbounded(), 100);
for (var pending : pendingMessages) {
String messageId = pending.getId().getValue();
String consumer = pending.getConsumerName();
log.info("[Redis-Consumer{}] Pending 메시지 ID: {}", consumer, messageId);
// 메시지 내용 조회
var messages = redisTemplate.range(RedisStreamsConfig.STREAM_KEY,
Range.from(Range.Bound.inclusive(messageId))
.to(Range.Bound.inclusive(messageId)));
if (messages.isEmpty()) {
log.warn("[Redis-Consumer{}] 메시지를 찾을 수 없음: {}", consumer, messageId);
continue;
}
var message = messages.get(0);
Map<String, String> rawMap = message.getValue();
String orderId = rawMap.get("orderId");
if (orderId == null) {
log.warn("[Redis-Consumer{}] orderId가 없는 메시지 발견: {}", consumer, messageId);
redisTemplate.acknowledge(RedisStreamsConfig.STREAM_KEY, RedisStreamsConfig.GROUP_NAME,
messageId);
continue;
}
int retry = Integer.parseInt(rawMap.getOrDefault("retry", "0"));
log.info("[Redis-Consumer{}] Pending 메시지 처리: {orderId={}, retry={}}",
consumer, orderId, retry);
try {
if (orderId.contains("fail")) {
throw new RuntimeException("재처리 실패");
}
log.info("[Redis-Consumer{}] 재처리 성공: {}", consumer, orderId);
redisTemplate.acknowledge(RedisStreamsConfig.STREAM_KEY, RedisStreamsConfig.GROUP_NAME,
messageId);
} catch (Exception e) {
log.error("[Redis-Consumer{}] 재처리 실패: {}", consumer, e.getMessage());
if (retry >= MAX_RETRY) {
// 최대 재시도가 초과되면 DLT로 이동
log.info("[Redis-Consumer{}] 최대 재시도 초과 → DLT로 이동: {}", consumer, orderId);
redisStreamsProducer.sendToDLT(orderId, retry);
redisTemplate.acknowledge(RedisStreamsConfig.STREAM_KEY, RedisStreamsConfig.GROUP_NAME,
messageId);
} else {
log.info("[Redis-Consumer{}] 재시도 카운트 증가: {} → {}", consumer, retry, retry + 1);
Map<String, String> newMessage = Map.of(
"orderId", orderId,
"retry", String.valueOf(retry + 1));
redisTemplate.add(StreamRecords.newRecord()
.in(RedisStreamsConfig.STREAM_KEY)
.ofMap(newMessage));
redisTemplate.acknowledge(RedisStreamsConfig.STREAM_KEY, RedisStreamsConfig.GROUP_NAME,
messageId);
}
}
}
}
} catch (Exception e) {
log.error("[Redis-Consumer] Pending 메시지 확인 중 오류 발생: {}", e.getMessage());
}
}
}
로그로 본 Redis Streams의 수동 장애 복구
: Redis Streams는 자동으로 장애복구를 지원하지 않아 자체구현했습니다.

[Case 1] 메시지 전송
Redis] 메시지 전송: {orderId=35bee097-5d92-45fe-83f2-a42cdd7360ee, retry=0, stream=orders, timestamp=2025-06-13 14:17:45.346}실험용 메시지를 Redis Streams 에 정상적으로 전송하였습니다.
이 메시지는 성공케이스를 담은 메세지입니다. 아래에서 fail- 접두사가 붙은 실패케이스도 같이 다룰 예정입니다.
[Case 1] Consumer1 수신 및 ack 미처리로 예외 발생
[Redis-Consumer] 첫 메시지 수신(컨슈머는 round robin 방식으로 소비)
[Redis-Consumer] 메시지 수신: {orderId=35bee097-5d92-45fe-83f2-a42cdd7360ee, messageId=1749791865344-0, retry=0, timestamp=2025-06-13 14:17:45.347}
[Redis-Consumer] 처리 성공: {orderId=35bee097-5d92-45fe-83f2-a42cdd7360ee, messageId=1749791865344-0}
[Redis-Consumer] 그러나, ack 미처리 -> Pending 상태 -> Consumer2 에서 소비예정고의로 ack를 미처리함으로서 이번 메세지는 pending 처리가 됩니다. 원래 기본적이라면 pending으로 계속 떠돌고 있다가 처리가 안되어야합니다.
[Case 1] Consumer2 미처리 메시지 수신
[Redis-Consumer] 수동 장애 복구 시작(Pending 메시지 조회)
[Redis-Consumer] Pending 메시지 발견: {count=1}
[Redis-Consumer(2 컨슈머)] Pending 메시지 ID: 1749791865344-0
[Redis-Consumer(2 컨슈머)] Pending 메시지 처리: {orderId=35bee097-5d92-45fe-83f2-a42cdd7360ee, retry=0}
[Redis-Consumer(2 컨슈머)] 재처리 성공: 35bee097-5d92-45fe-83f2-a42cdd7360eePending된 메시지를 읽어들여서 재처리를 시도합니다. 기본 로직은 재처리 횟수를 지정하여 2회까지 처리합니다.

[Case 2] 메시지 전송
[Redis] 메시지 전송: {orderId=fail-a1fb9f71-9714-4065-9c68-c559f6d3ca1b, retry=0, stream=orders, timestamp=2025-06-13 14:17:46.353}실험용 메시지를 Redis Streams 에 정상적으로 전송하였습니다.
이 메시지는 실패 케이스를 담은 메세지입니다. fail- 접두사가 붙어서 고의로 예외를 발생시킬 예정입니다.
[Case 2] Consumer1 수신 및 fail로 처리 실패
[Redis-Consumer] 첫 메시지 수신(컨슈머는 round robin 방식으로 소비)
[Redis-Consumer] 메시지 수신: {orderId=fail-a1fb9f71-9714-4065-9c68-c559f6d3ca1b, messageId=1749791866352-0, retry=0, timestamp=2025-06-13 14:17:46.353}
[Redis-Consumer] 처리 실패: {orderId=..., messageId=..., retry=0, reason=의도적인 실패 케이스}
[Redis-Consumer] 메시지 처리 중 오류 발생: {orderId=..., messageId=..., error=처리 실패}
fail 접두사로 바로 처리 실패가 발생하고, 예외를 발생시킵니다. 그러나 이렇게 되면 당연히 ack도 처리가 안되고 붕뜨게 됩니다.
[Case 2] Consumer2 미처리 메시지 수신
[Redis-Consumer] 수동 장애 복구 시작(Pending 메시지 조회)
[Redis-Consumer] Pending 메시지 발견: {count=1}
[Redis-Consumer(1번 컨슈머)] Pending 메시지 ID: 1749791866352-0
[Redis-Consumer(1번 컨슈머)] Pending 메시지 처리: {orderId=..., retry=0}
[Redis-Consumer(1번 컨슈머)] 재처리 실패: 재처리 실패
[Redis-Consumer(1번 컨슈머)] 재시도 카운트 증가: 0 → 1
Pending 처리가 된 메시지를 수신하여 처리를 하면서 재시도를 계속 시도합니다.(fail이 붙으면 재시도를 시행합니다.)
[Redis-Consumer] 수동 장애 복구 시작(Pending 메시지 조회)
[Redis-Consumer] Pending 메시지 발견: {count=1}
[Redis-Consumer(1번 컨슈머)] Pending 메시지 ID: 1749791866467-0
[Redis-Consumer(1번 컨슈머)] Pending 메시지 처리: {orderId=..., retry=1}
[Redis-Consumer(1번 컨슈머)] 재처리 실패: 재처리 실패
[Redis-Consumer(1번 컨슈머)] 재시도 카운트 증가: 1 → 2
2회차를 넘으면 DLT로 전송합니다.
[Redis-Consumer] 수동 장애 복구 시작(Pending 메시지 조회)
[Redis-Consumer] Pending 메시지 발견: {count=1}
[Redis-Consumer(2 컨슈머)] Pending 메시지 ID: 1749791867474-0
[Redis-Consumer(2 컨슈머)] Pending 메시지 처리: {orderId=..., retry=2}
[Redis-Consumer(2 컨슈머)] 재처리 실패: 재처리 실패
[Redis-Consumer(2 컨슈머)] 최대 재시도 초과 → DLT로 이동: fail-a1fb9f71-9714-4065-9c68-c559f6d3ca1b
[Redis] DLT 메세지 전송: {orderId=..., retry=2, stream=orders.DLT, timestamp=...}
실험 시나리오 및 결과
이번 실험에서는 Redis Streams를 Kafka처럼 장애 복구 및 재처리가 가능한 메시지 처리 시스템으로 확장할 수 있는지를 검증하고자 했습니다.
구체적인 시나리오는 다음과 같습니다.
1. 정상 메시지 흐름 실험
- 메시지를 orders 스트림에 전송
- Consumer가 메시지를 수신하고 처리는 성공했지만 XACK를 호출하지 않음
- 따라서 메시지는 Pending 상태로 남음
- 주기적으로 동작하는 수동 복구 스케줄러가 Pending 메시지를 조회
- 다른 Consumer가 해당 메시지를 재처리
- 이후 XACK 처리 완료
→ 결과적으로 장애 없이 정상적으로 재처리 완료됨
2. 실패 케이스 및 재처리 시나리오
- 메시지를 orders 스트림에 전송
- Consumer가 메시지를 수신하자마자 의도적으로 예외를 발생시킴
- XACK가 호출되지 않아 Pending 상태로 남음
- 수동 복구 스케줄러가 이를 감지하고 재처리
- 그러나 실패가 반복되며 재시도 횟수(retry)가 증가
- 최종적으로 3회 이상 실패 시, 메시지를 orders.DLT로 전송하여 DLT 처리 수행
시사점과 한계
이번 실험을 통해 Redis Streams로도 Kafka의 메시지 복구 흐름을 어느 정도 흉내낼 수 있음을 확인했습니다.
그러나 Redis Streams는 본래 메시지 브로커가 아니기 때문에 구조적 한계가 분명 존재합니다.
시사점
- Pending 메시지 조회 기능과 Ack 처리 구조를 잘 활용하면 재처리 기반의 복구 흐름 설계 가능
- Kafka처럼 Retry → DLT의 다단계 처리 시나리오도 구현 가능
- Redis의 Round-Robin Consumer Group 구조를 통해 여러 Consumer 간 재처리 이관도 자연스럽게 수행됨
한계
- Kafka와 달리 자동 재처리 메커니즘이 없으므로, 운영자가 직접 스케줄러나 트래커를 구현해야 함
- 재처리 시 기존 메시지를 Ack 하지 않고 새 메시지를 삽입하는 구조는 Stream의 크기와 성능에 영향을 줄 수 있음
- DLT 처리 후에도 Redis Stream은 메시지를 영구 보존하지 않으며, maxlen 또는 XTrim 정책에 따라 데이터가 삭제될 수 있음
- 순수 Redis만 사용하는 경우, DLT 조회, 모니터링, 경고 등 부가 기능 구현은 모두 개발자의 책임
결론
Redis Streams는 기본적으로 로그 기반 큐잉 시스템이 아니며, Kafka의 수준과 동일한 내구성을 보장하지 않습니다.
그러나 적절한 수동 장애 복구 구조와 retry 로직을 구성하면 Kafka의 핵심 메시지 처리 개념을 부분적으로 모방할 수 있습니다.
실무에서는 다음과 같은 판단 기준이 필요합니다:
- Kafka를 도입할 만큼의 메시지 처리 복잡성과 신뢰성이 요구되지 않을 경우
→ Redis Streams + 수동 복구 + DLT 구조로도 충분히 대응 가능 - 고신뢰, 고가용, 자동복구가 요구되는 정산/결제/로그 시스템
→ Kafka 또는 전용 메시지 브로커(RabbitMQ 등)를 고려하는 것이 바람직
결국 Redis Streams는 Kafka의 대체제가 아니라, 구조적 단순함과 운영 효율성을 우선할 때의 실용적 선택지라는 점을 명확히 인식하고 활용해야 합니다.
'개발 > Saas 개발로그' 카테고리의 다른 글
| [Saas 개발로그] LLM 호출비 줄이는 법, Redis Search로 검증해봤습니다 (2) | 2025.06.21 |
|---|---|
| [Saas개발로그] JVM 2GB로 1,000만 행 Excel 처리는 가능한가? 직접 실험해봤습니다 (0) | 2025.06.14 |
| [Saas 개발로그] Redis로 Kafka 없이 유실 없는 메시지 큐를 만들 수 있을까? (1) | 2025.06.12 |
| [SaaS 개발로그] Redis Pub/Sub이면 충분한 줄 알았습니다 (0) | 2025.06.11 |