切換語言為:簡體
原生 RocketMq 使用方法以及 Spring Boot 整合 RocketMq 詳細步驟

原生 RocketMq 使用方法以及 Spring Boot 整合 RocketMq 詳細步驟

  • 爱糖宝
  • 2024-06-04
  • 2152
  • 0
  • 0

原生版本

新建一個SpringBoot專案,pom檔案如下

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the "native" example: a Spring Boot web application that talks to
     RocketMQ through the plain rocketmq-client library. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits dependency and plugin management from the Spring Boot parent POM. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.xpc</groupId>
    <artifactId>springboot-rocketmq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter: the examples expose their send operations as HTTP GET endpoints. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <!-- Plain RocketMQ Java client; its version is pinned explicitly in this POM. -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

定義生產者

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

@Component
public class XpcProducer {


    private DefaultMQProducer producer;

    /**
     * 定義生產者組
     */
    private final static String PRODUCER_GROUP = "xpc_topic_group";


    public XpcProducer() {
        producer = new DefaultMQProducer();
        producer.setProducerGroup(PRODUCER_GROUP);
        producer.setNamesrvAddr("127.0.0.1:9876");
        try {
            //啟動
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public DefaultMQProducer getProducer() {
        return this.producer;
    }
}

傳送訊息

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class MqController {

    private static final Logger log = LoggerFactory.getLogger(MqController.class);

    @Resource
    private XpcProducer xpcProducer;

    @GetMapping("/sendXpc")
    public String send() throws Exception{
        DefaultMQProducer producer = xpcProducer.getProducer();
        Message message = new Message("xpc-topic-mq","*","","hello mq".getBytes());
        SendResult sendResult = producer.send(message);
        log.info("傳送結果是:{}",sendResult.getSendStatus());
        return "ok";
    }
}

定義一個消費者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class XpcConsumer {

    private static final Logger log = LoggerFactory.getLogger(XpcConsumer.class);

    private static final String CONSUMER_GROUP = "xpc_topic_group_consumer";

    private DefaultMQPushConsumer consumer;

    public XpcConsumer() {
        consumer = new DefaultMQPushConsumer();
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumerGroup(CONSUMER_GROUP);

        try{
            consumer.subscribe("xpc-topic-mq","*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try{
                        MessageExt messageExt = msgs.get(0);
                        String message = new String(messageExt.getBody());
                        log.info("收到訊息:{}",message);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }catch (Exception e) {
                        log.error("消費訊息異常:{}",e.getMessage());
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
            consumer.start();
        }catch (Exception e) {
            log.error("XpcConsumer | {}",e.getMessage());
        }
    }
}

在啟動類中啟動消費者

import com.xpc.xpc.XpcConsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class,args);
        new XpcConsumer();
    }
}

原生 RocketMq 使用方法以及 Spring Boot 整合 RocketMq 詳細步驟

生產者同步傳送

Message message = new Message("xpc-topic-mq","*","","hello mq".getBytes());
SendResult sendResult = producer.send(message);
log.info("傳送結果是:{}",sendResult.getSendStatus());

生產者非同步傳送

Message message = new Message("xpc-topic-mq","*","","hello mq".getBytes());
producer.send(message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.info("傳送結果是:{}",sendResult.getSendStatus());
    }
    @Override
    public void onException(Throwable e) {
        log.info("傳送訊息異常: {}",e.getMessage());
    }
});

生產者ONEWAY傳送方式

Message message = new Message("xpc-topic-mq","*","","hello mq".getBytes());
producer.sendOneway(message);

延遲訊息

@GetMapping("/delayMsg")
public String delayMsg() throws Exception{
    DefaultMQProducer producer = xpcProducer.getProducer();
    Message message = new Message("xpc-topic-mq","*","","這是延時訊息".getBytes());
    //延時訊息:1s,5s,10s,30s,1m,2m,3m,.....1h,2h
    //定義訊息延時級別
    message.setDelayTimeLevel(2);
    SendResult sendResult = producer.send(message);
    log.info("訊息傳送結果:{}",sendResult.getSendStatus());
    return "ok";
}

從訊息傳送成功到消費者收到訊息,中間延遲了5s

原生 RocketMq 使用方法以及 Spring Boot 整合 RocketMq 詳細步驟

順序訊息

生產者

@GetMapping("/orderLy")
public String orderLy() throws Exception{
    DefaultMQProducer producer = xpcProducer.getProducer();
    for(int i = 0; i < 5; i ++) {
        Message message = new Message("xpc-topic-mq-orderly","*","","這是順序訊息-".getBytes());
        producer.send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                //arg的值就是send方法最後一個引數,也就是我們這裏傳的1
                int parseInt = Integer.parseInt(arg.toString());
                int index = parseInt % mqs.size();
                return mqs.get(index);
            }
        },1);
    }
    return "ok";
}

消費者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class MqOrderlyConsumer {

    private static final Logger log = LoggerFactory.getLogger(MqOrderlyConsumer.class);

    private static final String CONSUMER_GROUP = "xpc_topic_group_consumer_orderly";


    private DefaultMQPushConsumer consumer;

    public MqOrderlyConsumer() {
        consumer = new DefaultMQPushConsumer();
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumerGroup(CONSUMER_GROUP);

        try {
            consumer.subscribe("xpc-topic-mq-orderly","*");
            
            //這裏要切換成new MessageListenerOrderly()
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    MessageExt messageExt = msgs.get(0);
                    try{
                        log.info("收到服務端訊息:{}",new String(messageExt.getBody()));
                        return ConsumeOrderlyStatus.SUCCESS;
                    }catch (Exception e) {
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

原生 RocketMq 使用方法以及 Spring Boot 整合 RocketMq 詳細步驟

-- 注意:順序訊息只能同步傳送,不能使用非同步傳送,不然消費者順序會不一致

SpringBoot整合RocketMQ

一般我們很少直接使用上面這種原生方式,比較常用的是接下來這種 Spring Boot 整合方式。新建一個SpringBoot專案,pom.xml檔案如下

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot integration example, based on
     rocketmq-spring-boot-starter. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xpc</groupId>
    <artifactId>springboot-rocketmq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Compile for Java 8 and read sources as UTF-8. -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <packaging>jar</packaging>

    <!-- Inherits dependency and plugin management from the Spring Boot parent POM. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot integration for RocketMQ (the example code imports
             org.apache.rocketmq.spring.* from it). -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
        <!-- NOTE(review): the client version is pinned explicitly here; presumably the starter
             also pulls in a rocketmq-client transitively - confirm the two versions agree. -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.4</version>
        </dependency>
        <!-- Lombok: the example classes use @Slf4j. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>compile</scope>
        </dependency>
        <!-- NOTE(review): optional + runtime scope is an unusual combination for this
             artifact - confirm it is intended. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

application.yml檔案如下

# RocketMQ settings for the Spring Boot integration example.
rocketmq:
  consumer:
    group: xpc_consumer_group
    pull-batch-size: 10

  producer:
    group: xpc_producer_group
    # NOTE(review): presumably milliseconds - confirm against the starter's properties.
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    # NOTE(review): presumably bytes; 4096 would allow only ~4 KB per message - confirm.
    max-message-size: 4096
    retry-next-server: false
  # Same name server address the native examples use.
  name-server: 127.0.0.1:9876

# HTTP port the example application listens on.
server:
  port: 7002

生產者

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.UUID;

@Slf4j
@RestController
public class ProducerController {


    @Resource
    private RocketMQTemplate rocketMQTemplate;


    /**
     * 同步傳送
     * @return
     */
    @GetMapping("/send")
    public String send() {
        Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
        SendResult sendResult = rocketMQTemplate.syncSend("xpc-topic", msg);
        log.info("訊息傳送結果:{}",sendResult.getSendStatus());
        return "ok";
    }

    /**
     * 非同步傳送
     */
    @GetMapping("/asyncSend")
    public String asyncSend() {
        Message<String> msg = MessageBuilder.withPayload("這是非同步傳送").build();
        rocketMQTemplate.asyncSend("xpc-topic", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("非同步傳送結果:{}",sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                log.info("非同步傳送異常:{}",throwable.getMessage());
            }
        });
        return "ok";
    }

    /**
     * 單向傳送
     */
    @GetMapping("/oneWay")
    public String oneWay() {
        Message<String> msg = MessageBuilder.withPayload("這是單向傳送").build();
        rocketMQTemplate.sendOneWay("xpc-topic",msg);
        return "ok";
    }

    /**
     * 順序訊息
     */
    @GetMapping("/orderly")
    public String orderly() {
        Message<String> msg = MessageBuilder.withPayload("這是順序訊息").build();
        //最後一個引數會被使用決定使用topic中的哪個queue,預設使用的Hash取模SelectMessageQueueByHash
        SendResult sendResult = rocketMQTemplate.syncSendOrderly("xpc-topic-orderly", msg, "1");
        return "ok";
    }

    /**
     * 事務訊息
     */
    @GetMapping("/tx")
    public String tx() {
        String uuid = UUID.randomUUID().toString();
        Message<String> msg = MessageBuilder
                .withPayload("這是事務訊息")
                .setHeader(RocketMQHeaders.TRANSACTION_ID,uuid)
                .setHeader(RocketMQHeaders.KEYS,"1001")
                .build();
        TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("xpc-topic-tx", msg, null);
        log.info("事務訊息傳送結果:{}",transactionSendResult.getSendStatus());
        return "ok";
    }
}

定義消費者

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "xpc-topic-tx",
        consumerGroup = "xpc_consumer_group-tx"
        )
public class SyncConsumer implements RocketMQListener<MessageExt> , RocketMQPushConsumerLifecycleListener {

    @Override
    public void onMessage(MessageExt message) {
        byte[] body = message.getBody();
        log.info("消費者接收到訊息:{}",new String(body));

        try{
            int i = 1 / 0;
        }catch (Exception e) {
            //如果達到重試次數,執行後續的業務邏輯
            log.error("重試次數:{}",message.getReconsumeTimes());
            throw new RuntimeException("測試重試次數");
        }
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        try {
            //設定消費端重試次數
            consumer.setMaxReconsumeTimes(2);
            consumer.subscribe("xpc-topic-tx","*");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

順序訊息消費者

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "orderly-group",
        //主要是把該值設定為ConsumeMode.ORDERLY
        consumeMode = ConsumeMode.ORDERLY,
        topic = "xpc-topic-orderly",
        messageModel = MessageModel.CLUSTERING
)
public class OrderlyConsumer implements RocketMQListener {


    @Override
    public void onMessage(Object message) {
        log.info("收到的訊息是:{}",message);
    }
}

事務訊息的業務程式碼需要寫在Listener中,如下

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class TxConsumer implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        //執行本地事務
        String payload = (String) msg.getPayload();
        log.info("收到訊息:{}",payload);
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("開始執行本地回查");
        //這裏的值在傳送事務訊息指定的兩個Header的值
        Object txId = msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        Object key = msg.getHeaders().get(RocketMQHeaders.KEYS);
        return RocketMQLocalTransactionState.COMMIT;
    }
}

原生 RocketMq 使用方法以及 Spring Boot 整合 RocketMq 詳細步驟

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.