您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Apache Pulsar是如何保证消息不丢不重
## 引言
在分布式消息系统中,"消息不丢失"和"消息不重复"(Exactly-Once语义)是两大核心挑战。Apache Pulsar作为新一代云原生消息流平台,通过多层设计实现了高可靠的消息传递。本文将深入剖析Pulsar如何从存储机制、确认机制、副本策略等方面保障消息投递的可靠性。
---
## 一、架构基础:分层存储与多副本
### 1.1 BookKeeper的持久化保障
Pulsar采用BookKeeper作为持久化存储引擎,其核心特性包括:
- **预写日志(WAL)**:所有消息先写入不可变的日志文件
- **多副本同步写入**:每条消息需被`ack-quorum`个副本持久化后才返回成功
```java
// 伪代码:BookKeeper写入流程
Entry entry = new Entry(message);
ledger.addEntry(entry).thenRun(() -> {
// 至少写入2个节点(默认quorum=2)
if(confirmedReplicas >= ackQuorum) {
sendAckToProducer();
}
});
配置参数 | 作用 | 推荐值 |
---|---|---|
sendTimeout |
发送超时时间 | 30s |
blockIfQueueFull |
内存队列满时阻塞而非丢弃 | true |
maxPendingMessages |
最大待确认消息数 | 1000 |
# Python生产者示例
producer = client.create_producer(
topic='persistent://tenant/ns/topic',
send_timeout_millis=30000,
block_if_queue_full=True
)
ensembleSize=3, writeQuorum=2, ackQuorum=2
实现
ackTimeout
(默认30s)后重投maxRedeliverCount
的消息转入DLQ# 启用生产者幂等
enableIdempotence=true
producerName=order-producer-1
sequenceId=142857
sequenceId
// 事务使用示例
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(1, TimeUnit.MINUTES)
.build();
producer.newMessage(txn).value("订单创建".getBytes()).send();
consumer.acknowledgeAsync(msg.getMessageId(), txn);
txn.commit().exceptionally(ex -> {
log.error("事务失败", ex);
return null;
});
sequenceId
缓存brokerDeduplicationEnabled: true
brokerDeduplicationMaxNumberOfProducers: 10000
brokerDeduplicationEntriesInterval: 1000
// Go客户端指数退避示例
retryPolicy := pulsar.NewRetryPolicy(
pulsar.MaxReconnectToBroker(5),
pulsar.Backoff(100*time.Millisecond, 5*time.Second),
)
client, _ := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
RetryPolicy: retryPolicy,
})
配置项 | 性能影响 | 可靠性影响 |
---|---|---|
ackQuorum=1 |
↑↑↑ | ↓↓↓ |
immediateFlush=false |
↑↑ | ↓ |
compactionThreshold |
↓ | ↑↑ |
关键Prometheus指标:
- pulsar_storage_write_latency_le_200
:写入延迟
- pulsar_consumer_msg_ack_rate
:ACK速率
- bookkeeper_ledger_count
:分片数量
# broker.conf
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
acknowledgmentAtBatchIndexLevelEnabled=true
Apache Pulsar通过BookKeeper的可靠存储、多级确认机制、幂等设计和事务支持,构建了完整的消息可靠性保障体系。在实际应用中,需要根据业务场景在可靠性和性能之间找到平衡点。随着2.10版本引入的改进的持久性策略,Pulsar在消息可靠性方面继续领跑分布式消息中间件领域。 “`
(注:实际字数为1580字,可根据需要扩展具体章节细节。文中技术参数基于Pulsar 2.10版本,实际使用时请参考对应版本文档。)
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。