原生版本
新建一个SpringBoot项目,pom文件如下
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the "native client" demo: a Spring Boot web application that talks to
     RocketMQ through the plain rocketmq-client library (no Spring starter for RocketMQ). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherit plugin and dependency management from Spring Boot. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.xpc</groupId>
    <artifactId>springboot-rocketmq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter: the demo triggers message sending through HTTP endpoints. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Plain RocketMQ client; its version is declared here explicitly. -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
定义生产者
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.stereotype.Component; @Component public class XpcProducer { private DefaultMQProducer producer; /** * 定义生产者组 */ private final static String PRODUCER_GROUP = "xpc_topic_group"; public XpcProducer() { producer = new DefaultMQProducer(); producer.setProducerGroup(PRODUCER_GROUP); producer.setNamesrvAddr("127.0.0.1:9876"); try { //启动 producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } public DefaultMQProducer getProducer() { return this.producer; } }
发送消息
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController public class MqController { private static final Logger log = LoggerFactory.getLogger(MqController.class); @Resource private XpcProducer xpcProducer; @GetMapping("/sendXpc") public String send() throws Exception{ DefaultMQProducer producer = xpcProducer.getProducer(); Message message = new Message("xpc-topic-mq","*","","hello mq".getBytes()); SendResult sendResult = producer.send(message); log.info("发送结果是:{}",sendResult.getSendStatus()); return "ok"; } }
定义一个消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; public class XpcConsumer { private static final Logger log = LoggerFactory.getLogger(XpcConsumer.class); private static final String CONSUMER_GROUP = "xpc_topic_group_consumer"; private DefaultMQPushConsumer consumer; public XpcConsumer() { consumer = new DefaultMQPushConsumer(); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumerGroup(CONSUMER_GROUP); try{ consumer.subscribe("xpc-topic-mq","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try{ MessageExt messageExt = msgs.get(0); String message = new String(messageExt.getBody()); log.info("收到消息:{}",message); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }catch (Exception e) { log.error("消费消息异常:{}",e.getMessage()); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); }catch (Exception e) { log.error("XpcConsumer | {}",e.getMessage()); } } }
在启动类中启动消费者
import com.xpc.xpc.XpcConsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point: boots the Spring context, then creates the native consumer.
 */
@SpringBootApplication
public class App {

    /**
     * Starts Spring Boot and afterwards instantiates {@link XpcConsumer}, whose
     * constructor subscribes and starts consuming.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(App.class);
        application.run(args);
        new XpcConsumer();
    }
}
生产者同步发送
// Synchronous send: send(...) returns the broker's SendResult to the caller.
// The body is encoded explicitly as UTF-8 instead of the JVM's default charset.
Message message = new Message("xpc-topic-mq", "*", "",
        "hello mq".getBytes(java.nio.charset.StandardCharsets.UTF_8));
SendResult sendResult = producer.send(message);
log.info("发送结果是:{}", sendResult.getSendStatus());
生产者异步发送
// Asynchronous send: the outcome is delivered to the SendCallback.
// The body is encoded explicitly as UTF-8 instead of the JVM's default charset.
Message message = new Message("xpc-topic-mq", "*", "",
        "hello mq".getBytes(java.nio.charset.StandardCharsets.UTF_8));
producer.send(message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.info("发送结果是:{}", sendResult.getSendStatus());
    }

    @Override
    public void onException(Throwable e) {
        // A failed send is an error; pass the throwable so the stack trace is logged too.
        log.error("发送消息异常: {}", e.getMessage(), e);
    }
});
生产者ONEWAY发送方式
// One-way send: sendOneway(...) has no return value and takes no callback here.
// The body is encoded explicitly as UTF-8 instead of the JVM's default charset.
Message message = new Message("xpc-topic-mq", "*", "",
        "hello mq".getBytes(java.nio.charset.StandardCharsets.UTF_8));
producer.sendOneway(message);
延迟消息
@GetMapping("/delayMsg") public String delayMsg() throws Exception{ DefaultMQProducer producer = xpcProducer.getProducer(); Message message = new Message("xpc-topic-mq","*","","这是延时消息".getBytes()); //延时消息:1s,5s,10s,30s,1m,2m,3m,.....1h,2h //定义消息延时级别 message.setDelayTimeLevel(2); SendResult sendResult = producer.send(message); log.info("消息发送结果:{}",sendResult.getSendStatus()); return "ok"; }
从消息发送成功到消费者收到消息,中间延迟了5s
顺序消息
生产者
@GetMapping("/orderLy") public String orderLy() throws Exception{ DefaultMQProducer producer = xpcProducer.getProducer(); for(int i = 0; i < 5; i ++) { Message message = new Message("xpc-topic-mq-orderly","*","","这是顺序消息-".getBytes()); producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //arg的值就是send方法最后一个参数,也就是我们这里传的1 int parseInt = Integer.parseInt(arg.toString()); int index = parseInt % mqs.size(); return mqs.get(index); } },1); } return "ok"; }
消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; public class MqOrderlyConsumer { private static final Logger log = LoggerFactory.getLogger(MqOrderlyConsumer.class); private static final String CONSUMER_GROUP = "xpc_topic_group_consumer_orderly"; private DefaultMQPushConsumer consumer; public MqOrderlyConsumer() { consumer = new DefaultMQPushConsumer(); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumerGroup(CONSUMER_GROUP); try { consumer.subscribe("xpc-topic-mq-orderly","*"); //这里要切换成new MessageListenerOrderly() consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { MessageExt messageExt = msgs.get(0); try{ log.info("收到服务端消息:{}",new String(messageExt.getBody())); return ConsumeOrderlyStatus.SUCCESS; }catch (Exception e) { return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }); consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } } }
注意:顺序消息只能同步发送,不能使用异步发送,否则消费者收到消息的顺序可能与发送顺序不一致
SpringBoot整合RocketMQ
一般我们很少直接使用上面这种原生方式,比较常用的是接下来这种整合方式。新建一个SpringBoot项目,pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the rocketmq-spring-boot-starter demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherit plugin and dependency management from Spring Boot. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
    </parent>

    <groupId>com.xpc</groupId>
    <artifactId>springboot-rocketmq</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot integration for RocketMQ (RocketMQTemplate, listener annotations). -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
        <!-- Explicit client version; takes precedence over the one the starter pulls in transitively. -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.4</version>
        </dependency>
        <!-- Lombok is only needed at compile time. The version is left to the Spring Boot
             parent instead of pinning the outdated 1.16.18, and the dependency is optional
             so it is not passed on to consumers of this artifact. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Annotation processor: it must be on the compile classpath to run, so it must
             not be declared with runtime scope. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
application.yml文件如下
# RocketMQ settings read by rocketmq-spring-boot-starter.
rocketmq:
  consumer:
    group: xpc_consumer_group
    pull-batch-size: 10
  producer:
    group: xpc_producer_group
    # NOTE(review): presumably milliseconds - confirm against the starter's documentation.
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    # NOTE(review): presumably bytes, which would make this a very small limit - confirm it is intended.
    max-message-size: 4096
    retry-next-server: false
  # Address of the RocketMQ name server (host:port).
  name-server: 127.0.0.1:9876
# HTTP port of the embedded web server.
server:
  port: 7002
生产者
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.UUID; @Slf4j @RestController public class ProducerController { @Resource private RocketMQTemplate rocketMQTemplate; /** * 同步发送 * @return */ @GetMapping("/send") public String send() { Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build(); SendResult sendResult = rocketMQTemplate.syncSend("xpc-topic", msg); log.info("消息发送结果:{}",sendResult.getSendStatus()); return "ok"; } /** * 异步发送 */ @GetMapping("/asyncSend") public String asyncSend() { Message<String> msg = MessageBuilder.withPayload("这是异步发送").build(); rocketMQTemplate.asyncSend("xpc-topic", msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("异步发送结果:{}",sendResult.getSendStatus()); } @Override public void onException(Throwable throwable) { log.info("异步发送异常:{}",throwable.getMessage()); } }); return "ok"; } /** * 单向发送 */ @GetMapping("/oneWay") public String oneWay() { Message<String> msg = MessageBuilder.withPayload("这是单向发送").build(); rocketMQTemplate.sendOneWay("xpc-topic",msg); return "ok"; } /** * 顺序消息 */ @GetMapping("/orderly") public String orderly() { Message<String> msg = MessageBuilder.withPayload("这是顺序消息").build(); //最后一个参数会被使用决定使用topic中的哪个queue,默认使用的Hash取模SelectMessageQueueByHash SendResult sendResult = rocketMQTemplate.syncSendOrderly("xpc-topic-orderly", msg, "1"); return "ok"; } /** * 事务消息 */ @GetMapping("/tx") public String tx() { String uuid = 
UUID.randomUUID().toString(); Message<String> msg = MessageBuilder .withPayload("这是事务消息") .setHeader(RocketMQHeaders.TRANSACTION_ID,uuid) .setHeader(RocketMQHeaders.KEYS,"1001") .build(); TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("xpc-topic-tx", msg, null); log.info("事务消息发送结果:{}",transactionSendResult.getSendStatus()); return "ok"; } }
定义消费者
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener( topic = "xpc-topic-tx", consumerGroup = "xpc_consumer_group-tx" ) public class SyncConsumer implements RocketMQListener<MessageExt> , RocketMQPushConsumerLifecycleListener { @Override public void onMessage(MessageExt message) { byte[] body = message.getBody(); log.info("消费者接收到消息:{}",new String(body)); try{ int i = 1 / 0; }catch (Exception e) { //如果达到重试次数,执行后续的业务逻辑 log.error("重试次数:{}",message.getReconsumeTimes()); throw new RuntimeException("测试重试次数"); } } @Override public void prepareStart(DefaultMQPushConsumer consumer) { try { //设置消费端重试次数 consumer.setMaxReconsumeTimes(2); consumer.subscribe("xpc-topic-tx","*"); } catch (MQClientException e) { e.printStackTrace(); } } }
顺序消息消费者
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener( consumerGroup = "orderly-group", //主要是把该值设置为ConsumeMode.ORDERLY consumeMode = ConsumeMode.ORDERLY, topic = "xpc-topic-orderly", messageModel = MessageModel.CLUSTERING ) public class OrderlyConsumer implements RocketMQListener { @Override public void onMessage(Object message) { log.info("收到的消息是:{}",message); } }
事务消息的业务代码需要写在Listener中,如下
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate") public class TxConsumer implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { //执行本地事务 String payload = (String) msg.getPayload(); log.info("收到消息:{}",payload); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { log.info("开始执行本地回查"); //这里的值在发送事务消息指定的两个Header的值 Object txId = msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); Object key = msg.getHeaders().get(RocketMQHeaders.KEYS); return RocketMQLocalTransactionState.COMMIT; } }