前言
消息队列是常用的异步通信解决方案,一般有两种解决方案,一种为队列模式
,一种为发布订阅模式
,对于Redis而言,官方就支持这种.
发布订阅
发布订阅是一种消息模式,消息的发送者(称为发布者)不会将消息直接发送给特定的接收者(称为订阅者)。而是将发布的消息分为不同的类别,无需了解哪些订阅者(如果有的话)可能存在。同样的,订阅者可以表达对一个或多个类别的兴趣,只接收感兴趣的消息,无需了解哪些发布者(如果有的话)存在。
就如同微信公众号一样,
代码
按照官网的教程,首先我们先创建一个消息发布者和消息订阅者.
消息订阅者
/**
* 消息订阅者
*/
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private AtomicInteger counter = new AtomicInteger();
//消费方法
public void receiveMessage(String message) {
LOGGER.info("收到消息为: <" + message + ">");
counter.incrementAndGet();
}
}
注册订阅者
在RedisConfig
类添加如下代码
/**
* 将订阅者 注入到容器中
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅者和topic
container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
return container;
}
//注册订阅者
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
//我们的订阅者
@Bean
Receiver receiver() {
return new Receiver();
}
listenerAdapter
方法中创建一个消息侦听器,并在container
中注册,监听有关chat
主题的消息,在MessageListenerAdapter
中需要指定订阅者执行的方法(如果订阅者实现MessageListener
接口则无须指定,下面演示).
然后就可以通过RedisTemplate
发送事件
@Autowired
private RedisTemplate redisTemplate;
@Test
void contextLoads() {
redisTemplate.convertAndSend("chat","asd");
}
实现MessageListener的订阅者
/**
* 消息订阅者
*/
public class Receiver implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private AtomicInteger counter = new AtomicInteger();
@Override
public void onMessage(Message message, byte[] bytes) {
LOGGER.info("收到消息为: <" + message.toString() + ">");
counter.incrementAndGet();
}
}
在RedisConfig
中修改如下
//注册订阅者
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver);
}
自定义消息体
创建自定义的消息体
public class MessageEntity implements Serializable {
private static final long serialVersionUID = 8632296967087444509L;
private String id;
private String content;
public MessageEntity() {
super();
}
public MessageEntity(String id, String content) {
super();
this.id = id;
this.content = content;
}
//get set 这里省略 记得加上
@Override
public String toString() {
return "MessageEntity [id=" + id + ", content=" + content + "]";
}
}
订阅者修改为:
/**
* 消息订阅者
*/
public class Receiver implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private AtomicInteger counter = new AtomicInteger();
@Autowired
private RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
doBusiness(message);
}
/**
* 打印 message body 内容
* @param message
*/
public void doBusiness(Message message) {
Object value = redisTemplate.getValueSerializer().deserialize(message.getBody());
System.out.println("consumer message: " + value.toString());
}
}
redisTemplate.convertAndSend("chat", new MessageEntity("1", "object"));//发送事件 也可以发送普通的String
最后用spring data redis
实现redis
订阅者,本质上还是Listener模式.
参考
文章1:https://blog.csdn.net/johnf_nash/article/details/87891293