Redis 的发布/订阅(Pub/Sub)功能是一种消息通信模式,可用于消息的传输;Redis 客户端可以订阅任意数量的频道。具体实现代码如下:
package com.hcmony.sword.redis;

import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
/**
 * Small publish/subscribe helper on top of a {@link JedisPool}.
 *
 * <p>Each call borrows one connection from the pool and hands it back when the call ends.
 * A blank topic (null, empty or whitespace-only) is replaced by the default channel
 * {@code "TOPIC"}.
 */
public class RedisMQ {

    /** Channel used when the caller supplies a blank topic. */
    private static final String TOPIC = "TOPIC";

    /** Upper bound, in seconds, that {@link #main} waits for the subscription to become active. */
    private static final long SUBSCRIBE_TIMEOUT_SECONDS = 5L;

    private final JedisPool jedisPool;

    /**
     * Creates a helper that borrows its connections from the given pool.
     *
     * @param jedisPool pool to borrow connections from; must not be null
     * @throws NullPointerException if {@code jedisPool} is null
     */
    public RedisMQ(JedisPool jedisPool) {
        // Fail at construction time instead of on the first publish/subscribe call.
        this.jedisPool = Objects.requireNonNull(jedisPool, "jedisPool");
    }

    /**
     * Publishes a message to a channel.
     *
     * @param topic  channel name; a blank value selects the default channel
     * @param messge message payload passed to Redis as-is
     */
    public void publish(String topic, String messge) {
        // try-with-resources returns the connection to the pool even when publish throws.
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.publish(resolveTopic(topic), messge);
        }
    }

    /**
     * Subscribes the given listener to a channel.
     *
     * <p>In Jedis the underlying {@code subscribe} call keeps the calling thread busy until the
     * listener is unsubscribed, which is why {@link #main} runs this method on its own thread.
     *
     * @param topic       channel name; a blank value selects the default channel
     * @param jedisPubSub listener that receives the channel's messages
     */
    public void subscribe(String topic, JedisPubSub jedisPubSub) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.subscribe(jedisPubSub, resolveTopic(topic));
        }
    }

    /** Returns {@code topic}, or the default channel when {@code topic} is blank. */
    private static String resolveTopic(String topic) {
        return StringUtils.isBlank(topic) ? TOPIC : topic;
    }

    /**
     * Demo: subscribes to channel {@code "PID"} on a background thread, publishes 100 messages
     * to it, then unsubscribes and releases the pool.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Default pool: connects to the local Redis instance (localhost:6379).
        JedisPool jedisPool = new JedisPool();
        try {
            RedisMQ publish = new RedisMQ(jedisPool);

            // Pub/Sub only delivers to subscribers that are already registered, so publishing
            // must not start before Redis has confirmed the subscription.
            CountDownLatch subscribed = new CountDownLatch(1);
            JedisPubSub listener = new MyjedisPubSub() {
                @Override
                public void onSubscribe(String channel, int subscribedChannels) {
                    subscribed.countDown();
                }
            };

            Thread subscriber = new Thread(() -> publish.subscribe("PID", listener));
            // Daemon so that a subscriber that never connects cannot keep the JVM alive.
            subscriber.setDaemon(true);
            subscriber.start();

            if (!subscribed.await(SUBSCRIBE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                System.err.println("Subscriber did not register on channel PID within "
                        + SUBSCRIBE_TIMEOUT_SECONDS + " seconds");
                return;
            }

            for (int i = 0; i < 100; i++) {
                publish.publish("PID", "messge" + i);
            }

            // Every publish above has already been acknowledged by Redis, so the messages are
            // queued ahead of the unsubscribe confirmation on the subscriber's connection.
            listener.unsubscribe();
            subscriber.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever started this thread.
            Thread.currentThread().interrupt();
        } finally {
            jedisPool.close();
        }
    }

    /** Listener that prints every received message to standard output. */
    public static class MyjedisPubSub extends JedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
            System.out.println("-------channel is " + channel + " message is " + message);
        }
    }
}