切换语言为:繁体

使用WebSocket和RabbitMQ实现任务进度推送

  • 爱糖宝
  • 2024-06-24
  • 2104
  • 0
  • 0

1、WebSocket简述

WebSocket 是 HTML5 新增的 API,是一种基于 TCP 连接的持久化双向通信协议。

WebSocket 默认连接端口是80,运行端口是443。

WebSocket 连接地址示例(以 ws 或者 wss 开头):ws://text.com:8080 或 wss://text.com:8080(加密)。

Springboot项目导入WebSocket依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>


注册WebSocket节点

@Slf4j
@Configuration
public class WebSocketConfig { 
    /**
     * ServerEndpointExporter 作用
     * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        log.info("serverEndpointExporter被注入了");
        return new ServerEndpointExporter();
    }
}

2、RabbitMQ简述

RabbitMQ是一款开源的消息中间件,实现了高级消息队列协议(AMQP),提供了可靠的消息传递机制,用于在分布式系统中传递和存储消息。

Springboot项目导入RabbitMQ依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


在application.yml文件中配置信息:

spring:
  rabbitmq:
    host: 172.16.10.XXX
    port: 5672
    virtual-host: /
    username: XXX
    password: XXXXXX


生产者生产消息,并向消息队列发送消息简单示例,这里以接口的形式呈现。

若要生产进度消息,还需要再任务模型里进行设计,或按分页计算进度,或预估计算时间

@Slf4j
@RestController
@RequestMapping("/inform")
public class MQProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    ObjectMapper mapper = new ObjectMapper();

    @RequestMapping("/test")
    public Result<String> index(String msgType, String userId, String sessionId, Float result) throws JsonProcessingException {
        AnalysisTaskProgressMessage<Float> progressMsg = new AnalysisTaskProgressMessage<>(msgType, userId, sessionId, result);

        String progressMsgJson = mapper.writeValueAsString(progressMsg);
        rabbitTemplate.convertAndSend("simple.queue", progressMsgJson);
        log.info(progressMsgJson + "消息发送成功!");
        return Result.OK("消息发送成功!");
    }
}


3、注册WebSocket端点

@Slf4j
@ServerEndpoint(value = "/webSocket/analysisTask/{userId}", encoders = {BaseModelEncoder.class})
@Component
public class AnalysisTaskWSSvr {
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static ConcurrentHashMap<AnalysisTaskCompositeId, AnalysisTaskWSSvr> webSocketMap = new ConcurrentHashMap<>();

    /**
     * 记录当前在线用户的ID
     */
    private static Set<String> onlineUserSet = new CopyOnWriteArraySet<>();

    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;

    /**
     * 接收 AnalysisTaskCompositeId
     */
    private AnalysisTaskCompositeId analysisTaskCompositeId;

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId, @PathParam("requestId") String requestId) {
        // 设置长连接时间
        // session.setMaxIdleTimeout(3600000);
        this.session = session;
        AnalysisTaskCompositeId analysisTaskId = new AnalysisTaskCompositeId(userId, this.session.getId());
        this.analysisTaskCompositeId = analysisTaskId;
        if (webSocketMap.containsKey(analysisTaskId)) {
            webSocketMap.remove(analysisTaskId);
            webSocketMap.put(analysisTaskId, this);
        } else {
            // 加入set中
            webSocketMap.put(analysisTaskId, this);
            addOnlineCount(analysisTaskId.getUserId());
        }
        try {
            AnalysisTaskWSSvr.sendSuccessMessage(analysisTaskId);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        if (webSocketMap.containsKey(analysisTaskCompositeId)) {
            webSocketMap.remove(analysisTaskCompositeId);
            //从set中删除
            subOnlineCount(analysisTaskCompositeId.getUserId());
        }
        log.info("用户退出:" + analysisTaskCompositeId.getUserId() + ",当前在线人数为:" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:" + analysisTaskCompositeId.getUserId() + ", 报文:" + message);
        if (StringUtils.isEmpty(message)) return;
        // TODO: 接收信息,调用相关服务
        
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.warn("用户错误:" + this.analysisTaskCompositeId.getUserId() + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    public static void sendSuccessMessage(AnalysisTaskCompositeId compositeId) throws IOException {
        AnalysisTaskWSSvr webSocketServer = webSocketMap.get(compositeId);
        webSocketServer.session.getBasicRemote().sendText(String.valueOf(compositeId));
    }

    public static int getOnlineCount() {
        return onlineUserSet.size();
    }

    public static void addOnlineCount(String userId) {
        if (!onlineUserSet.contains(userId)) {
            onlineUserSet.add(userId);
        }
    }

    public static void subOnlineCount(String userId) {
        if (onlineUserSet.contains(userId)) {
            onlineUserSet.remove(userId);
        }
    }
}


4、监听队列,消费消息

构建消息监听器,同时调用相关功能处理消息,这里有多种方式可以选择:

1、采用Direct订阅模型,不同的消息被不同的队列进行消费;

2、采用简单队列模式,生产者生产的消息采用同一个方式进行封装,并标注好消息类型,监听器接收到消息后会根据消息类型分发给不同的websocket连接。

@Slf4j
@Configuration
@RequiredArgsConstructor
public class AnalysisTaskProgressListener {
    /**
     * 监听任务进度状态
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "analysis.queue1", durable = "true"),
            exchange = @Exchange(name = "analysisTask", type = ExchangeTypes.DIRECT),
            key = "task1"
    ))
    public void listenTaskProgress(String msg) {
        executeTask(msg);
    }
    
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        executeTask(msg);
    }
    
    ```
    private static void executeTask(String msg) {
        log.info("监听到消息" + msg);
        JSONObject jsonMsg = JSON.parseObject(msg);
        // 对消息类型做判断
        AnalysisTaskWSSvr.sendProgressMessage(
                jsonMsg.getString("userId"),
                jsonMsg.getString("sessionId"),
                jsonMsg.getString("taskId"),
                jsonMsg.getString("taskType"),
                jsonMsg.getFloatValue("progress")
        );
    }
}


5、使用vue3构建WebSocket连接

这里使用vue3构建前端界面,并建立WebSocket连接

WebSocket连接:ws://localhost:8889/webSocket/analysisTask/#{userId}

export default class SocketService {
    static instance = null;
    static get Instance() {
      if (!this.instance) {
        this.instance = new SocketService();
      }
      return this.instance;
    }
    // 和服务端连接的socket对象
    ws = null;
    // 存储回调函数
    callBackMapping = {};
    // 标识是否连接成功
    connected = false;
    // 记录重试的次数
    sendRetryCount = 0;
    // 重新连接尝试的次数
    connectRetryCount = 0;
    //  定义连接服务器的方法
    connect() {
      // 连接服务器
      if (!window.WebSocket) {
        return console.log('您的浏览器不支持WebSocket');
      }
      // let token = $.cookie('123');
      // let token = '4E6EF539AAF119D82AC4C2BC84FBA21F';
      let url = 'ws://localhost:8889/webSocket/analysisTask/001';
      this.ws = new WebSocket(url);
      // 连接成功的事件
      this.ws.onopen = () => {
        console.log('连接服务端成功了');
        
        this.connected = true;
        // 重置重新连接的次数
        this.connectRetryCount = 0;
      };
      // 1.连接服务端失败
      // 2.当连接成功之后, 服务器关闭的情况
      this.ws.onclose = () => {
        console.log('连接服务端失败');
        this.connected = false;
        this.connectRetryCount++;
        setTimeout(() => {
          this.connect();
        }, 500 * this.connectRetryCount);
      };
      // 得到服务端发送过来的数据
      this.ws.onmessage = msg => {
        console.log(msg.data, '从服务端获取到了数据');
      };
    }
    // 回调函数的注册
    registerCallBack(socketType, callBack) {
      this.callBackMapping[socketType] = callBack;
    }
    // 取消某一个回调函数
    unRegisterCallBack(socketType) {
      this.callBackMapping[socketType] = null;
    }
    // 发送数据的方法
    send(data) {
      // 判断此时此刻有没有连接成功
      if (this.connected) {
        this.sendRetryCount = 0;
        try {
          this.ws.send(JSON.stringify(data));
        } catch (e) {
          this.ws.send(data);
        }
      } else {
        this.sendRetryCount++;
        setTimeout(() => {
          this.send(data);
        }, this.sendRetryCount * 500);
      }
    }
  }


<script setup>
import SocketService from "../websocket/SocketService.js"
import { reactive } from "vue";

const data = reactive({
  socketServe: SocketService.Instance,
});

const sendData = () => {
  data.socketServe.send({
    "type": "SoilFertilityAnalysisTask",
    "taskId": "002"
  });
  console.log('发送数据');
};

const buildWS = () => {
  SocketService.Instance.connect();
  data.socketServe = SocketService.Instance;
  data.socketServe.registerCallBack('callback1', data.socketServe);
  setTimeout(() => {
    sendData()
  }, 1000)
}
</script>

<template>
  <div>
    <button @click="buildWS">运行</button>
  </div>
</template>

<style scoped>
.read-the-docs {
  color: #888;
}
</style>


效果展示如下:

使用WebSocket和RabbitMQ实现任务进度推送

0条评论

您的电子邮件等信息不会被公开,以下所有项均必填

OK! You can skip this field.