原生版本
新建一個SpringBoot專案,pom檔案如下
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the plain rocketmq-client example (no Spring starter for RocketMQ). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.xpc</groupId>
    <artifactId>springboot-rocketmq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web layer used by the REST controller that triggers sends -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Native RocketMQ client API -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
定義生產者
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.stereotype.Component; @Component public class XpcProducer { private DefaultMQProducer producer; /** * 定義生產者組 */ private final static String PRODUCER_GROUP = "xpc_topic_group"; public XpcProducer() { producer = new DefaultMQProducer(); producer.setProducerGroup(PRODUCER_GROUP); producer.setNamesrvAddr("127.0.0.1:9876"); try { //啟動 producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } public DefaultMQProducer getProducer() { return this.producer; } }
傳送訊息
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController public class MqController { private static final Logger log = LoggerFactory.getLogger(MqController.class); @Resource private XpcProducer xpcProducer; @GetMapping("/sendXpc") public String send() throws Exception{ DefaultMQProducer producer = xpcProducer.getProducer(); Message message = new Message("xpc-topic-mq","*","","hello mq".getBytes()); SendResult sendResult = producer.send(message); log.info("傳送結果是:{}",sendResult.getSendStatus()); return "ok"; } }
定義一個消費者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; public class XpcConsumer { private static final Logger log = LoggerFactory.getLogger(XpcConsumer.class); private static final String CONSUMER_GROUP = "xpc_topic_group_consumer"; private DefaultMQPushConsumer consumer; public XpcConsumer() { consumer = new DefaultMQPushConsumer(); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumerGroup(CONSUMER_GROUP); try{ consumer.subscribe("xpc-topic-mq","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try{ MessageExt messageExt = msgs.get(0); String message = new String(messageExt.getBody()); log.info("收到訊息:{}",message); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }catch (Exception e) { log.error("消費訊息異常:{}",e.getMessage()); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); }catch (Exception e) { log.error("XpcConsumer | {}",e.getMessage()); } } }
在啟動類中啟動消費者
import com.xpc.xpc.XpcConsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point: boots the Spring context, then creates the consumer.
 */
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(App.class);
        application.run(args);
        // XpcConsumer is not a Spring bean; its constructor subscribes and starts consuming.
        new XpcConsumer();
    }
}
生產者同步傳送
// Synchronous send: the call returns a SendResult whose status is logged.
byte[] body = "hello mq".getBytes();
Message message = new Message("xpc-topic-mq", "*", "", body);
SendResult sendResult = producer.send(message);
log.info("傳送結果是:{}", sendResult.getSendStatus());
生產者非同步傳送
// Asynchronous send: the outcome is delivered to the callback instead of being returned.
Message message = new Message("xpc-topic-mq", "*", "", "hello mq".getBytes());
producer.send(message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.info("傳送結果是:{}", sendResult.getSendStatus());
    }

    @Override
    public void onException(Throwable e) {
        // A failed send is an error, not an informational event; pass the throwable
        // as the last argument so the stack trace is logged as well.
        log.error("傳送訊息異常: {}", e.getMessage(), e);
    }
});
生產者ONEWAY傳送方式
// One-way send: no result is captured and no callback is registered.
byte[] body = "hello mq".getBytes();
Message message = new Message("xpc-topic-mq", "*", "", body);
producer.sendOneway(message);
延遲訊息
@GetMapping("/delayMsg") public String delayMsg() throws Exception{ DefaultMQProducer producer = xpcProducer.getProducer(); Message message = new Message("xpc-topic-mq","*","","這是延時訊息".getBytes()); //延時訊息:1s,5s,10s,30s,1m,2m,3m,.....1h,2h //定義訊息延時級別 message.setDelayTimeLevel(2); SendResult sendResult = producer.send(message); log.info("訊息傳送結果:{}",sendResult.getSendStatus()); return "ok"; }
從訊息傳送成功到消費者收到訊息,中間延遲了5s
順序訊息
生產者
@GetMapping("/orderLy") public String orderLy() throws Exception{ DefaultMQProducer producer = xpcProducer.getProducer(); for(int i = 0; i < 5; i ++) { Message message = new Message("xpc-topic-mq-orderly","*","","這是順序訊息-".getBytes()); producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //arg的值就是send方法最後一個引數,也就是我們這裏傳的1 int parseInt = Integer.parseInt(arg.toString()); int index = parseInt % mqs.size(); return mqs.get(index); } },1); } return "ok"; }
消費者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; public class MqOrderlyConsumer { private static final Logger log = LoggerFactory.getLogger(MqOrderlyConsumer.class); private static final String CONSUMER_GROUP = "xpc_topic_group_consumer_orderly"; private DefaultMQPushConsumer consumer; public MqOrderlyConsumer() { consumer = new DefaultMQPushConsumer(); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumerGroup(CONSUMER_GROUP); try { consumer.subscribe("xpc-topic-mq-orderly","*"); //這裏要切換成new MessageListenerOrderly() consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { MessageExt messageExt = msgs.get(0); try{ log.info("收到服務端訊息:{}",new String(messageExt.getBody())); return ConsumeOrderlyStatus.SUCCESS; }catch (Exception e) { return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }); consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } } }
-- 注意:順序訊息只能使用同步傳送,不能使用非同步傳送,否則消費者收到訊息的順序可能與傳送順序不一致
SpringBoot整合RocketMQ
一般我們很少使用上面這種方式來使用,比較常用的是接下來這種,新建一個SpringBoot專案,pom.xml檔案如下
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the rocketmq-spring-boot-starter example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xpc</groupId>
    <artifactId>springboot-rocketmq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- RocketMQ integration for Spring Boot (RocketMQTemplate, listener annotations) -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>compile</scope>
        </dependency>
        <!--
            Annotation processor: it must be visible at compile time to run.
            The previous runtime scope kept it off the compile classpath, so it never ran;
            optional=true alone keeps it from leaking to downstream modules.
        -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
application.yml檔案如下
# RocketMQ starter settings
rocketmq:
  consumer:
    group: xpc_consumer_group
    pull-batch-size: 10
  producer:
    group: xpc_producer_group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    max-message-size: 4096
    retry-next-server: false
  name-server: 127.0.0.1:9876

# Embedded web server
server:
  port: 7002
生產者
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.UUID; @Slf4j @RestController public class ProducerController { @Resource private RocketMQTemplate rocketMQTemplate; /** * 同步傳送 * @return */ @GetMapping("/send") public String send() { Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build(); SendResult sendResult = rocketMQTemplate.syncSend("xpc-topic", msg); log.info("訊息傳送結果:{}",sendResult.getSendStatus()); return "ok"; } /** * 非同步傳送 */ @GetMapping("/asyncSend") public String asyncSend() { Message<String> msg = MessageBuilder.withPayload("這是非同步傳送").build(); rocketMQTemplate.asyncSend("xpc-topic", msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("非同步傳送結果:{}",sendResult.getSendStatus()); } @Override public void onException(Throwable throwable) { log.info("非同步傳送異常:{}",throwable.getMessage()); } }); return "ok"; } /** * 單向傳送 */ @GetMapping("/oneWay") public String oneWay() { Message<String> msg = MessageBuilder.withPayload("這是單向傳送").build(); rocketMQTemplate.sendOneWay("xpc-topic",msg); return "ok"; } /** * 順序訊息 */ @GetMapping("/orderly") public String orderly() { Message<String> msg = MessageBuilder.withPayload("這是順序訊息").build(); //最後一個引數會被使用決定使用topic中的哪個queue,預設使用的Hash取模SelectMessageQueueByHash SendResult sendResult = rocketMQTemplate.syncSendOrderly("xpc-topic-orderly", msg, "1"); return "ok"; } /** * 事務訊息 */ @GetMapping("/tx") public String tx() { String 
uuid = UUID.randomUUID().toString(); Message<String> msg = MessageBuilder .withPayload("這是事務訊息") .setHeader(RocketMQHeaders.TRANSACTION_ID,uuid) .setHeader(RocketMQHeaders.KEYS,"1001") .build(); TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("xpc-topic-tx", msg, null); log.info("事務訊息傳送結果:{}",transactionSendResult.getSendStatus()); return "ok"; } }
定義消費者
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener( topic = "xpc-topic-tx", consumerGroup = "xpc_consumer_group-tx" ) public class SyncConsumer implements RocketMQListener<MessageExt> , RocketMQPushConsumerLifecycleListener { @Override public void onMessage(MessageExt message) { byte[] body = message.getBody(); log.info("消費者接收到訊息:{}",new String(body)); try{ int i = 1 / 0; }catch (Exception e) { //如果達到重試次數,執行後續的業務邏輯 log.error("重試次數:{}",message.getReconsumeTimes()); throw new RuntimeException("測試重試次數"); } } @Override public void prepareStart(DefaultMQPushConsumer consumer) { try { //設定消費端重試次數 consumer.setMaxReconsumeTimes(2); consumer.subscribe("xpc-topic-tx","*"); } catch (MQClientException e) { e.printStackTrace(); } } }
順序訊息消費者
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener( consumerGroup = "orderly-group", //主要是把該值設定為ConsumeMode.ORDERLY consumeMode = ConsumeMode.ORDERLY, topic = "xpc-topic-orderly", messageModel = MessageModel.CLUSTERING ) public class OrderlyConsumer implements RocketMQListener { @Override public void onMessage(Object message) { log.info("收到的訊息是:{}",message); } }
事務訊息的業務程式碼需要寫在Listener中,如下
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate") public class TxConsumer implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { //執行本地事務 String payload = (String) msg.getPayload(); log.info("收到訊息:{}",payload); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { log.info("開始執行本地回查"); //這裏的值在傳送事務訊息指定的兩個Header的值 Object txId = msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); Object key = msg.getHeaders().get(RocketMQHeaders.KEYS); return RocketMQLocalTransactionState.COMMIT; } }