若依集成WebSocket
导入包:
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.8</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
- security放开websocket的连接
- 设置拦截器,拦截用户信息
@Component
@EnableWebSocketMessageBroker
@Order(Ordered.HIGHEST_PRECEDENCE+99)
public class wsConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
ISysUserService sysUserService;
private static long HEART_BEAT = 10000;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOrigins("*");
// .setAllowedOriginPatterns("*")
// .withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
te.setPoolSize(1);
te.setThreadNamePrefix("wss-heartbeat-thread");
te.initialize();
registry.enableSimpleBroker("/topic","/queue")
// .setHeartbeatValue()
.setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT})
.setTaskScheduler(te);
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
ChannelInterceptor interceptor = new ChannelInterceptor(){
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if(accessor == null || accessor.getCommand() == null){
return null;
}
if (StompCommand.CONNECT.equals(accessor.getCommand())){
// 处理连接的请求
String token = accessor.getFirstNativeHeader("Authorization");
SysUser user = sysUserService.selectUserById(1L);
LoginUser loginUser = new LoginUser();
BeanUtils.copyProperties(user,loginUser);
UsernamePasswordAuthenticationToken principal = new UsernamePasswordAuthenticationToken(loginUser, null, Collections.EMPTY_LIST);
// UserContextHolder.set(user);
accessor.setUser(principal);
}else{
// 其他的请求,token只用设置一次
// SysUser user = sysUserService.selectUserById(1L);
// UserContextHolder.set(user);
System.out.println("other请求");
Authentication authentication = (Authentication) accessor.getUser();
LoginUser loginUser = (LoginUser) authentication.getPrincipal();
System.out.println("loginUser111 = " + loginUser);
}
return message;
}
};
registration.interceptors(interceptor);
}
}
- 请求获取用户信息
@MessageMapping("/send")
public void sendMessage(Message<?> message, com.cj.project.ws.model.Message messageBody){
System.out.println("message = " + message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
Principal principal = accessor.getUser();
Authentication authentication = (Authentication) principal;
Object user = authentication.getPrincipal();
System.out.println("user = " + user);
System.out.println(user instanceof LoginUser);
if (user instanceof LoginUser){
LoginUser loginUser = (LoginUser) user;
System.out.println("loginUser = " + loginUser);
}
Object payload = message.getPayload();
System.out.println("payload = " + payload);
System.out.println("payload = " + new String((byte[]) payload));
System.out.println("messageBody = " + messageBody);
String messageBodyTo = messageBody.getTo();
System.out.println("messageBodyTo = " + messageBodyTo);
this.pushMessage();
}
心跳推送,send到/heartbeat
引用:https://juejin.cn/post/7345310754470821922
引用:https://blog.csdn.net/print_helloword/article/details/142816204
https://blog.csdn.net/2301_80817413/article/details/139561814
https://blog.csdn.net/qq_45032714/article/details/112261976