前言

消息队列是常用的异步通信解决方案,一般有两种解决方案,一种为队列模式,一种为发布订阅模式,对于Redis而言,官方就支持这种.

发布订阅

发布订阅是一种消息模式,消息的发送者(称为发布者)不会将消息直接发送给特定的接收者(称为订阅者)。而是将发布的消息分为不同的类别,无需了解哪些订阅者(如果有的话)可能存在。同样的,订阅者可以表达对一个或多个类别的兴趣,只接收感兴趣的消息,无需了解哪些发布者(如果有的话)存在。

就如同微信公众号一样,

image-20210806144326149

代码

按照官网的教程,首先我们先创建一个消息发布者和消息订阅者.

消息订阅者

/**
 * 消息订阅者
 */
public class Receiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    private AtomicInteger counter = new AtomicInteger();

    //消费方法
    public void receiveMessage(String message) {
        LOGGER.info("收到消息为: <" + message + ">");
        counter.incrementAndGet();
    }
}

注册订阅者

RedisConfig类添加如下代码

/**
 * 将订阅者 注入到容器中
 */
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                        MessageListenerAdapter listenerAdapter) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    //订阅者和topic
    container.addMessageListener(listenerAdapter, new PatternTopic("chat"));

    return container;
}

//注册订阅者
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
    return new MessageListenerAdapter(receiver, "receiveMessage");
}

//我们的订阅者
@Bean
Receiver receiver() {
    return new Receiver();
}

listenerAdapter方法中创建一个消息侦听器,并在container中注册,监听有关chat主题的消息,在MessageListenerAdapter中需要指定订阅者执行的方法(如果订阅者实现MessageListener接口则无须指定,下面演示).

然后就可以通过RedisTemplate发送事件

@Autowired
private RedisTemplate redisTemplate;

@Test
void contextLoads() {
    redisTemplate.convertAndSend("chat","asd");
}

实现MessageListener的订阅者

/**
 * 消息订阅者
 */
public class Receiver implements MessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    private AtomicInteger counter = new AtomicInteger();

    @Override
    public void onMessage(Message message, byte[] bytes) {
        LOGGER.info("收到消息为: <" + message.toString() + ">");
        counter.incrementAndGet();
    }
}

RedisConfig中修改如下

//注册订阅者
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
    return new MessageListenerAdapter(receiver);
}

自定义消息体

创建自定义的消息体

public class MessageEntity implements Serializable {

    private static final long serialVersionUID = 8632296967087444509L;

    private String id;
    
    private String content;

    public MessageEntity() {
        super();
    }

    public MessageEntity(String id, String content) {
        super();
        this.id = id;
        this.content = content;
    }
    
    //get set 这里省略 记得加上

    @Override
    public String toString() {
        return "MessageEntity [id=" + id + ", content=" + content + "]";
    }
}

订阅者修改为:

/**
 * 消息订阅者
 */
public class Receiver implements MessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    private AtomicInteger counter = new AtomicInteger();

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        doBusiness(message);
    }

    /**
     * 打印 message body 内容
     * @param message
     */
    public void doBusiness(Message message) {
        Object value = redisTemplate.getValueSerializer().deserialize(message.getBody());
        System.out.println("consumer message: " + value.toString());
    }
}
redisTemplate.convertAndSend("chat", new MessageEntity("1", "object"));//发送事件 也可以发送普通的String

最后用spring data redis实现redis订阅者,本质上还是Listener模式.

参考

文章1:https://blog.csdn.net/johnf_nash/article/details/87891293

官网:https://spring.io/guides/gs/messaging-redis/