RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
更多信息参考官网:https://github.com/apache/rocketmq/tree/master/docs/cn
spring boot依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
属性
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<rocketmq-spring-boot-starter-version>2.2.1</rocketmq-spring-boot-starter-version>
</properties>
rocketmq依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
rocketmq.name-server=10.116.8.186:9876
rocketmq.producer.group=my-group1
rocketmq.producer.sendMessageTimeout=300000
rocketmq发送消息有异步,同步,单向发送三种模式。
同步发送成功会有一个返回值,发送异步消息需要配置回调函数,单向发送则只管发送没有返回值,不关心发送结果。
异步发送只需要同步发送的函数(asyncSend)换成异步的函数(syncSend)发送即可,单向发送则是调用sendOneWay方法
/**
* @author Mr.Wen
* @version 1.0
* @date 2021-10-09 13:28
*/
@Component
public class CustomMessageService {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendMessage() throws Exception {
sendAsyncMessage();
}
private void sendAsyncMessage() throws Exception{
rocketMQTemplate.asyncSend("topic-a", "测试同步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送同步消息成功--"+sendResult.toString()+"\n");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送同步消息失败");
System.out.println(throwable.getMessage());
}
});
}
}
上述代码中只是做了一个发送的操作,第一个参数是主题,第二个参数是数据,第三个参数是回调。
asyncSend有多个重载函数,在这些重载函数中,还可以指定超时时间或延迟消息级别
/**
* @author Mr.Wen
* @version 1.0
* @date 2021-10-09 14:55
*/
@Service
@RocketMQMessageListener(topic = "topic-a",
consumerGroup = "string_consumer")
public class SyncMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到--"+message);
}
}
消费数据需要实现RocketMQListener接口,这个接口的泛型是接受的数据类型,同时通过RocketMQMessageListener配置要消费的数据。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
上面是官网的描述,简单来说,调用顺序发送消息的方法会要求要设置一个hash key作为分区判断依据,相同的hash key的消息会按照顺序消费
private void sendAsyncMessageOrder() {
for (int q = 0; q < 4; q++) {
// send to 4 queues
List<Message> msgs = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int msgIndex = q * 10 + i;
String msg = String.format("推送消息#%d 到队列: %d", msgIndex, q);
msgs.add(MessageBuilder.withPayload(msg).
setHeader(RocketMQHeaders.KEYS, "KEY_" + msgIndex).build());
}
SendResult sr = rocketMQTemplate.syncSendOrderly("topic-orderly", msgs, q + "", 60000);
System.out.println("--- 成功批量推送消息到队列" + sr.getMessageQueue().getQueueId() + " 返回结果:" + sr);
}
}
这里调用了syncSendOrderly方法来发送数据,第一个参数是topic,第二个参数是消息列表,第三个参数是hash值,第四个参数是发送超时时间设置,相同hash值得消息会按照先进先出的顺序消费
/**
* @author Mr.Wen
* @version 1.0
* @date 2021-10-09 16:59
*/
@Service
@RocketMQMessageListener(topic = "topic-orderly",
consumerGroup = "string_consumer1")
public class SyncOrderlyMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到--"+message);
}
}
顺序消息的消费和普通消费一样,直接消费即可,也是实现RocketMQListener接口,然后使用RocketMQMessageListener注解配置消费信息即可
rocketmq暂时不支持自定义延迟时间,官方给出了18个延迟级别,分别对应的时间为
“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”
private void sendDelayMessage() throws Exception {
Message<String> delayMessage = MessageBuilder.withPayload("延时消息").build();
SendResult sendResult = rocketMQTemplate.syncSend("topic-delay", delayMessage, 3000, 3);
}
调用异步发送的方法,共有四个参数,第一个是topic,第二个是消息,第三个是发送超时时间,第四个就是延时发送级别
消费消息的方法和之前的是一致的,切换到对应的topic即可
/**
* @author Mr.Wen
* @version 1.0
* @date 2021-10-11 10:43
*/
@Service
@RocketMQMessageListener(topic = "topic-delay",
consumerGroup = "string_consumer3")
public class SyncDelayMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到--"+message);
}
}
批量发送消息的方法和顺序消息类似,只是批量发送消息的时候,不需要设置hash值
public void sendMessage() throws Exception {
TimerTask timerTask = new TimerTask() {
@SneakyThrows
@Override
public void run() {
sendBatchMessage();
}
};
Timer timer = new Timer();
timer.schedule(timerTask,1000,5000);
}
private void sendBatchMessage() throws Exception {
List<Message> messageList = new ArrayList<>();
for(int i=0;i<10;i++){
Message msg = MessageBuilder.withPayload("第"+i+"消息").build();
messageList.add(msg);
}
SendResult sendResult = rocketMQTemplate.syncSend("topic-batch", messageList);
System.out.println("发送结果"+sendResult.toString());
}
@Service
@RocketMQMessageListener(topic = "topic-batch",
consumerGroup = "string_consumer4")
public class SyncBatchMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到--"+message);
}
}
rocketmq共有两种过滤方式,分别是TAG和和SQL92,通过消费端配置来实现消息过滤
一个TOPIC可以有多个TAG,需要注意,同一个consumerGroup如果有不同的TAG会出现消费混乱的情况,所以,需要单独定义consumerGroup
在Topic后面加:tag标签
private void sendTagMessage() throws Exception{
Message msgA = MessageBuilder.withPayload("tagA消息消息").build();
Message msgB = MessageBuilder.withPayload("tagB消息消息").build();
rocketMQTemplate.syncSend("topic-tag:tagA",msgA);
rocketMQTemplate.syncSend("topic-tag:tagB",msgB);
}
定义不同的消费者,注意不要放在同一个consumerGroup中
@Service
@RocketMQMessageListener(topic = "topic-tag",
consumerGroup = "string_consumer5",
selectorType = SelectorType.TAG,selectorExpression = "tagA")
public class SyncTagAMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("tagA消费者收到--"+message);
}
}
@Service
@RocketMQMessageListener(topic = "topic-tag",
consumerGroup = "string_consumer6",
selectorType = SelectorType.TAG,selectorExpression = "tagB")
public class SyncTagBMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("tagB消费者收到--"+message);
}
}
SQL92是在消息的生产端设置header属性,在consumer端过滤属性来实现消息过滤,broker默认是不支持的,如果要支持SQL92过滤方式,需要在配置文件broker.conf中修改配置项enablePropertyFilter=true
官方定义的比较类型类型如下
官方定义的常量类型为
private void sendSQL92Message() throws Exception{
Message<String> msg1 = MessageBuilder.withPayload("消息1").setHeader("a",3).build();
rocketMQTemplate.syncSend("topic-sql",msg1);
Message<String> msg2 = MessageBuilder.withPayload("消息2").setHeader("a",4).build();
rocketMQTemplate.syncSend("topic-sql",msg2);
}
selectType设置为SQL92,设置表达式,消费a>0的消息
@Service
@RocketMQMessageListener(topic = "topic-sql",
consumerGroup = "string_consumer7",
selectorType = SelectorType.SQL92,selectorExpression = "a > 0")
public class SyncSQLMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费者收到--"+message);
}
}
rocketMQ的事务消息采用2PC的思想用于处理分布式事务问题,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,只保证事务的最终一致性!他保证了事务发起端的事务的原子性,但是消费端可能存在事务处理失败的情况,消费端失败后会一致重试直到消费成功,若最后都不能消费成功,则需要人工处理。
官网的原理定义如下:
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1.事务消息发送及提交:
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2.补偿流程:
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
事务消息需要关联一个事务监听器,内部定义了本地事务执行和回查,本地事务执行状态为UNKNOW时,会调用回查方法
private void sendTransactionalMessage() throws Exception{
TransactionMQProducer producer = new TransactionMQProducer();
producer.setNamesrvAddr("10.116.8.149:9876");
producer.setProducerGroup("my-group1");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(org.apache.rocketmq.common.message.Message msg, Object arg) {
// 在这里执行本地事务
Integer args = (Integer) arg;
System.out.println("开始执行本地事务");
if(args == 0){
System.out.println("准备提交本地事务");
return LocalTransactionState.COMMIT_MESSAGE;
}else if(args == 1){
System.out.println("准备回滚本地事务");
return LocalTransactionState.ROLLBACK_MESSAGE;
}else{
System.out.println("事务异常,准备执行回查");
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("消息回查接口 --" + new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message();
msg.setTopic("topic-tx");
msg.setBody("发送状态0消息".getBytes());
org.apache.rocketmq.common.message.Message msg1 = new org.apache.rocketmq.common.message.Message();
msg1.setTopic("topic-tx");
msg1.setBody("发送状态1消息".getBytes());
org.apache.rocketmq.common.message.Message msg2 = new org.apache.rocketmq.common.message.Message();
msg2.setTopic("topic-tx");
msg2.setBody("发送状态2消息".getBytes());
// 发送事务消息
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(msg, 0);
TransactionSendResult transactionSendResult1 = producer.sendMessageInTransaction(msg1, 1);
TransactionSendResult transactionSendResult2 = producer.sendMessageInTransaction(msg2, 2);
System.out.println("事务0发送结果"+transactionSendResult.getSendStatus());
System.out.println("事务1发送结果"+transactionSendResult1.getSendStatus());
System.out.println("事务2发送结果"+transactionSendResult2.getSendStatus());
}
@Service
@RocketMQMessageListener(topic = "topic-tx", consumerGroup = "string_trans_consumer")
public class SyncTransactionMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
int a = 1/0;
}
}
这里消费的时候,如果发生异常会重新消费消息,正常消费会自己返回CONSUME_SUCCESS,发生异常则会返回RECONSUME_LATER然后重新消费该信息,超过默认的重复消费次数后,消息会被放入死信队列
源码中消费消息的代码位于org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer
代码中可以很清楚的看到消费失败会返回RECONSUME_LATER
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
这是消费的返回值
public enum ConsumeConcurrentlyStatus {
/**
* Success consumption
*/
CONSUME_SUCCESS,
/**
* Failure consumption,later try to consume
*/
RECONSUME_LATER;
}
代码中我们模拟了1/0的异常,重复消费后再管理界面的死信队列里可以看到
在consumer端判断和消息是否消费过,将已经消费的消息存储到redis或者数据库中, 消费之前做查询判断是否消费过即可