spring boot整合rocketmq

Updated on with 0 views and 0 comments

1、简介

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

更多信息参考官网:https://github.com/apache/rocketmq/tree/master/docs/cn

2、spring boot集成RocketMQ

2.1、引入依赖

spring boot依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

属性

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <rocketmq-spring-boot-starter-version>2.2.1</rocketmq-spring-boot-starter-version>
    </properties>

rocketmq依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq-spring-boot-starter-version}</version>
        </dependency>

2.2、属性配置

rocketmq.name-server=10.116.8.186:9876
rocketmq.producer.group=my-group1
rocketmq.producer.sendMessageTimeout=300000

3、常用操作

rocketmq发送消息有异步,同步,单向发送三种模式。

同步发送成功会有一个返回值,发送异步消息需要配置回调函数,单向发送则只管发送没有返回值,不关心发送结果。

异步发送只需要同步发送的函数(asyncSend)换成异步的函数(syncSend)发送即可,单向发送则是调用sendOneWay方法

3.1、发送消费示例

3.1.1、发送消息

/**
 * @author Mr.Wen
 * @version 1.0
 * @date 2021-10-09 13:28
 */
@Component
public class CustomMessageService {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessage() throws Exception {
        sendAsyncMessage();
    }

    private void sendAsyncMessage() throws Exception{
        rocketMQTemplate.asyncSend("topic-a", "测试同步消息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送同步消息成功--"+sendResult.toString()+"\n");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("发送同步消息失败");
                System.out.println(throwable.getMessage());
            }
        });
    }
}

上述代码中只是做了一个发送的操作,第一个参数是主题,第二个参数是数据,第三个参数是回调。

asyncSend有多个重载函数,在这些重载函数中,还可以指定超时时间或延迟消息级别image.png

3.1.2、消费消息

/**
 * @author Mr.Wen
 * @version 1.0
 * @date 2021-10-09 14:55
 */
@Service
@RocketMQMessageListener(topic = "topic-a",
        consumerGroup = "string_consumer")
public class SyncMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("收到--"+message);
    }
}

消费数据需要实现RocketMQListener接口,这个接口的泛型是接受的数据类型,同时通过RocketMQMessageListener配置要消费的数据。

3.2、顺序消息示例

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

上面是官网的描述,简单来说,调用顺序发送消息的方法会要求要设置一个hash key作为分区判断依据,相同的hash key的消息会按照顺序消费

3.2.1、发送消息

    private void sendAsyncMessageOrder() {
        for (int q = 0; q < 4; q++) {
            // send to 4 queues
            List<Message> msgs = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                int msgIndex = q * 10 + i;
                String msg = String.format("推送消息#%d 到队列: %d", msgIndex, q);
                msgs.add(MessageBuilder.withPayload(msg).
                        setHeader(RocketMQHeaders.KEYS, "KEY_" + msgIndex).build());
            }
            SendResult sr = rocketMQTemplate.syncSendOrderly("topic-orderly", msgs, q + "", 60000);
            System.out.println("--- 成功批量推送消息到队列" + sr.getMessageQueue().getQueueId() + " 返回结果:" + sr);
        }
    }

这里调用了syncSendOrderly方法来发送数据,第一个参数是topic,第二个参数是消息列表,第三个参数是hash值,第四个参数是发送超时时间设置,相同hash值得消息会按照先进先出的顺序消费

3.2.2、消费消息

/**
 * @author Mr.Wen
 * @version 1.0
 * @date 2021-10-09 16:59
 */
@Service
@RocketMQMessageListener(topic = "topic-orderly",
        consumerGroup = "string_consumer1")
public class SyncOrderlyMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("收到--"+message);
    }
}

顺序消息的消费和普通消费一样,直接消费即可,也是实现RocketMQListener接口,然后使用RocketMQMessageListener注解配置消费信息即可

3.3、延时消息

3.3.1、发送消息

rocketmq暂时不支持自定义延迟时间,官方给出了18个延迟级别,分别对应的时间为

“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”

    private void sendDelayMessage() throws Exception {
        Message<String> delayMessage = MessageBuilder.withPayload("延时消息").build();
        SendResult sendResult = rocketMQTemplate.syncSend("topic-delay", delayMessage, 3000, 3);
    }

调用异步发送的方法,共有四个参数,第一个是topic,第二个是消息,第三个是发送超时时间,第四个就是延时发送级别

3.3.2、消费消息

消费消息的方法和之前的是一致的,切换到对应的topic即可

/**
 * @author Mr.Wen
 * @version 1.0
 * @date 2021-10-11 10:43
 */
@Service
@RocketMQMessageListener(topic = "topic-delay",
        consumerGroup = "string_consumer3")
public class SyncDelayMessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("收到--"+message);
    }
}

3.4、批量发送

3.4.1、发送消息

批量发送消息的方法和顺序消息类似,只是批量发送消息的时候,不需要设置hash值

    public void sendMessage() throws Exception {
        TimerTask timerTask = new TimerTask() {
            @SneakyThrows
            @Override
            public void run() {
                sendBatchMessage();
            }
        };

        Timer timer = new Timer();
        timer.schedule(timerTask,1000,5000);
    }
    private void sendBatchMessage() throws Exception {
        List<Message> messageList = new ArrayList<>();
        for(int i=0;i<10;i++){
            Message msg = MessageBuilder.withPayload("第"+i+"消息").build();
            messageList.add(msg);
        }
        SendResult sendResult = rocketMQTemplate.syncSend("topic-batch", messageList);
        System.out.println("发送结果"+sendResult.toString());
    }

3.4.2、消费消息

@Service
@RocketMQMessageListener(topic = "topic-batch",
        consumerGroup = "string_consumer4")
public class SyncBatchMessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("收到--"+message);
    }
}

3.5、过滤消息

rocketmq共有两种过滤方式,分别是TAG和和SQL92,通过消费端配置来实现消息过滤

3.5.1、tag过滤

一个TOPIC可以有多个TAG,需要注意,同一个consumerGroup如果有不同的TAG会出现消费混乱的情况,所以,需要单独定义consumerGroup

3.5.1.1、发送消息

在Topic后面加:tag标签

    private void sendTagMessage() throws Exception{
        Message msgA = MessageBuilder.withPayload("tagA消息消息").build();
        Message msgB = MessageBuilder.withPayload("tagB消息消息").build();
        rocketMQTemplate.syncSend("topic-tag:tagA",msgA);
        rocketMQTemplate.syncSend("topic-tag:tagB",msgB);
    }
3.5.1.2、消费消息

定义不同的消费者,注意不要放在同一个consumerGroup中

@Service
@RocketMQMessageListener(topic = "topic-tag",
        consumerGroup = "string_consumer5",
        selectorType = SelectorType.TAG,selectorExpression = "tagA")
public class SyncTagAMessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("tagA消费者收到--"+message);
    }
}
@Service
@RocketMQMessageListener(topic = "topic-tag",
        consumerGroup = "string_consumer6",
        selectorType = SelectorType.TAG,selectorExpression = "tagB")
public class SyncTagBMessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("tagB消费者收到--"+message);
    }
}

3.5.2、SQL92过滤

SQL92是在消息的生产端设置header属性,在consumer端过滤属性来实现消息过滤,broker默认是不支持的,如果要支持SQL92过滤方式,需要在配置文件broker.conf中修改配置项enablePropertyFilter=true

官方定义的比较类型类型如下

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者IS NOT NULL;
  • 逻辑符号AND,OR,NOT;

官方定义的常量类型为

  • 数值,比如:123,3.1415;
  • 字符,比如:'abc',必须用单引号包裹起来;
  • NULL ,特殊的常量
  • 布尔值,TRUE 或FALSE
3.5.2.1、发送消息
    private void sendSQL92Message() throws Exception{
        Message<String> msg1 = MessageBuilder.withPayload("消息1").setHeader("a",3).build();
        rocketMQTemplate.syncSend("topic-sql",msg1);
        Message<String> msg2 = MessageBuilder.withPayload("消息2").setHeader("a",4).build();
        rocketMQTemplate.syncSend("topic-sql",msg2);
    }
3.5.2.2、消费消息

selectType设置为SQL92,设置表达式,消费a>0的消息

@Service
@RocketMQMessageListener(topic = "topic-sql",
        consumerGroup = "string_consumer7",
        selectorType = SelectorType.SQL92,selectorExpression = "a > 0")
public class SyncSQLMessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
            System.out.println("消费者收到--"+message);
    }
}

3.6、事务消息

rocketMQ的事务消息采用2PC的思想用于处理分布式事务问题,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,只保证事务的最终一致性!他保证了事务发起端的事务的原子性,但是消费端可能存在事务处理失败的情况,消费端失败后会一致重试直到消费成功,若最后都不能消费成功,则需要人工处理。

官网的原理定义如下:

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

1.事务消息发送及提交:

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2.补偿流程:

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

3.6.1、发送消息

事务消息需要关联一个事务监听器,内部定义了本地事务执行和回查,本地事务执行状态为UNKNOW时,会调用回查方法

private void sendTransactionalMessage() throws Exception{
        TransactionMQProducer producer  = new TransactionMQProducer();
        producer.setNamesrvAddr("10.116.8.149:9876");
        producer.setProducerGroup("my-group1");
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(org.apache.rocketmq.common.message.Message msg, Object arg) {
                // 在这里执行本地事务
                Integer args = (Integer) arg;
                System.out.println("开始执行本地事务");
                if(args == 0){
                    System.out.println("准备提交本地事务");
                    return LocalTransactionState.COMMIT_MESSAGE;
                }else if(args == 1){
                    System.out.println("准备回滚本地事务");
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }else{
                    System.out.println("事务异常,准备执行回查");
                    return LocalTransactionState.UNKNOW;
                }
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("消息回查接口 --" + new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message();
        msg.setTopic("topic-tx");
        msg.setBody("发送状态0消息".getBytes());
        org.apache.rocketmq.common.message.Message msg1 = new org.apache.rocketmq.common.message.Message();
        msg1.setTopic("topic-tx");
        msg1.setBody("发送状态1消息".getBytes());
        org.apache.rocketmq.common.message.Message msg2 = new org.apache.rocketmq.common.message.Message();
        msg2.setTopic("topic-tx");
        msg2.setBody("发送状态2消息".getBytes());
        // 发送事务消息
        TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(msg, 0);
        TransactionSendResult transactionSendResult1 = producer.sendMessageInTransaction(msg1, 1);
        TransactionSendResult transactionSendResult2 = producer.sendMessageInTransaction(msg2, 2);
        System.out.println("事务0发送结果"+transactionSendResult.getSendStatus());
        System.out.println("事务1发送结果"+transactionSendResult1.getSendStatus());
        System.out.println("事务2发送结果"+transactionSendResult2.getSendStatus());
    }

3.6.2、消费消息

@Service
@RocketMQMessageListener(topic = "topic-tx", consumerGroup = "string_trans_consumer")
public class SyncTransactionMessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
         int a = 1/0;
    }
}

这里消费的时候,如果发生异常会重新消费消息,正常消费会自己返回CONSUME_SUCCESS,发生异常则会返回RECONSUME_LATER然后重新消费该信息,超过默认的重复消费次数后,消息会被放入死信队列

源码中消费消息的代码位于org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer

代码中可以很清楚的看到消费失败会返回RECONSUME_LATER

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    handleMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

这是消费的返回值

public enum ConsumeConcurrentlyStatus {
    /**
     * Success consumption
     */
    CONSUME_SUCCESS,
    /**
     * Failure consumption,later try to consume
     */
    RECONSUME_LATER;
}

代码中我们模拟了1/0的异常,重复消费后再管理界面的死信队列里可以看到

4、数据重复消费

在consumer端判断和消息是否消费过,将已经消费的消息存储到redis或者数据库中, 消费之前做查询判断是否消费过即可


标题:spring boot整合rocketmq
作者:wenyl
地址:http://www.wenyoulong.com/articles/2021/10/09/1633770440361.html