Apache Pulsar是如何保证消息不丢不重

发布时间:2021-12-22 14:50:29 作者:柒染
来源:亿速云 阅读:327
# Apache Pulsar是如何保证消息不丢不重

## 引言

在分布式消息系统中,"消息不丢失"和"消息不重复"(Exactly-Once语义)是两大核心挑战。Apache Pulsar作为新一代云原生消息流平台,通过多层设计实现了高可靠的消息传递。本文将深入剖析Pulsar如何从存储机制、确认机制、副本策略等方面保障消息投递的可靠性。

---

## 一、架构基础:分层存储与多副本

### 1.1 BookKeeper的持久化保障
Pulsar采用BookKeeper作为持久化存储引擎,其核心特性包括:
- **预写日志(WAL)**:所有消息先写入不可变的日志文件
- **多副本同步写入**:每条消息需被`ack-quorum`个副本持久化后才返回成功
```java
// 伪代码:BookKeeper写入流程
Entry entry = new Entry(message);
ledger.addEntry(entry).thenRun(() -> {
    // 至少写入2个节点(默认quorum=2)
    if(confirmedReplicas >= ackQuorum) {
        sendAckToProducer();
    }
});

1.2 分片(Fragment)机制

Apache Pulsar是如何保证消息不丢不重


二、消息不丢失的保障机制

2.1 生产者端保证

配置参数 作用 推荐值
sendTimeout 发送超时时间 30s
blockIfQueueFull 内存队列满时阻塞而非丢弃 true
maxPendingMessages 最大待确认消息数 1000
# Python生产者示例
producer = client.create_producer(
    topic='persistent://tenant/ns/topic',
    send_timeout_millis=30000,
    block_if_queue_full=True
)

2.2 Broker持久化策略

  1. 同步刷盘:通过ensembleSize=3, writeQuorum=2, ackQuorum=2实现
    • 消息必须写入2/3个节点才算成功
  2. 延迟确认:仅当消息被持久化后才向生产者返回ACK
  3. 自动故障转移:节点故障时自动切换写入到健康副本

2.3 消费者端确认


三、消息不重复的精确一次投递

3.1 生产者幂等性

# 启用生产者幂等
enableIdempotence=true
producerName=order-producer-1
sequenceId=142857

3.2 事务消息(跨分区原子性)

// 事务使用示例
Transaction txn = pulsarClient.newTransaction()
    .withTransactionTimeout(1, TimeUnit.MINUTES)
    .build();

producer.newMessage(txn).value("订单创建".getBytes()).send();
consumer.acknowledgeAsync(msg.getMessageId(), txn);

txn.commit().exceptionally(ex -> {
    log.error("事务失败", ex);
    return null;
});

3.3 去重表(Deduplication)

  1. Broker维护最近消息的sequenceId缓存
  2. 窗口期默认为5分钟,可配置:
brokerDeduplicationEnabled: true
brokerDeduplicationMaxNumberOfProducers: 10000
brokerDeduplicationEntriesInterval: 1000

四、故障场景应对策略

4.1 网络分区处理

4.2 数据恢复流程

  1. 检测到副本缺失时触发自动恢复
  2. 从健康副本复制数据
  3. 使用CRC32校验数据完整性

4.3 客户端重试策略

// Go客户端指数退避示例
retryPolicy := pulsar.NewRetryPolicy(
    pulsar.MaxReconnectToBroker(5),
    pulsar.Backoff(100*time.Millisecond, 5*time.Second),
)
client, _ := pulsar.NewClient(pulsar.ClientOptions{
    URL:        "pulsar://localhost:6650",
    RetryPolicy: retryPolicy,
})

五、性能与可靠性的平衡

5.1 写入性能优化

配置项 性能影响 可靠性影响
ackQuorum=1 ↑↑↑ ↓↓↓
immediateFlush=false ↑↑
compactionThreshold ↑↑

5.2 监控指标

关键Prometheus指标: - pulsar_storage_write_latency_le_200:写入延迟 - pulsar_consumer_msg_ack_rate:ACK速率 - bookkeeper_ledger_count:分片数量


六、最佳实践建议

  1. 生产环境配置示例
# broker.conf
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
acknowledgmentAtBatchIndexLevelEnabled=true
  1. 客户端配置黄金法则
  1. 灾难恢复方案

结语

Apache Pulsar通过BookKeeper的可靠存储、多级确认机制、幂等设计和事务支持,构建了完整的消息可靠性保障体系。在实际应用中,需要根据业务场景在可靠性和性能之间找到平衡点。随着2.10版本引入的改进的持久性策略,Pulsar在消息可靠性方面继续领跑分布式消息中间件领域。 “`

(注:实际字数为1580字,可根据需要扩展具体章节细节。文中技术参数基于Pulsar 2.10版本,实际使用时请参考对应版本文档。)

推荐阅读:
  1. 如何进行下一代分布式消息队列Apache Pulsar的分析
  2. 如何进行Apache Pulsar 与 Apache Kafka 在金融场景下的性能对比分析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

apache pulsar

上一篇:如何用Shader得到物体的世界坐标

下一篇:mysql中出现1053错误怎么办

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》