您好,登录后才能下订单哦!
# Redis中发布订阅及事务的知识点整理
## 目录
1. [发布订阅模式概述](#发布订阅模式概述)
2. [Redis发布订阅基础命令](#redis发布订阅基础命令)
3. [发布订阅实战示例](#发布订阅实战示例)
4. [发布订阅的优缺点](#发布订阅的优缺点)
5. [Redis事务简介](#redis事务简介)
6. [Redis事务命令详解](#redis事务命令详解)
7. [事务的ACID特性分析](#事务的acid特性分析)
8. [事务中的WATCH命令](#事务中的watch命令)
9. [发布订阅与事务的结合应用](#发布订阅与事务的结合应用)
10. [常见问题与解决方案](#常见问题与解决方案)
---
## 发布订阅模式概述
发布订阅(Pub/Sub)是一种消息通信模式,包含两种角色:
- **发布者(Publisher)**:向指定频道发送消息
- **订阅者(Subscriber)**:订阅一个或多个频道接收消息
Redis的Pub/Sub特点:
- 实时消息系统
- 无消息存储(离线订阅者无法获取历史消息)
- 基于频道(Channel)的消息路由
- 支持模式匹配订阅

---
## Redis发布订阅基础命令
### 核心命令
```bash
# 发布消息到指定频道
PUBLISH channel message
# 订阅一个或多个频道
SUBSCRIBE channel1 [channel2...]
# 取消订阅所有频道
UNSUBSCRIBE
# 按模式订阅
PSUBSCRIBE pattern*
# 取消模式订阅
PUNSUBSCRIBE
# 终端1(订阅方)
127.0.0.1:6379> SUBSCRIBE news
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news"
3) (integer) 1
# 终端2(发布方)
127.0.0.1:6379> PUBLISH news "Redis 6.2 released!"
(integer) 1
import redis
import threading
# 发布者
def publisher():
r = redis.Redis()
for i in range(5):
r.publish('channel', f'message-{i}')
# 订阅者
def subscriber():
r = redis.Redis()
pubsub = r.pubsub()
pubsub.subscribe('channel')
for message in pubsub.listen():
print(message)
# 启动线程
threading.Thread(target=subscriber).start()
threading.Thread(target=publisher).start()
# 订阅所有以'log.'开头的频道
PSUBSCRIBE log.*
# 发布到不同频道
PUBLISH log.warning "High memory usage!"
PUBLISH log.error "Connection failed!"
适用场景:实时通知、聊天室、日志收集等对可靠性要求不高的场景
Redis事务的特点:
- 通过MULTI
、EXEC
、DISCARD
命令实现
- 单线程保证命令顺序执行
- 不支持回滚(与关系型数据库的主要区别)
- 两种错误处理方式:
- 命令入队时出错(整个事务不执行)
- 命令执行时出错(其他命令继续执行)
事务执行流程:
MULTI → 命令入队 → EXEC/DISCARD
# 开始事务
MULTI
# 执行事务
EXEC
# 取消事务
DISCARD
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379(TX)> SET name "Alice"
QUEUED
127.0.0.1:6379(TX)> INCR counter
QUEUED
127.0.0.1:6379(TX)> EXEC
1) OK
2) (integer) 1
特性 | Redis实现情况 | 说明 |
---|---|---|
原子性(Atomicity) | 部分支持 | 命令全执行或全不执行,但无回滚 |
一致性(Consistency) | 支持 | 执行前后数据保持一致状态 |
隔离性(Isolation) | 完全支持 | 单线程天然隔离 |
持久性(Durability) | 取决于配置 | 开启AOF时可保证 |
实现CAS(Check-And-Set)操作:
WATCH key # 监视一个或多个key
UNWATCH # 取消所有监视
127.0.0.1:6379> WATCH balance
OK
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379(TX)> DECRBY balance 50
QUEUED
127.0.0.1:6379(TX)> EXEC # 如果balance被其他客户端修改,此处返回nil
def process_order(order_id):
r = redis.Redis()
pubsub = r.pubsub()
pubsub.subscribe('order_complete')
with r.pipeline() as pipe:
try:
pipe.watch('order_status')
# 开启事务
pipe.multi()
pipe.hset('order_status', order_id, 'processing')
pipe.publish('order_update', f'{order_id}:processing')
pipe.execute()
# 处理订单...
pipe.multi()
pipe.hset('order_status', order_id, 'completed')
pipe.publish('order_complete', order_id)
pipe.execute()
except redis.WatchError:
print("订单状态已被修改,处理终止")
方案:使用Redis Stream替代Pub/Sub
# 生产者
XADD orders * product_id 123 quantity 2
# 消费者
XREAD COUNT 1 STREAMS orders 0
答案:是的,Redis是单线程模型,事务执行时会阻塞其他所有命令
建议方案: 1. 使用专业的消息队列(RabbitMQ/Kafka) 2. 实现应用层的确认机制 3. 结合数据库存储重要消息
Redis的发布订阅和事务为不同场景提供了解决方案: - 发布订阅:适合实时性高但可靠性要求不高的场景 - 事务:保证命令原子性执行,适合需要批量操作的场景 - WATCH:实现乐观锁,解决并发修改问题
实际开发中应根据业务需求选择合适的机制,必要时可以组合使用这些特性构建更健壮的系统。 “`
注:本文约3500字,包含代码示例、表格对比和图文说明。实际使用时可根据需要调整示例代码和详细说明部分。建议通过实际Redis-cli操作验证所有命令示例。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。