一、引入依赖
<dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>4.10.0</version>
</dependency>
<dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp-sse</artifactId><version>4.10.0</version>
</dependency>
二、创建 SSE 客户端服务类
import okhttp3.*;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import org.springframework.stereotype.Service;import java.util.concurrent.TimeUnit;@Service
public class SseClientService {private final OkHttpClient okHttpClient;private EventSource eventSource;public SseClientService() {this.okHttpClient = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.SECONDS).readTimeout(0, TimeUnit.SECONDS) .writeTimeout(10, TimeUnit.SECONDS).build();}public void connectToSseServer(String url) {Request request = new Request.Builder().url(url).build();EventSource.Factory factory = EventSources.createFactory(okHttpClient);this.eventSource = factory.newEventSource(request, new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {System.out.println("SSE连接已建立");}@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {System.out.printf("收到事件: id=%s, type=%s, data=%s%n", id, type, data);}@Overridepublic void onClosed(EventSource eventSource) {System.out.println("SSE连接已关闭");}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {System.err.println("SSE连接失败: " + t.getMessage());reconnect(url);}});}private void reconnect(String url) {try {Thread.sleep(5000); connectToSseServer(url);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}public void closeConnection() {if (eventSource != null) {eventSource.cancel();}okHttpClient.dispatcher().executorService().shutdown();}
}
三、创建控制器测试
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/sse-client")
public class SseClientController {

    /** Address passed to the service when a connection is requested. */
    private static final String SUBSCRIBE_URL = "http://localhost:8080/sse-server/subscribe";

    private final SseClientService sseClientService;

    /**
     * Creates the controller.
     *
     * @param sseClientService service that the endpoints delegate to
     */
    public SseClientController(SseClientService sseClientService) {
        this.sseClientService = sseClientService;
    }

    /**
     * Handles {@code GET /sse-client/connect} by asking the service to connect to
     * {@link #SUBSCRIBE_URL}.
     *
     * @return a fixed confirmation message
     */
    @GetMapping("/connect")
    public String connect() {
        sseClientService.connectToSseServer(SUBSCRIBE_URL);
        return "SSE客户端已启动";
    }

    /**
     * Handles {@code GET /sse-client/disconnect} by asking the service to close its connection.
     *
     * @return a fixed confirmation message
     */
    @GetMapping("/disconnect")
    public String disconnect() {
        sseClientService.closeConnection();
        return "SSE客户端已关闭";
    }
}
四、高级功能实现
1. 自定义事件处理
/**
 * Routes an incoming event to the handler that matches its type.
 *
 * @param eventSource the connection that delivered the event
 * @param id          event id; not used here
 * @param type        event type; null when the server sent no {@code event:} field
 * @param data        event payload, forwarded to the selected handler
 */
@Override
public void onEvent(EventSource eventSource, String id, String type, String data) {
    // Switching on a null String throws NullPointerException, and OkHttp reports untyped
    // events with a null type. The SSE specification names "message" as the default type,
    // so untyped events are routed to the message handler.
    String eventType = (type != null) ? type : "message";
    switch (eventType) {
        case "message":
            handleMessageEvent(data);
            break;
        case "system-alert":
            handleSystemAlert(data);
            break;
        default:
            handleDefaultEvent(data);
            break;
    }
}
2. 添加认证头
/**
 * Builds a request for {@code url} that carries a bearer token in its
 * {@code Authorization} header.
 *
 * <p>NOTE(review): the request is built but not used by this snippet; presumably the step that
 * opens the event source with it has been left out for brevity — confirm.
 *
 * @param url   address of the SSE endpoint
 * @param token token placed after the {@code "Bearer "} prefix
 */
public void connectToSseServerWithAuth(String url, String token) {
    Request.Builder builder = new Request.Builder();
    builder.url(url);
    builder.header("Authorization", "Bearer " + token);
    Request request = builder.build();
}
3. 心跳检测
/**
 * Logs heartbeat events and returns early for them.
 *
 * @param eventSource the connection that delivered the event
 * @param id          event id; not used here
 * @param type        event type, compared against {@code "heartbeat"}
 * @param data        event payload, printed for heartbeat events
 */
@Override
public void onEvent(EventSource eventSource, String id, String type, String data) {
    // Literal-first equals keeps the comparison safe when type is null.
    final boolean isHeartbeat = "heartbeat".equals(type);
    if (isHeartbeat) {
        System.out.println("收到心跳: " + data);
        return;
    }
    // Any other event type reaches this point; this snippet does nothing further with it.
}
五、OkHttpConfig单独配置
import okhttp3.OkHttpClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.TimeUnit;@Configuration
/**
 * Spring configuration that exposes a shared {@link OkHttpClient} bean.
 */
public class OkHttpConfig {

    /**
     * Creates the {@link OkHttpClient} bean.
     *
     * @return a client with 15-second connect and write timeouts, no read timeout, and retry on
     *         connection failure enabled
     */
    // The original "@Beanpublic" was parsed as a single unknown annotation name, so the class
    // did not compile; the annotation and the modifier must be separate tokens.
    @Bean
    public OkHttpClient okHttpClient() {
        return new OkHttpClient.Builder()
                .connectTimeout(15, TimeUnit.SECONDS)
                // 0 means "no read timeout".
                .readTimeout(0, TimeUnit.SECONDS)
                .writeTimeout(15, TimeUnit.SECONDS)
                .retryOnConnectionFailure(true)
                .build();
    }
}