您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Pulsar该如何使用
## 目录
1. [什么是Pulsar](#什么是pulsar)
2. [Pulsar的核心概念](#pulsar的核心概念)
3. [安装与部署](#安装与部署)
4. [基础使用指南](#基础使用指南)
5. [高级功能](#高级功能)
6. [最佳实践](#最佳实践)
7. [常见问题解答](#常见问题解答)
---
## 什么是Pulsar
Apache Pulsar 是一个开源的分布式发布-订阅消息系统,由雅虎开发并捐赠给Apache基金会。它结合了高性能消息队列、流处理和轻量级函数计算的能力,具有以下特点:
- **多租户架构**:支持隔离的命名空间和细粒度权限控制
- **低延迟高吞吐**:单集群可支持百万级TPS
- **持久化存储**:通过分层存储(Tiered Storage)实现无限回溯
- **地理复制**:原生支持跨地域数据同步
典型应用场景包括:
- 金融交易实时处理
- IoT设备数据采集
- 微服务间异步通信
- 流式数据分析管道
---
## Pulsar的核心概念
### 1. 基础组件
| 组件 | 说明 |
|---------------|----------------------------------------------------------------------|
| **Broker** | 无状态服务层,处理消息路由和协议转换 |
| **BookKeeper**| 持久化存储层,提供高可靠的日志存储 |
| **ZooKeeper** | 元数据存储和集群协调服务 |
### 2. 逻辑概念
- **Tenant**:多租户隔离单元
- **Namespace**:租户内的逻辑分区
- **Topic**:消息的发布/订阅通道(支持持久化/非持久化)
- **Subscription**:消费组的订阅关系(包含Exclusive/Failover/Shared/Key_Shared四种模式)
### 3. 消息模型
```python
# 消息结构示例
{
"data": "二进制消息体",
"properties": {"k1":"v1"}, # 用户自定义属性
"sequence_id": 12345, # 序列号
"publish_time": 1630000000 # 发布时间戳
}
# 使用官方Docker镜像
docker run -it -p 6650:6650 -p 8080:8080 \
apachepulsar/pulsar:latest \
bin/pulsar standalone
bin/pulsar-admin clusters list
# 创建Topic
bin/pulsar-admin topics create persistent://tenant/ns/topic1
# 查看Topic列表
bin/pulsar-admin topics list tenant/ns
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic')
for i in range(10):
producer.send(f'Message-{i}'.encode('utf-8'))
client.close()
consumer = client.subscribe('my-topic', 'my-sub')
while True:
msg = consumer.receive()
print(f"Received: {msg.data().decode()}")
consumer.acknowledge(msg)
// Java自定义路由示例
MessageRouter customRouter = (msg, metadata) -> {
String key = msg.getKey();
return key.hashCode() % metadata.numPartitions();
};
# 开启事务
txn = client.new_transaction()
.with_txn_timeout(5, TimeUnit.MINUTES)
.build()
# 事务内操作
producer.send(msg, txn=txn)
consumer.acknowledge(msg, txn=txn)
# 提交/回滚
txn.commit()
# txn.abort()
# function-config.yaml
name: word-counter
inputs: ["persistent://public/default/input"]
output: "persistent://public/default/output"
runtime: python
producer.batch_enable = True
managedLedgerCacheSizeMB
关键监控项: - 生产者/消费者延迟 - 积压消息数(backlog) - 存储写入延迟
# 获取统计信息
bin/pulsar-admin topics stats-internal persistent://tenant/ns/topic1
# broker.conf
authenticationEnabled=true
authorizationEnabled=true
superUserRoles=admin
receiverQueueSize
)维度 | Pulsar | Kafka |
---|---|---|
扩展性 | 计算存储分离,弹性扩展 | 需要整体扩容 |
延迟 | 毫秒级 | 通常更高 |
功能特性 | 内置多协议/函数计算 | 依赖外部生态(如KSQL) |
提示:本文基于Pulsar 2.10版本编写,具体实现可能随版本变化。建议参考官方文档获取最新信息。 “`
这篇文章包含了约1600字,采用Markdown格式编写,覆盖了Pulsar的基础使用到高级特性。如需扩展特定部分或添加更多示例代码,可以进一步补充细节内容。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。