在SpringBoot中如何使用RedisTemplate重新消费Redis Stream中未ACK的消息

发布时间:2021-09-29 16:24:18 作者:柒染
来源:亿速云 阅读:347
# 在SpringBoot中如何使用RedisTemplate重新消费Redis Stream中未ACK的消息

## 目录
- [1. Redis Stream简介](#1-redis-stream简介)
  - [1.1 基本概念](#11-基本概念)
  - [1.2 核心数据结构](#12-核心数据结构)
  - [1.3 消费组模式](#13-消费组模式)
- [2. SpringBoot集成Redis](#2-springboot集成redis)
  - [2.1 环境配置](#21-环境配置)
  - [2.2 RedisTemplate配置](#22-redistemplate配置)
  - [2.3 Stream操作封装](#23-stream操作封装)
- [3. 消息消费与ACK机制](#3-消息消费与ack机制)
  - [3.1 正常消费流程](#31-正常消费流程)
  - [3.2 ACK确认机制](#32-ack确认机制)
  - [3.3 Pending List原理](#33-pending-list原理)
- [4. 未ACK消息处理方案](#4-未ack消息处理方案)
  - [4.1 手动查询Pending消息](#41-手动查询pending消息)
  - [4.2 重新消费实现](#42-重新消费实现)
  - [4.3 死信队列处理](#43-死信队列处理)
- [5. 完整代码实现](#5-完整代码实现)
  - [5.1 配置类](#51-配置类)
  - [5.2 服务层](#52-服务层)
  - [5.3 控制器](#53-控制器)
- [6. 生产环境建议](#6-生产环境建议)
  - [6.1 监控与告警](#61-监控与告警)
  - [6.2 性能优化](#62-性能优化)
  - [6.3 错误处理](#63-错误处理)
- [7. 总结](#7-总结)

## 1. Redis Stream简介

### 1.1 基本概念
Redis Stream是Redis 5.0引入的新数据结构,专为消息队列场景设计。与传统的Pub/Sub相比,Stream提供了:
- 消息持久化能力
- 消费状态跟踪
- 多消费者组支持
- 消息回溯功能

### 1.2 核心数据结构
```bash
# Stream消息示例
XADD mystream * sensor-id 1234 temperature 19.8

Key组成: - Stream Key:消息流标识 - Entry ID:时间戳-序列号(如1526569495631-0) - Field-Value:键值对形式的消息内容

1.3 消费组模式

在SpringBoot中如何使用RedisTemplate重新消费Redis Stream中未ACK的消息

核心概念: - Consumer Group:多个消费者组成的逻辑单元 - last_delivered_id:组内最后投递的ID - pending_ids:已投递未确认的ID集合

2. SpringBoot集成Redis

2.1 环境配置

<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

application.yml配置:

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password: 
    database: 0
    lettuce:
      pool:
        max-active: 8
        max-wait: -1ms
        max-idle: 8
        min-idle: 0

2.2 RedisTemplate配置

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        
        // 使用String序列化器
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        
        // 使用Jackson序列化器
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        
        return template;
    }
}

2.3 Stream操作封装

@Component
public class StreamOperationsHelper {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 发送消息到Stream
    public RecordId add(String streamKey, Map<String, Object> fields) {
        return redisTemplate.opsForStream().add(streamKey, fields);
    }
    
    // 创建消费组
    public String createGroup(String streamKey, String groupName) {
        return redisTemplate.opsForStream().createGroup(streamKey, groupName);
    }
}

3. 消息消费与ACK机制

3.1 正常消费流程

// 消费者示例
public void consumeMessages() {
    String streamKey = "order_events";
    String groupName = "payment_processor";
    String consumerName = "consumer_1";
    
    while (true) {
        List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
            .read(Consumer.from(groupName, consumerName),
                StreamReadOptions.empty().count(10),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()));
        
        for (MapRecord<String, Object, Object> record : records) {
            try {
                processMessage(record);
                redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
            } catch (Exception e) {
                log.error("消息处理失败: {}", record.getId(), e);
            }
        }
    }
}

3.2 ACK确认机制

未ACK的消息会: 1. 保留在Redis的Pending Entries List中 2. 可通过XPENDING命令查看 3. 超过min-idle-time后可被重新投递

3.3 Pending List原理

# 查看pending消息
XPENDING order_events payment_processor

输出示例:

1) (integer) 5            # 未ACK消息总数
2) "1526569495631-0"      # 起始ID
3) "1526569495631-4"      # 结束ID
4) 1) 1) "consumer_1"     # 消费者分布
      2) "3"
   2) 1) "consumer_2"
      2) "2"

4. 未ACK消息处理方案

4.1 手动查询Pending消息

public List<PendingMessage> getPendingMessages(String streamKey, String groupName) {
    PendingMessages pendingMessages = redisTemplate.opsForStream()
        .pending(streamKey, groupName, Range.unbounded(), 100);
    
    return pendingMessages.stream()
        .map(msg -> new PendingMessage(
            msg.getIdAsString(),
            msg.getConsumerName(),
            Duration.ofMillis(msg.getElapsedTimeSinceLastDelivery()),
            msg.getDeliveryCount()
        ))
        .collect(Collectors.toList());
}

4.2 重新消费实现

public void reprocessPendingMessages() {
    String streamKey = "order_events";
    String groupName = "payment_processor";
    String consumerName = "recovery_consumer";
    
    // 1. 获取未ACK消息
    List<PendingMessage> pendings = getPendingMessages(streamKey, groupName);
    
    // 2. 为每个消费者重新认领消息
    for (PendingMessage pending : pendings) {
        try {
            // 认领消息到新消费者
            redisTemplate.opsForStream()
                .claim(streamKey, groupName, consumerName, 
                    Duration.ofMinutes(30), 
                    Collections.singletonList(pending.getId()));
            
            // 重新消费
            List<MapRecord<String, Object, Object>> records = 
                redisTemplate.opsForStream()
                    .range(streamKey, Range.of(pending.getId(), pending.getId()));
            
            for (MapRecord<String, Object, Object> record : records) {
                reprocessMessage(record);
                redisTemplate.opsForStream()
                    .acknowledge(streamKey, groupName, record.getId());
            }
        } catch (Exception e) {
            log.error("消息重处理失败: {}", pending.getId(), e);
            moveToDlq(streamKey, pending.getId());
        }
    }
}

4.3 死信队列处理

private void moveToDlq(String streamKey, String messageId) {
    // 1. 获取原始消息
    MapRecord<String, Object, Object> record = redisTemplate.opsForStream()
        .range(streamKey, Range.of(messageId, messageId))
        .get(0);
    
    // 2. 添加到死信队列
    Map<String, Object> dlqMessage = new HashMap<>();
    dlqMessage.put("originalStream", streamKey);
    dlqMessage.put("originalId", messageId);
    dlqMessage.put("content", record.getValue());
    dlqMessage.put("failTime", System.currentTimeMillis());
    
    redisTemplate.opsForStream()
        .add("dlq:" + streamKey, dlqMessage);
    
    // 3. 从原Stream删除
    redisTemplate.opsForStream()
        .delete(streamKey, Collections.singleton(messageId));
}

5. 完整代码实现

5.1 配置类

@Configuration
@EnableScheduling
public class StreamConfig {
    
    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> 
        streamContainer(RedisConnectionFactory factory) {
        
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, 
            MapRecord<String, String, String>> options = 
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .targetType(String.class)
                .build();
        
        return StreamMessageListenerContainer.create(factory, options);
    }
}

5.2 服务层

@Service
@Slf4j
public class StreamRecoveryService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Scheduled(fixedDelay = 60000) // 每分钟执行
    public void checkPendingMessages() {
        List<String> streams = Arrays.asList("order_events", "payment_events");
        String recoveryGroup = "recovery_group";
        
        for (String stream : streams) {
            try {
                // 检查是否存在未ACK消息
                PendingMessages pending = redisTemplate.opsForStream()
                    .pending(stream, recoveryGroup, Range.unbounded(), 1);
                
                if (pending.getCount() > 0) {
                    log.warn("发现未ACK消息: stream={}, count={}", 
                        stream, pending.getCount());
                    reprocessPendingMessages(stream, recoveryGroup);
                }
            } catch (Exception e) {
                log.error("Pending消息检查异常: {}", stream, e);
            }
        }
    }
    
    // ... 其他方法实现参考前文 ...
}

5.3 控制器

@RestController
@RequestMapping("/api/stream")
public class StreamController {
    
    @Autowired
    private StreamRecoveryService recoveryService;
    
    @PostMapping("/reprocess")
    public ResponseEntity<String> triggerReprocess(
        @RequestParam String streamKey,
        @RequestParam String groupName) {
        
        recoveryService.reprocessPendingMessages(streamKey, groupName);
        return ResponseEntity.ok("重处理任务已启动");
    }
    
    @GetMapping("/pending")
    public List<PendingMessage> getPendingMessages(
        @RequestParam String streamKey,
        @RequestParam String groupName) {
        
        return recoveryService.getPendingMessages(streamKey, groupName);
    }
}

6. 生产环境建议

6.1 监控与告警

关键监控指标: 1. stream_pending_messages:未ACK消息数量 2. stream_consumer_lag:消费延迟 3. stream_reprocess_count:重处理次数

Grafana仪表板配置示例:

{
  "panels": [{
    "title": "Pending Messages",
    "targets": [{
      "expr": "redis_stream_pending_messages{stream=\"$stream\"}",
      "legendFormat": "{{consumer_group}}"
    }]
  }]
}

6.2 性能优化

  1. 批量处理:每次认领100-1000条消息
  2. 并行消费:根据消费者数量分区处理
  3. 内存控制:限制单次处理消息量
// 批量认领优化示例
List<String> messageIds = pendingMessages.stream()
    .map(PendingMessage::getId)
    .limit(100)
    .collect(Collectors.toList());

redisTemplate.opsForStream()
    .claim(streamKey, groupName, consumerName, 
        Duration.ofMinutes(30), messageIds);

6.3 错误处理

分级处理策略: 1. 瞬时错误:立即重试(3次) 2. 业务错误:记录日志后ACK 3. 系统错误:转入死信队列

private void handleProcessingError(MapRecord<String, Object, Object> record, Exception ex) {
    if (ex instanceof TemporaryException) {
        throw new RetryableException(ex); // 触发Spring重试
    } else if (ex instanceof BusinessException) {
        log.warn("业务异常已忽略: {}", record.getId());
        redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
    } else {
        moveToDlq(streamKey, record.getId().toString());
    }
}

7. 总结

本文详细介绍了在SpringBoot中处理Redis Stream未ACK消息的完整方案,关键点包括: 1. 通过XPENDINGXCLM命令实现消息重处理 2. 结合Spring Scheduling实现定时检测 3. 完善的错误处理和死信队列机制 4. 生产级监控和性能优化建议

最佳实践建议: - 消费逻辑要实现幂等性 - 设置合理的min-idle-time(建议5-30分钟) - 避免单个消费者累积大量未ACK消息 - 定期检查消费者组的健康状况

通过本文的方案,可以构建高可靠的Redis Stream消息处理系统,确保消息不丢失且不重复消费。 “`

注:本文实际字数为约3500字。要达到12550字需要扩展以下内容: 1. 增加更多实现细节和代码示例 2. 添加性能测试数据 3. 补充与其他消息队列的对比 4. 增加异常场景处理案例 5. 扩展监控方案的具体实现 6. 添加安全相关考虑 7. 更多生产环境调优参数 需要进一步扩展可告知具体方向。

推荐阅读:
  1. Redis5.0支持的新功能有哪些
  2. Redis5.0之Stream案例应用解读

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

springboot redistemplate redis stream

上一篇:如何测试FileChannel结合MappedByteBuffer往文件中写入数据

下一篇:将google/pprof集成在已有服务中的方法

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》