Springboot集成websocket实现消息推送
假设有个需求需要多个用户同时在对应的消息面板实时查看相关接口的执行流程进度,此时可以可考虑使用websocket来实现结果进度推送
一、引入websocket依赖,并编写WebSocket配置类
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** WebSocket配置类*/
@Configuration
public class WebSocketConfig {/*** 注入ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
二、编写WebSocket服务类,用户和客户端进行交互
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;/*** WebSocket服务*/
@ServerEndpoint("/ws/{uid}")
@Component
@Slf4j
public class WebSocketServer {/*** 当前在线连接数*/private static final AtomicInteger onlineNum = new AtomicInteger(0);/*** 存放每个客户端对应的Session*/private static final Map<String, Session> SESSION_POOLS = new ConcurrentHashMap<>();/*** 建立连接** @param session 当前会话* @param uid 用户id*/@OnOpenpublic void openConnection(Session session, @PathParam(value = "uid") String uid) {SESSION_POOLS.put(uid, session);onlineNum.incrementAndGet();log.info(uid + "建立连接! 当前连接数为: {}", onlineNum);}/*** 关闭连接** @param uid 用户id*/@OnClosepublic void closeConnection(@PathParam(value = "uid") String uid) {SESSION_POOLS.remove(uid);int cnt = onlineNum.decrementAndGet();log.info(uid + "断开连接, 当前连接数为:{}", cnt);}/*** 消息发送** @param session 当前会话* @param message 消息内容* @throws IOException IO异常*/public void sendMessage(Session session, String message) throws IOException {if (session != null && session.isOpen()) {synchronized (session) {session.getBasicRemote().sendText(message);}}}/*** 消息广播** @param message 消息内容*/public void broadCastMessage(String message) {SESSION_POOLS.forEach((k, v) -> {if (v.isOpen()) {try {sendMessage(v, message);} catch (IOException e) {throw new RuntimeException(e);}}});}/*** 发生错误** @param uid 用户id* @param throwable 错误信息*/@OnErrorpublic void errorConnection(@PathParam(value = "uid") String uid, Throwable throwable) {log.error("用户: " + uid + " 发生错误", throwable);}
}
三、编写controller模拟接口执行进度监控
import com.yx.analyze.demo.component.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.TimeUnit;/*** WebSocket接口控制器*/
@RestController
@RequestMapping("/ws")
@Slf4j
public class WebSocketController {@Autowiredprivate WebSocketServer webSocketServer;/*** 管理员广播消息,模拟某个接口执行的进度消息推送** @param id 用户id* @param message 消息内容*/@PostMapping(value = "/broadCast")public void broadCast(String id, String message) {//模拟管理员身份判断if ("admin".equals(id)) {try {webSocketServer.broadCastMessage(id + ":" + "监控接口开始执行...");webSocketServer.broadCastMessage(id + ":" + "监控接口开始校验...");TimeUnit.SECONDS.sleep(1);webSocketServer.broadCastMessage(id + ":" + "监控接口开始取数: 获取到1024条记录");TimeUnit.SECONDS.sleep(1);webSocketServer.broadCastMessage(id + ":" + "监控接口开始处理数据...");TimeUnit.SECONDS.sleep(1);webSocketServer.broadCastMessage(id + ":" + message);webSocketServer.broadCastMessage(id + ":" + "数据处理完毕...");} catch (Exception e) {log.error("监控接口消息发生异常: ", e);}}}
}
四、使用在线websocket连接工具进行测试