嘿,EveryDeveopers,服务器如何实时地将数据推送给客户端?
一提到这个,很多人的第一反应可能是:“上 WebSocket!”。没错,WebSocket 功能强大,提供了全双工通信,非常适合做聊天室、在线游戏这类应用。
但它就像一把牛刀,你总不想用它来切水果吧?在很多“服务器到客户端”的单向推送场景中,引入 WebSocket 不仅增加了复杂性,还可能带来不必要的资源开销。
那么,有没有更轻量、更原生的解决方案呢?当然有!ResponseBodyEmitter
,就是 Spring 框架为我们提供的一把锋利的“水果刀”。它允许我们在一个 HTTP 连接上,异步、分块地向客户端持续推送数据。
准备好了吗?让我们发车!
一. ResponseBodyEmitter 是什么?它能解决什么问题?
简单来说,ResponseBodyEmitter
是 Spring MVC 的一个组件,从Spring4.2 就开始提供了。
它允许控制器方法以流式的方式,分多次向响应体(Response Body)写入数据。
想象一下这个流程:
-
客户端发起一个普通的 HTTP 请求。
-
服务器接受请求,但并不立即关闭连接。
-
服务器的控制器方法返回一个
ResponseBodyEmitter
对象,此时 HTTP 连接保持打开状态。 -
服务器在后台进行一些耗时的操作(比如处理文件、查询数据库、调用第三方服务)。
-
每当后台任务产生一块新的数据,就通过
ResponseBodyEmitter
将这块数据“发射”给客户端。 -
所有数据都发射完毕后,服务器关闭连接。
这种模式非常适合以下场景:
-
实时进度更新:比如文件上传/下载进度、数据处理任务的完成百分比。用户可以实时看到任务状态,而不是面对一个无尽的加载圈。
-
流式传输大量数据:当需要返回一个巨大的数据集(如大型报表、日志文件)时,一次性加载到内存再返回可能会导致服务器内存溢出。使用
ResponseBodyEmitter
可以一块一块地读取和发送,极大地降低了内存压力。 -
简单的实时数据推送:比如体育比赛的实时比分、股票价格的更新。虽然不如 WebSocket 实时,但对于秒级更新的场景已经足够,且实现简单得多。
二. 它是如何工作的?(原理与架构)
为了更好地理解 ResponseBodyEmitter
,我们需要了解其背后的 Servlet 3.0 异步请求处理机制。
当一个控制器方法返回 ResponseBodyEmitter
时,Spring MVC 会启动异步处理。它会从应用服务器(如 Tomcat)的线程池中释放当前的工作线程,但保持 HTTP 连接不关闭。然后,Spring 会将 ResponseBodyEmitter
对象保存在内存中,并允许你在另一个线程中调用它的 send()
方法来发送数据。
图接:
核心要点:
-
异步非阻塞:控制器方法迅速返回,释放了 Tomcat 的主工作线程,使其可以去处理其他请求,提高了服务器的吞吐能力。
-
长连接:利用了 HTTP 的长连接特性,在一个请求-响应周期内持续推送数据。
-
线程分离:数据生成和数据发送在不同的线程中,实现了逻辑解耦。
三. 上手实战:Spring Boot 3 代码示例
光说不练假把式。我们来用 Spring Boot 3 构建一个简单的例子:模拟一个耗时 5 秒的任务,每秒向客户端推送一次进度。
3.1:项目搭建
确保你有一个 Spring Boot 3 项目,并加入了 spring-boot-starter-web
依赖。
pom.xml
:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
3.2:创建控制器 (Controller)
StreamingController.java
:
package com.example.emitterdemo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
public class StreamingController {
private static final Logger logger = LoggerFactory.getLogger(StreamingController.class);
// 使用一个线程池来处理任务,避免每次请求都创建新线程
private final ExecutorService executor = Executors.newSingleThreadExecutor();
@GetMapping(path = "/progress", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseBodyEmitter streamProgress() {
// 1. 创建 Emitter,可以设置一个超时时间
final ResponseBodyEmitter emitter = new ResponseBodyEmitter(60_000L); // 60秒超时
// 2. 提交一个任务到线程池
executor.execute(() -> {
try {
for (int i = 1; i <= 5; i++) {
// 模拟耗时操作
Thread.sleep(1000);
// 3. 发送数据,可以是一个字符串或一个对象(需要配置 HttpMessageConverter)
String message = "Processing step " + i + "/5 ...\n";
emitter.send(message, MediaType.TEXT_PLAIN);
logger.info("Sent: {}", message.trim());
}
// 4. 所有数据发送完毕,调用 complete
emitter.complete();
logger.info("Emitter completed.");
} catch (IOException e) {
// IO 异常,通常是客户端断开了连接
logger.error("IOException, client might have disconnected. {}", e.getMessage());
emitter.completeWithError(e);
} catch (InterruptedException e) {
logger.error("Task interrupted. {}", e.getMessage());
Thread.currentThread().interrupt();
emitter.completeWithError(e);
}
});
// 注册回调,在 Emitter 完成(complete/completeWithError)或超时(timeout)时触发
emitter.onCompletion(() -> logger.info("Emitter onCompletion callback."));
emitter.onTimeout(() -> logger.warn("Emitter onTimeout callback."));
emitter.onError(e -> logger.error("Emitter onError callback.", e));
logger.info("Controller method finished, emitter returned.");
return emitter;
}
}
代码解析:
-
我们创建了一个
ResponseBodyEmitter
并设置了 60 秒的超时时间。如果 60 秒内没有任何数据发送,连接会自动断开。 -
我们使用
ExecutorService
来管理后台线程。这是一个好习惯,可以避免资源耗尽。 -
在后台任务中,我们循环 5 次,每次
sleep(1000)
模拟耗时,然后调用emitter.send()
发送进度信息。 -
任务结束后,必须调用
emitter.complete()
来正常关闭连接。如果发生异常,调用emitter.completeWithError()
。 -
onCompletion
,onTimeout
,onError
是非常有用的回调,可以帮助我们记录日志和清理资源。
3.3:创建客户端 (HTML + JavaScript)
现在,我们需要一个前端页面来接收并展示这些流式数据。这里我们使用现代浏览器都支持的 fetch
API。
index.html
:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>ResponseBodyEmitter Demo</title>
<style>
body { font-family: sans-serif; padding: 20px; }
#progress-container { border: 1px solid #ccc; padding: 10px; min-height: 100px; background-color: #f9f9f9; }
</style>
</head>
<body>
<h1>Task Progress</h1>
<button id="startButton">Start Task</button>
<div id="progress-container">
<p>Click "Start Task" to see the progress.</p>
</div>
<script>
const startButton = document.getElementById('startButton');
const progressContainer = document.getElementById('progress-container');
startButton.addEventListener('click', async () => {
progressContainer.innerHTML = '<p>Starting task...</p>';
startButton.disabled = true;
try {
// 1. 使用 fetch API 发起请求
const response = await fetch('/progress');
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
// 2. 获取一个 ReadableStream 的 reader
const reader = response.body.getReader();
const decoder = new TextDecoder(); // 用于将 Uint8Array 解码成字符串
while (true) {
// 3. 读取数据块
const { done, value } = await reader.read();
if (done) {
console.log('Stream finished.');
progressContainer.innerHTML += '<p><strong>Task complete!</strong></p>';
break;
}
// 4. 解码并追加到 UI
const chunk = decoder.decode(value, { stream: true });
console.log('Received chunk:', chunk);
progressContainer.innerHTML += `<p>${chunk}</p>`;
}
} catch (error) {
console.error('Error fetching stream:', error);
progressContainer.innerHTML += `<p style="color: red;">Error: ${error.message}</p>`;
} finally {
startButton.disabled = false;
}
});
</script>
</body>
</html>
代码解析:
-
fetch('/progress')
发起请求。 -
response.body
是一个ReadableStream
对象。我们通过getReader()
获取读取器。 -
在一个
while(true)
循环中,我们反复调用reader.read()
。这个方法会返回一个 Promise,当服务器发送新数据时,Promise 会 resolve 并带上数据块value
。 -
value
是一个Uint8Array
,我们需要用TextDecoder
将它转换成可读的字符串。 -
当服务器调用
emitter.complete()
后,reader.read()
返回的对象的done
属性会变为true
,我们就可以退出循环了。
现在,运行你的 Spring Boot 应用,访问 http://localhost:8080/index.html
,点击按钮,你就能看到进度条实时更新了!
四. 技术选型对比:Emitter vs. The WebSocket
了解了 ResponseBodyEmitter
的用法后,一个关键问题来了:我应该在什么时候用它?它和 SseEmitter
、WebSocket
以及传统的异步方法有什么不同?
图解时刻:
特性 |
ResponseBodyEmitter |
SseEmitter (Server-Sent Events) |
WebSocket |
传统 @Async |
---|---|---|---|---|
通信方向 |
单向 (服务器 -> 客户端) |
单向 (服务器 -> 客户端) |
双向 (全双工) |
单向 (一次性响应) |
底层协议 |
HTTP/1.1 |
HTTP/1.1 |
WebSocket (ws/wss) |
HTTP/1.1 |
客户端 API |
|
|
|
标准 |
自动重连 |
否,需手动实现 |
是 (浏览器标准) |
否,需手动实现 |
不适用 |
事件类型 |
否,数据格式自定义 |
是 (可定义 event name) |
否,消息格式自定义 |
不适用 |
复杂度 |
低 |
极低 |
高 |
中 |
最佳场景 |
自定义格式的流式数据,进度更新 |
标准化的事件推送,新闻 feed,通知 |
实时聊天,在线游戏,协同编辑 |
后台耗时任务,返回单个完整结果 |
ResponseBodyEmitter vs. SseEmitter
SseEmitter
是 ResponseBodyEmitter
的一个子类。它专门用于实现 HTML5 Server-Sent Events (SSE) 规范。
SSE 的好处是浏览器有原生支持的 EventSource
API,它会自动处理重连、消息解析等问题,前端代码更简洁。
结论:
如果你需要的是标准的事件流,并且不介意遵循 SSE 的数据格式(data: ...\n\n
),优先选择 SseEmitter
。
如果你的数据格式非常特殊,或者想完全控制传输的每一个字节,那么 ResponseBodyEmitter
提供了更大的灵活性。
ResponseBodyEmitter vs. WebSocket
-
方向性:Emitter 是单向的,WebSocket 是双向的。如果你需要客户端也频繁地向服务器发送消息,请直接选择 WebSocket。
-
协议:Emitter 跑在标准的 HTTP 之上,对网络防火墙和代理更友好。WebSocket 是一个独立的协议,有时可能会被公司网络策略限制。
-
开销:建立 WebSocket 连接的握手过程比普通 HTTP 请求更复杂。对于只需要服务器推送的简单场景,Emitter 更轻量。
结论:杀鸡焉用牛刀。对于只需要服务器向客户端单向推送信息的场景(如进度条、数据流),ResponseBodyEmitter
或 SseEmitter
是更简单、更合适的选择。
ResponseBodyEmitter vs. 传统 @Async
一个用 @Async
注解的控制器方法,通常返回一个 CompletableFuture<V>
或 DeferredResult<V>
。它的目的是不阻塞请求处理线程,但它最终仍然只会返回一个完整的 HTTP 响应。而 ResponseBodyEmitter
的目的是在一个连接中返回多个数据块。
结论:@Async
用于优化单个、耗时响应的吞吐量。ResponseBodyEmitter
用于实现流式、分块的响应。两者解决的问题维度不同。
5. thinking
ResponseBodyEmitter
是 Spring 工具箱中一件被低估的利器。它为我们提供了一种优雅、高效的方式来处理服务器到客户端的流式数据推送,而无需引入 WebSocket 的复杂性。:
-
ResponseBodyEmitter
适用于进度更新、大数据流式传输等单向推送场景。 -
其核心是利用 Servlet 3.0 的异步特性,在后台线程中向一个保持开放的 HTTP 连接发送数据。
-
结合 Spring Boot 3 和
fetch
API,我们可以轻松地实现一个完整的推流应用。 -
在技术选型时,应根据通信方向、客户端支持和业务复杂度来决定使用 Emitter、SSE 还是 WebSocket。
so,下次当你的产品经理提出“我要一个实时更新的进度条”时,别急着说“好的,我需要搭个 WebSocket 服务”。先想一想,ResponseBodyEmitter
这把轻巧而锋利的“水果刀”,是不是更适合这个任务呢?
Having so funny, Happy coding!