当前位置: 首页 > news >正文

Springboot集成websocket实现消息推送

假设有个需求需要多个用户同时在对应的消息面板实时查看相关接口的执行流程进度,此时可以可考虑使用websocket来实现结果进度推送

一、引入websocket依赖,并编写WebSocket配置类

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** WebSocket配置类*/
@Configuration
public class WebSocketConfig {/*** 注入ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}

二、编写WebSocket服务类,用户和客户端进行交互

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;/*** WebSocket服务*/
@ServerEndpoint("/ws/{uid}")
@Component
@Slf4j
public class WebSocketServer {/*** 当前在线连接数*/private static final AtomicInteger onlineNum = new AtomicInteger(0);/*** 存放每个客户端对应的Session*/private static final Map<String, Session> SESSION_POOLS = new ConcurrentHashMap<>();/*** 建立连接** @param session 当前会话* @param uid     用户id*/@OnOpenpublic void openConnection(Session session, @PathParam(value = "uid") String uid) {SESSION_POOLS.put(uid, session);onlineNum.incrementAndGet();log.info(uid + "建立连接! 当前连接数为: {}", onlineNum);}/*** 关闭连接** @param uid 用户id*/@OnClosepublic void closeConnection(@PathParam(value = "uid") String uid) {SESSION_POOLS.remove(uid);int cnt = onlineNum.decrementAndGet();log.info(uid + "断开连接, 当前连接数为:{}", cnt);}/*** 消息发送** @param session 当前会话* @param message 消息内容* @throws IOException IO异常*/public void sendMessage(Session session, String message) throws IOException {if (session != null && session.isOpen()) {synchronized (session) {session.getBasicRemote().sendText(message);}}}/*** 消息广播** @param message 消息内容*/public void broadCastMessage(String message) {SESSION_POOLS.forEach((k, v) -> {if (v.isOpen()) {try {sendMessage(v, message);} catch (IOException e) {throw new RuntimeException(e);}}});}/*** 发生错误** @param uid       用户id* @param throwable 错误信息*/@OnErrorpublic void errorConnection(@PathParam(value = "uid") String uid, Throwable throwable) {log.error("用户: " + uid + " 发生错误", throwable);}
}

三、编写controller模拟接口执行进度监控

import com.yx.analyze.demo.component.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.TimeUnit;/*** WebSocket接口控制器*/
@RestController
@RequestMapping("/ws")
@Slf4j
public class WebSocketController {@Autowiredprivate WebSocketServer webSocketServer;/*** 管理员广播消息,模拟某个接口执行的进度消息推送** @param id      用户id* @param message 消息内容*/@PostMapping(value = "/broadCast")public void broadCast(String id, String message) {//模拟管理员身份判断if ("admin".equals(id)) {try {webSocketServer.broadCastMessage(id + ":" + "监控接口开始执行...");webSocketServer.broadCastMessage(id + ":" + "监控接口开始校验...");TimeUnit.SECONDS.sleep(1);webSocketServer.broadCastMessage(id + ":" + "监控接口开始取数: 获取到1024条记录");TimeUnit.SECONDS.sleep(1);webSocketServer.broadCastMessage(id + ":" + "监控接口开始处理数据...");TimeUnit.SECONDS.sleep(1);webSocketServer.broadCastMessage(id + ":" + message);webSocketServer.broadCastMessage(id + ":" + "数据处理完毕...");} catch (Exception e) {log.error("监控接口消息发生异常: ", e);}}}
}

四、使用在线websocket连接工具进行测试

相关文章:

  • 【C++教程】C++中为什么优先使用 cout/cin流
  • Windows网络及服务:制作系统盘
  • rk3588上完成halcon的形状模型配准以及和opencv的图像转换
  • 算法 | 成长优化算法(Growth Optimizer,GO)原理,公式,应用,算法改进研究综述,matlab代码
  • 【MySQL】MySQL中的数据类型详解
  • 【前端】【面试】【业务场景】前端如何获取并生成设备唯一标识
  • 使用pyinstaller打包fastapi项目的问题记录
  • 01-初识前端
  • 级联vs端到端、全双工、轮次检测、方言语种、商业模式…语音 AI 开发者都在关心什么?丨Voice Agent 学习笔记
  • 深度学习3.5 图像分类数据集
  • 每日算法-250421
  • Java 并发包核心机制深度解析:锁的公平性、异步调度、AQS 原理全解
  • 【MySQL】:数据库事务管理
  • JavaEE--2.多线程
  • 把dll模块注入到游戏进程的方法_基于文件修改的注入方式
  • MCP:AI时代的“万能插座”,开启大模型无限可能
  • SvelteKit 最新中文文档教程(22)—— 最佳实践之无障碍与 SEO
  • 进程与线程:02 多进程图像
  • 在统信UOS 1060上实现自动关机
  • 高防IP能抵御哪些类型的网络攻击?
  • “五一”假期前多地规范旅游市场:要求明码标价,禁止强迫购物
  • 日媒:日本公明党党首将访华,并携带石破茂亲笔信
  • ESG领跑者|每一步都向前,李宁要让可持续发展成为可持续之事
  • 民建吉林省委提案:当前生育政策集中鼓励多孩生育,应该转变思路
  • 经济日报:锚定重点领域和关键环节,上海浦东谋划高水平对外开放
  • 著名电化学家、我国工业电化学奠基人之一郭鹤桐逝世