使用 SSE + WebFlux 推送日志信息到前端
为什么使用 SSE 而不使用 WebSocket, 请看 SEE 对比 Websocket 的优缺点。
特性 | SSE | WebSocket |
---|---|---|
通信方向 | 单向(服务器→客户端) | 双向(全双工) |
协议 | 基于 HTTP | 独立协议(需 ws:// 前缀) |
兼容性 | 现代浏览器(IE 不支持) | 广泛支持 |
复杂度 | 简单(服务器只需返回流数据) | 较复杂(需处理握手、帧协议等) |
适用场景 | 实时推送(日志、通知、新闻) | 双向交互(聊天、实时协作) |
我的架构如下:
前端
后端中间件
后端服务
[前端] → 打开模态框 → 发起SSE连接 → [后端中间件] → 转发请求 → [后端服务] → 查询日志信息
← 实时日志推送 ← (SSE流) ← 捕获进程输出流 ← 返回实时日志流
前端html代码:
前端代码使用 bootstrap + jquery
<a href='#' data-bs-toggle='modal' data-bs-target='#logModal'>日志</a><!-- Modal -->
<div class="modal fade" id="logModal" tabindex="-1" aria-labelledby="logModalLabel" aria-hidden="true"><div class="modal-dialog modal-fullscreen"><div class="modal-content"><div class="modal-header"><h1 class="modal-title fs-5" id="logModalLabel">Modal title</h1><button type="button" class="btn-close" data-bs-dismiss="modal" aria-label="Close"></button></div><div class="modal-body" id="logContent">...</div><div class="modal-footer"><button type="button" class="btn btn-secondary" data-bs-dismiss="modal">Close</button><button type="button" class="btn btn-primary">Save changes</button></div></div></div>
</div>
前端js代码:
$("#logModal").on('shown.bs.modal', function (){const eventSource = new EventSource("http://localhost:9998/middle/logs");const logContent = $('#logContent');eventSource.onmessage = function (event){const logLine = $('<div>').text(event.data)logContent.append(logLine);logContent[0].scrollTop = logContent[0].scrollHeight; // 自动滚动到底部};eventSource.onerror = function (error ) {console.log('日志连接失败: ', error);eventSource.close();logContent.text('日志获取失败,请检查容器常或网络连接')}$("#logModal").on('hide.bs.modal', function (){eventSource.close();logContent.empty(); // 清空日志})
})
后端中间件需要引入 webflux 的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
后端中间件代码:
controller:
@GetMapping(value = "/middle/logs", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter getDockerLogs(){return publishServerService.getDockerLogs();
}
service:
private final WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create())).build();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();public SseEmitter getDockerLogs() {String url = "http://127.0.0.1:9999/service/logs";SseEmitter emitter = new SseEmitter();executorService.submit(() -> {try {webClient.get().uri(url).retrieve().bodyToFlux(String.class).subscribe(log -> {try {emitter.send(SseEmitter.event().data(log)); // 发送消息到前端} catch (IOException e) {emitter.completeWithError(e);}},error -> emitter.completeWithError(error),emitter::complete);} catch (Exception e) {emitter.completeWithError(e);}});return emitter;
}
后端服务代码:
controller:
@GetMapping(value = "/service/logs", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter getDockerLogs() {return publishServerService.getDockerLogs();}
service:
private final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();public SseEmitter getDockerLogs() {SseEmitter emitter = new SseEmitter();executorService.submit(() -> {try {int i = 0;while (i < 100000000) {try {emitter.send(SseEmitter.event().data(i)); // 发送消息到后端中间件Thread.sleep(1000);i++;} catch (InterruptedException e) {throw new RuntimeException(e);}}emitter.complete();} catch (IOException e) {throw new RuntimeException(e);}});return emitter;
}
提示:你可以把 i 换成真实的应用程序日志,如下:
public SseEmitter getDockerLogs() {SseEmitteremitter = new SseEmitter();executorService.submit(() -> {try {Process process = Runtime.getRuntime().exec("docker logs -f nginx");InputStreamReader inputStreamReader = new InputStreamReader(process.getInputStream());BufferedReader reader = new BufferedReader(inputStreamReader);String line;while ((line = reader.readLine()) != null) {emitter.send(SseEmitter.event().data(line));}emitter.complete();} catch (IOException e) {throw new RuntimeException(e);}});return emitter;
}
最终前端显示效果如下:
到此前端就可以实时的获取后端日志在页面中显示了。
但是你会发现后端控制台会时不时报错,错误信息如下:
2025-04-24T11:01:12.488Z WARN 26616 --- [nio-9990-exec-2] .w.s.m.s.DefaultHandlerExceptionResolver : Ignoring exception, response committed. : org.springframework.web.context.request.async.AsyncRequestTimeoutException
2025-04-24T11:01:12.489Z WARN 26616 --- [nio-9990-exec-2] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]org.springframework.web.context.request.async.AsyncRequestTimeoutExceptionat org.springframework.web.context.request.async.TimeoutDeferredResultProcessingInterceptor.handleTimeout(TimeoutDeferredResultProcessingInterceptor.java:42)at org.springframework.web.context.request.async.DeferredResultInterceptorChain.triggerAfterTimeout(DeferredResultInterceptorChain.java:81)at org.springframework.web.context.request.async.WebAsyncManager.lambda$startDeferredResultProcessing$5(WebAsyncManager.java:442)at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.onTimeout(StandardServletAsyncWebRequest.java:154)at org.apache.catalina.core.AsyncListenerWrapper.fireOnTimeout(AsyncListenerWrapper.java:44)at org.apache.catalina.core.AsyncContextImpl.timeout(AsyncContextImpl.java:135)at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:135)at org.apache.coyote.AbstractProcessor.dispatch(AbstractProcessor.java:243)at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:57)at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:896)at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1744)at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)javaweb 报错如下:
2025-04-24T11:02:41.538Z WARN 26653 --- [nio-9991-exec-1] .w.s.m.s.DefaultHandlerExceptionResolver : Ignoring exception, response committed. : org.springframework.web.context.request.async.AsyncRequestTimeoutException
2025-04-24T11:02:41.538Z WARN 26653 --- [nio-9991-exec-1] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]
2025-04-24T11:02:42.480Z ERROR 26653 --- [or-http-epoll-2] reactor.core.publisher.Operators : Operator called default onErrorDroppedjava.lang.IllegalStateException: The response object has been recycled and is no longer associated with this facadeat org.apache.catalina.connector.ResponseFacade.checkFacade(ResponseFacade.java:478) ~[tomcat-embed-core-10.1.16.jar!/:na]at org.apache.catalina.connector.ResponseFacade.isFinished(ResponseFacade.java:154) ~[tomcat-embed-core-10.1.16.jar!/:na]at org.apache.catalina.connector.ResponseFacade.flushBuffer(ResponseFacade.java:240) ~[tomcat-embed-core-10.1.16.jar!/:na]at org.springframework.http.server.ServletServerHttpResponse.flush(ServletServerHttpResponse.java:104) ~[spring-web-6.0.14.jar!/:6.0.14]at org.springframework.http.server.DelegatingServerHttpResponse.flush(DelegatingServerHttpResponse.java:61) ~[spring-web-6.0.14.jar!/:6.0.14]at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.complete(ResponseBodyEmitterReturnValueHandler.java:231) ~[spring-webmvc-6.0.14.jar!/:6.0.14]at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.complete(ResponseBodyEmitter.java:266) ~[spring-webmvc-6.0.14.jar!/:6.0.14]at reactor.core.publisher.LambdaSubscriber.onComplete(LambdaSubscriber.java:132) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onComplete(FluxConcatMapNoPrefetch.java:240) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:183) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onComplete(FluxBufferPredicate.java:356) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onComplete(FluxPeekFuseable.java:595) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onComplete(FluxMapFuseable.java:350) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:230) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:371) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:273) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.5.12.jar!/:3.5.12]at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:483) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:275) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:419) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:500) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:768) ~[reactor-netty-http-1.1.13.jar!/:1.1.13]at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[netty-transport-classes-epoll-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) ~[netty-transport-classes-epoll-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) ~[netty-transport-classes-epoll-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.101.Final.jar!/:4.1.101.Final]at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.101.Final.jar!/:4.1.101.Final]at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
这是因为 SpringMVC 默认设置了 30 秒超时时间,你需要修改的长一点,具体多少由你自己定义。后端中间件和后端服务都需要配置。
错误分析
1. AsyncRequestTimeoutException
此错误表明异步请求超时。在 Spring 里,当一个异步请求在规定的时间内没有完成时,就会抛出该异常。默认情况下,Spring 的异步请求超时时间是 30 秒。在 SSE 场景中,因为要保持长连接以实现实时数据推送,所以很容易超出这个时间限制。
2. IllegalStateException
该错误显示响应对象已被回收,不再和当前的请求关联。这通常是在异步请求超时后,响应对象被关闭或者回收,而代码还尝试对其进行操作时发生的。
解决办法
1. 增加异步请求超时时间
你可以通过配置 WebMvcConfigurer 来增加异步请求的超时时间,避免因超时引发异常。
在 后端中间件 和 后端服务 中添加如下配置类:
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;@Configuration
class LogWebConfig implements WebMvcConfigurer {@Overridepublic void configureAsyncSupport(AsyncSupportConfigurer configurer) {// 设置异步请求超时时间为 3600 秒(1 小时)configurer.setDefaultTimeout(3600 * 1000); }
}
扩展:
我通过前端传递参数(容器id)给后端执行,我发现了一个奇怪的问题。
我第一次打开模态框,可以正常的实时显示日志。但是我点击 close 的时候,这段代码确实执行了。
$("#logModal").on('hide.bs.modal', function (){eventSource.close();logContent.empty(); // 清空日志
})
但是我再次打开模态框,发现没有日志显示了,我甚至刷新页面,然后再次打开模态框依然没有日志显示,只有我把后端服务重启之后,再次打开模态框,才有日志显示,这是为什么?
这个问题可能是由于EventSource 实例未正确释放或重复创建导致的资源冲突。以下是具体分析和解决方案:
问题原因分析
1. EventSource 作用域问题:
在当前代码中,eventSource变量是在shown.bs.modal事件处理函数的闭包中创建的。当模态框关闭时,虽然通过hide.bs.modal事件调用了eventSource.close(),但闭包中的变量可能未被正确释放,导致再次打开模态框时,旧的eventSource实例残留或引用错误。
2. 事件绑定重复问题:
每次打开模态框时,shown.bs.modal事件处理函数会重复绑定hide.bs.modal事件,导致多个处理函数共存,可能混淆不同实例的eventSource。
3. 后端连接未正确断开:
若后端未正确处理EventSource的断开(如未关闭响应流),可能导致端口或连接资源被占用,后续请求无法建立新连接。
解决方案:
使用模态框数据存储 EventSource 实例
修改代码,将eventSource实例存储在模态框元素的data属性中,确保每次关闭时正确释放当前实例:
// 修改后的模态框显示事件处理函数
$("#logModal").on('shown.bs.modal', function (event) {const trigger = $(event.relatedTarget);const itemStr = trigger.data('item');if (!itemStr) return;const item = JSON.parse(itemStr);const modal = $(this); // 获取当前模态框实例// 先关闭可能存在的旧连接const oldEventSource = modal.data('eventSource');if (oldEventSource) {oldEventSource.close();modal.removeData('eventSource');}// 创建新的EventSource并存储到模态框数据中const eventSource = new EventSource(`http://localhost:9998/middle/logs?containerId=${item.containerId}&ip=${item.ip}`);modal.data('eventSource', eventSource);const logContent = $('#logContent');logContent.empty();eventSource.onmessage = function (event) {const logLine = $('<div>').text(event.data);logContent.append(logLine);logContent[0].scrollTop = logContent[0].scrollHeight;};eventSource.onerror = function (error) {console.log('日志连接失败: ', error);eventSource.close();logContent.text('日志获取失败,请检查容器状态或网络连接');modal.removeData('eventSource'); // 清除错误状态的实例};
});// 修改后的模态框隐藏事件处理函数
$("#logModal").on('hide.bs.modal', function () {const modal = $(this);const eventSource = modal.data('eventSource');if (eventSource) {eventSource.close(); // 关闭当前实例modal.removeData('eventSource'); // 清除数据引用}$('#logContent').empty(); // 清空日志
});
其实不是上面这个问题,经过再次排查,我发现了问题所在,当我的后端服务的代码这样写的时候,就会出现前面说的问题。
public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {SseEmitter emitter = new SseEmitter();executorService.submit(() -> {try {Process process = Runtime.getRuntime().exec("docker logs -f " + publishServerDto.getContainerId());InputStreamReader inputStreamReader = new InputStreamReader(process.getInputStream());BufferedReader reader = new BufferedReader(inputStreamReader);String line;while ((line = reader.readLine()) != null) {emitter.send(SseEmitter.event().data(line));}emitter.complete();} catch (IOException e) {emitter.completeWithError(e);}});return emitter;}
当我的后端服务的代码修改成这样的时候。
public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {SseEmitter emitter = new SseEmitter();executorService.submit(() -> {try {int i = 0;while (i < 100000000) {try {emitter.send(SseEmitter.event().data(i)); // 发送消息到后端中间件Thread.sleep(1000);i++;} catch (InterruptedException e) {throw new RuntimeException(e);}}emitter.complete();} catch (IOException e) {emitter.completeWithError(e);}});return emitter;}
前端关闭再打开模态框也能正常显示数据。为什么?
这是因为 docker logs -f 命令会持续阻塞直到进程被终止,而我的后端代码在 EventSource 关闭时未正确终止正在运行的 Process 对象,导致资源占用和连接冲突。
docker logs -f 命令导致线程阻塞,无法响应 SseEmitter 的关闭事件。当前端关闭 EventSource 时,后端的 reader.readLine() 处于阻塞状态,无法感知 SseEmitter 的异常或完成信号,导致 finally 块和异常处理代码无法执行。
另外,
以下是详细分析和解决方案:
问题根源:未终止后台进程
1. docker logs -f 的阻塞特性
-
docker logs -f 会持续读取容器日志并阻塞当前线程,直到容器停止或命令被中断(如按下 Ctrl+C)。
-
当前端关闭 EventSource 时,后端的 SseEmitter 会触发 completeWithError,但 Process 对象(docker logs 进程)仍在后台运行,其输入流被占用,导致:
- 再次打开模态框时,新的 Process 无法创建(端口 / 资源被占用)。
- 旧的 Process 残留数据可能干扰新连接。
2. 模拟数据与真实命令的差异
模拟数据场景: 模拟数据的循环中使用了 Thread.sleep(1000),该方法会响应线程中断(InterruptedException)。当 SseEmitter 关闭时,会抛出异常并中断线程,而 docker logs -f 的阻塞式读取无法响应中断,导致清理逻辑失效。
真实命令场景: docker logs -f 是外部进程,不受 Java 线程控制,EventSource 关闭时未终止该进程,导致资源泄漏。
3. 阻塞式读取的局限性
BufferedReader.readLine() 是阻塞式方法,当 docker logs -f 没有新日志时,线程会一直阻塞在此处,无法处理 SseEmitter 的关闭事件(如 emitter.completeWithError() 或客户端断开连接引发的异常)。可以使用非阻塞式读取或中断机制 解决这个问题。
解决方案:
分离读取逻辑
将日志读取放到独立线程中,避免主线程被 docker logs -f 阻塞,确保能响应 SseEmitter 的关闭事件。
监听 SseEmitter 事件
使用 emitter.onCompletion() 和 emitter.onError() 回调,在连接关闭或出错时执行清理逻辑。
修改后的代码:
service :
// 使用原子引用存储进程对象,确保多线程环境下的可见性和原子性private final AtomicReference<Process> processHolder = new AtomicReference<>();// 使用原子布尔控制日志读取循环,确保线程安全的状态变更private final AtomicBoolean isRunning = new AtomicBoolean(true);public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {SseEmitter emitter = new SseEmitter();// 提交异步任务处理日志流executorService.submit(() -> {try {// 1. 执行docker logs命令,获取容器实时日志String containerId = publishServerDto.getContainerId();Process process = Runtime.getRuntime().exec("docker logs -f " + containerId);processHolder.set(process); // 保存进程引用到原子容器中// 2. 启动独立线程读取日志输入流(避免主线程阻塞)new Thread(() -> {try ( // 使用try-with-resources自动关闭流BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {String line;// 循环读取日志,直到isRunning为false或读取到null(流结束)while (isRunning.get() && (line = reader.readLine()) != null) {// 通过SSE发送日志数据到前端emitter.send(SseEmitter.event().data(line));}} catch (IOException e) {// 读取流时发生异常(如管道关闭),终止进程Process p = processHolder.get();if (p != null) {p.destroy(); // 尝试正常终止进程System.out.println("taskagent - 读取日志流异常: " + e.getMessage());}}}).start();// 3. 监听SSE连接的生命周期事件// 当前端正常关闭连接时触发(如关闭模态框)emitter.onCompletion(() -> {handleShutdown("正常关闭", processHolder.get()); // 统一处理关闭逻辑});// 当SSE连接发生错误时触发(如网络中断)emitter.onError(ex -> {handleShutdown("错误关闭: " + ex.getMessage(), processHolder.get());});// 当SSE连接超时时触发(需配置超时时间,默认30秒)emitter.onTimeout(() -> {handleShutdown("超时关闭", processHolder.get());});} catch (Exception e) {// 初始化进程时发生异常(如命令格式错误)isRunning.set(false);Process p = processHolder.get();if (p != null) p.destroy();System.out.println("taskagent - 初始化docker进程失败: " + e.getMessage());emitter.completeWithError(e); // 通知前端连接失败}});return emitter;}/*** 统一处理进程关闭逻辑* @param reason 关闭原因(用于日志输出)* @param process 待关闭的进程*/private void handleShutdown(String reason, Process process) {isRunning.set(false); // 停止日志读取循环if (process != null) {process.destroy(); // 发送中断信号(等效于Ctrl+C)System.out.println("taskagent - " + reason + ", 已终止docker进程"); // 关键日志输出点}}
现在我发现另一个问题。我的页面有很多容器,首先我打开nginx容器的日志,模态框被打开,实时日志能正常显示在页面上,当我关闭模态框,然后再次打开 nginx的日志,模态框被打开,依然能正常显示实时日志。 关键问题来了,然后我关闭模态框,接着打开 redis 的容器日志,日志没有显示在页面上了。
问题分析:
1. isRunning 标志的重置问题 - 可能
在 handleShutdown 方法里,isRunning 被设置为 false,不过在开启新的日志流时,没有将其重置为 true。这会让后续的日志读取循环无法正常运行。
解决办法:
在开启新的日志流之前,把 isRunning 重置为 true。
2. processHolder 未正确更新 - 可能
当切换容器查看日志时,processHolder 可能还保存着之前的 Process 对象,这会对新的日志读取产生影响。
解决办法:
在开启新的日志流之前,确保 processHolder 被清空。
3. webClient 连接问题(taskweb 端) - 可能
taskweb 端的 webClient 在处理多个连接时,可能会出现连接复用或者资源未正确释放的情况。
解决办法:
确保 webClient 在每次请求结束后都能正确释放资源。可以考虑为每个请求创建独立的 webClient 实例。
4. 前端 EventSource 管理问题 - 可能
前端在切换容器时,EventSource 可能没有正确关闭或者重新创建。
解决办法:
确保在切换容器时,EventSource 被正确关闭并重新创建。你的前端代码已经有了关闭逻辑,不过可以添加一些调试日志来确认是否正常执行。
最终修改的代码如下:
前端js代码:
// 显示实时日志$("#logModal").on('shown.bs.modal', function (event){// 从触发模态框的元素中获取 containerIdconst trigger = $(event.relatedTarget);const itemStr = trigger.data('item');if (!itemStr) {return;}const modal = $(this);let oldEventSource = modal.data('eventSource');if (oldEventSource) {oldEventSource.close();oldEventSource = null;modal.removeData('eventSource');}const item = JSON.parse(itemStr);let eventSource = new EventSource(`http://localhost:9998/middle/logs?containerId=${item.containerId}&ip=${item.ip}`);modal.data('eventSource', eventSource);const logContent = $('#logContent');logContent.empty(); // 清空日志eventSource.onmessage = function (event){const logLine = $('<div>').text(event.data)logContent.append(logLine);logContent[0].scrollTop = logContent[0].scrollHeight;};eventSource.onerror = function (error ) {console.log('日志连接失败: ', error);eventSource.close();eventSource = null; // 确保引用被清除logContent.text('日志获取失败,请检查容器常或网络连接')}})// 监听模态框关闭逻辑$("#logModal").on('hide.bs.modal', function (){const modal = $(this);let eventSource = modal.data('eventSource');if (eventSource) {eventSource.close(); // 关闭当前实例eventSource = null;modal.removeData('eventSource'); // 清除数据引用}$("#logContent").empty(); // 清空日志})
后端中间件代码:
service:
public SseEmitter getDockerLogs(PublishServer publishServer) {String url = taskagentConfig.getPrefixAddress(publishServer.getIp(), taskagentConfig.getGetDockerLogsUrl());SseEmitter emitter = new SseEmitter();executorService.submit(() -> {try {WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create())).build();webClient.get().uri(url, publishServer.getContainerId()).retrieve().bodyToFlux(String.class).subscribe(log -> {try {emitter.send(SseEmitter.event().data(log));} catch (IOException e) {emitter.completeWithError(e);}},emitter::completeWithError,emitter::complete);} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
后端服务代码:
// 使用原子应用存储进程对象,确保多线程环境下的可见性和原子性private final AtomicReference<Process> processHolder = new AtomicReference<>();// 使用原子布尔值控制日志读取循环,确保线程安全的状态变更private final AtomicBoolean isRunning = new AtomicBoolean(true);public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {SseEmitter emitter = new SseEmitter();isRunning.set(true);processHolder.set(null); // 清空进程引用// 提交异步任务处理日志流executorService.submit(() -> {try {// 1. 执行 docker logs 命令,获取容器实时日志Process process = Runtime.getRuntime().exec("docker logs -f "+publishServerDto.getContainerId());processHolder.set(process); // 保存进程引用到原子容器中// 2. 启动独立线程读取日志输入流(避免主线程阻塞)new Thread(() -> {// 使用 try-with-resources 自动关闭流资源try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))){String line;// 循环读取日志,直到isRunning为false或读取到null(流结束)while (isRunning.get() && (line = reader.readLine()) != null) {// 通过 SSE 发送日志数据到taskwebemitter.send(SseEmitter.event().data(line));}} catch (IOException e) {// 读取流时发生异常(如管道关闭),终止进程Process p = processHolder.get();if (p != null) {p.destroy(); // 尝试正常终止进程log.error("taskagent - 读取流时发生异常", e.getMessage());}}}).start();// 3. 监听SSE连接的生命周期事件// 当前端正常关闭连接时触发emitter.onCompletion(() -> {// 统一处理关闭逻辑handleShutdown("正常关闭", processHolder.get());});// 当SSE连接发生错误时触发(如网络中断)emitter.onError(ex -> {handleShutdown("错误关闭: "+ex.getMessage(), processHolder.get());});// 当SSE连接超时时触发emitter.onTimeout(() -> {handleShutdown("超时关闭", processHolder.get());});} catch (IOException e) {// 初始化进程时发生异常isRunning.set(false);Process p = processHolder.get();if (p != null) {p.destroy();}log.error("taskagent - 初始化 docker 进程失败:", e.getMessage());emitter.completeWithError(e); // 通知taskweb连接失败}});return emitter;}/*** 统一处理进程关闭逻辑* @param reason 关闭原因(用于日志输出)* @param process 待关闭的进程*/private void handleShutdown(String reason, Process process) {isRunning.set(false); // 停止日志读取循环if (process != null) {process.destroy(); // 发送中断信号(等效于Ctrl+C)log.info("taskagent - " + reason + ", 已终止docker进程");}}
修改成上面之后,前端实时显示日志的效果就全部正常了。