别再滥用 WebSocket!Spring ResponseBodyEmitter 才是某些场景的银弹

Source

嘿,EveryDeveopers,服务器如何实时地将数据推送给客户端?

一提到这个,很多人的第一反应可能是:“上 WebSocket!”。没错,WebSocket 功能强大,提供了全双工通信,非常适合做聊天室、在线游戏这类应用。

但它就像一把牛刀,你总不想用它来切水果吧?在很多“服务器到客户端”的单向推送场景中,引入 WebSocket 不仅增加了复杂性,还可能带来不必要的资源开销。 

那么,有没有更轻量、更原生的解决方案呢?当然有!ResponseBodyEmitter,就是 Spring 框架为我们提供的一把锋利的“水果刀”。它允许我们在一个 HTTP 连接上,异步、分块地向客户端持续推送数据。

准备好了吗?让我们发车!

一. ResponseBodyEmitter 是什么?它能解决什么问题?

简单来说,ResponseBodyEmitter 是 Spring MVC 的一个组件,从Spring4.2 就开始提供了。

它允许控制器方法以流式的方式,分多次向响应体(Response Body)写入数据。

想象一下这个流程:

  • 客户端发起一个普通的 HTTP 请求。

  • 服务器接受请求,但并不立即关闭连接

  • 服务器的控制器方法返回一个 ResponseBodyEmitter 对象,此时 HTTP 连接保持打开状态。

  • 服务器在后台进行一些耗时的操作(比如处理文件、查询数据库、调用第三方服务)。

  • 每当后台任务产生一块新的数据,就通过 ResponseBodyEmitter 将这块数据“发射”给客户端。

  • 所有数据都发射完毕后,服务器关闭连接。

这种模式非常适合以下场景:

  • 实时进度更新:比如文件上传/下载进度、数据处理任务的完成百分比。用户可以实时看到任务状态,而不是面对一个无尽的加载圈。

  • 流式传输大量数据:当需要返回一个巨大的数据集(如大型报表、日志文件)时,一次性加载到内存再返回可能会导致服务器内存溢出。使用 ResponseBodyEmitter 可以一块一块地读取和发送,极大地降低了内存压力。

  • 简单的实时数据推送:比如体育比赛的实时比分、股票价格的更新。虽然不如 WebSocket 实时,但对于秒级更新的场景已经足够,且实现简单得多。

二. 它是如何工作的?(原理与架构)

为了更好地理解 ResponseBodyEmitter,我们需要了解其背后的 Servlet 3.0 异步请求处理机制。

当一个控制器方法返回 ResponseBodyEmitter 时,Spring MVC 会启动异步处理。它会从应用服务器(如 Tomcat)的线程池中释放当前的工作线程,但保持 HTTP 连接不关闭。然后,Spring 会将 ResponseBodyEmitter 对象保存在内存中,并允许你在另一个线程中调用它的 send() 方法来发送数据。

图接:

核心要点

  • 异步非阻塞:控制器方法迅速返回,释放了 Tomcat 的主工作线程,使其可以去处理其他请求,提高了服务器的吞吐能力。

  • 长连接:利用了 HTTP 的长连接特性,在一个请求-响应周期内持续推送数据。

  • 线程分离:数据生成和数据发送在不同的线程中,实现了逻辑解耦。

三. 上手实战:Spring Boot 3 代码示例

光说不练假把式。我们来用 Spring Boot 3 构建一个简单的例子:模拟一个耗时 5 秒的任务,每秒向客户端推送一次进度。

3.1:项目搭建

确保你有一个 Spring Boot 3 项目,并加入了 spring-boot-starter-web 依赖。

pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

3.2:创建控制器 (Controller)

StreamingController.java:

package com.example.emitterdemo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class StreamingController {

    private static final Logger logger = LoggerFactory.getLogger(StreamingController.class);

    // 使用一个线程池来处理任务,避免每次请求都创建新线程
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @GetMapping(path = "/progress", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseBodyEmitter streamProgress() {
        // 1. 创建 Emitter,可以设置一个超时时间
        final ResponseBodyEmitter emitter = new ResponseBodyEmitter(60_000L); // 60秒超时

        // 2. 提交一个任务到线程池
        executor.execute(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    // 模拟耗时操作
                    Thread.sleep(1000);

                    // 3. 发送数据,可以是一个字符串或一个对象(需要配置 HttpMessageConverter)
                    String message = "Processing step " + i + "/5 ...\n";
                    emitter.send(message, MediaType.TEXT_PLAIN);
                    logger.info("Sent: {}", message.trim());
                }
                // 4. 所有数据发送完毕,调用 complete
                emitter.complete();
                logger.info("Emitter completed.");
            } catch (IOException e) {
                // IO 异常,通常是客户端断开了连接
                logger.error("IOException, client might have disconnected. {}", e.getMessage());
                emitter.completeWithError(e);
            } catch (InterruptedException e) {
                logger.error("Task interrupted. {}", e.getMessage());
                Thread.currentThread().interrupt();
                emitter.completeWithError(e);
            }
        });

        // 注册回调,在 Emitter 完成(complete/completeWithError)或超时(timeout)时触发
        emitter.onCompletion(() -> logger.info("Emitter onCompletion callback."));
        emitter.onTimeout(() -> logger.warn("Emitter onTimeout callback."));
        emitter.onError(e -> logger.error("Emitter onError callback.", e));

        logger.info("Controller method finished, emitter returned.");
        return emitter;
    }
}

代码解析

  • 我们创建了一个 ResponseBodyEmitter 并设置了 60 秒的超时时间。如果 60 秒内没有任何数据发送,连接会自动断开。

  • 我们使用 ExecutorService 来管理后台线程。这是一个好习惯,可以避免资源耗尽。

  • 在后台任务中,我们循环 5 次,每次 sleep(1000) 模拟耗时,然后调用 emitter.send() 发送进度信息。

  • 任务结束后,必须调用 emitter.complete() 来正常关闭连接。如果发生异常,调用 emitter.completeWithError()

  • onCompletion, onTimeout, onError 是非常有用的回调,可以帮助我们记录日志和清理资源。

3.3:创建客户端 (HTML + JavaScript)

现在,我们需要一个前端页面来接收并展示这些流式数据。这里我们使用现代浏览器都支持的 fetch API。

index.html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>ResponseBodyEmitter Demo</title>
    <style>
        body { font-family: sans-serif; padding: 20px; }
        #progress-container { border: 1px solid #ccc; padding: 10px; min-height: 100px; background-color: #f9f9f9; }
    </style>
</head>
<body>
    <h1>Task Progress</h1>
    <button id="startButton">Start Task</button>
    <div id="progress-container">
        <p>Click "Start Task" to see the progress.</p>
    </div>

    <script>
        const startButton = document.getElementById('startButton');
        const progressContainer = document.getElementById('progress-container');

        startButton.addEventListener('click', async () => {
            progressContainer.innerHTML = '<p>Starting task...</p>';
            startButton.disabled = true;

            try {
                // 1. 使用 fetch API 发起请求
                const response = await fetch('/progress');

                if (!response.ok) {
                    throw new Error(`HTTP error! status: ${response.status}`);
                }

                // 2. 获取一个 ReadableStream 的 reader
                const reader = response.body.getReader();
                const decoder = new TextDecoder(); // 用于将 Uint8Array 解码成字符串

                while (true) {
                    // 3. 读取数据块
                    const { done, value } = await reader.read();

                    if (done) {
                        console.log('Stream finished.');
                        progressContainer.innerHTML += '<p><strong>Task complete!</strong></p>';
                        break;
                    }

                    // 4. 解码并追加到 UI
                    const chunk = decoder.decode(value, { stream: true });
                    console.log('Received chunk:', chunk);
                    progressContainer.innerHTML += `<p>${chunk}</p>`;
                }

            } catch (error) {
                console.error('Error fetching stream:', error);
                progressContainer.innerHTML += `<p style="color: red;">Error: ${error.message}</p>`;
            } finally {
                startButton.disabled = false;
            }
        });
    </script>
</body>
</html>

代码解析

  • fetch('/progress') 发起请求。

  • response.body 是一个 ReadableStream 对象。我们通过 getReader() 获取读取器。

  • 在一个 while(true) 循环中,我们反复调用 reader.read()。这个方法会返回一个 Promise,当服务器发送新数据时,Promise 会 resolve 并带上数据块 value

  • value 是一个 Uint8Array,我们需要用 TextDecoder 将它转换成可读的字符串。

  • 当服务器调用 emitter.complete() 后,reader.read() 返回的对象的 done 属性会变为 true,我们就可以退出循环了。

现在,运行你的 Spring Boot 应用,访问 http://localhost:8080/index.html,点击按钮,你就能看到进度条实时更新了!

四. 技术选型对比:Emitter vs. The WebSocket

了解了 ResponseBodyEmitter 的用法后,一个关键问题来了:我应该在什么时候用它?它和 SseEmitterWebSocket 以及传统的异步方法有什么不同?

图解时刻:

特性

ResponseBodyEmitter

SseEmitter (Server-Sent Events)

WebSocket

传统 @Async

通信方向

单向 (服务器 -> 客户端)

单向 (服务器 -> 客户端)

双向 (全双工)

单向 (一次性响应)

底层协议

HTTP/1.1

HTTP/1.1

WebSocket (ws/wss)

HTTP/1.1

客户端 API

fetch API (需要手动解析)

EventSource (浏览器原生支持)

WebSocket API

标准 fetch / XHR

自动重连

否,需手动实现

(浏览器标准)

否,需手动实现

不适用

事件类型

否,数据格式自定义

(可定义 event name)

否,消息格式自定义

不适用

复杂度

极低

最佳场景

自定义格式的流式数据,进度更新

标准化的事件推送,新闻 feed,通知

实时聊天,在线游戏,协同编辑

后台耗时任务,返回单个完整结果

ResponseBodyEmitter vs. SseEmitter

SseEmitterResponseBodyEmitter 的一个子类。它专门用于实现 HTML5 Server-Sent Events (SSE) 规范。

SSE 的好处是浏览器有原生支持的 EventSource API,它会自动处理重连、消息解析等问题,前端代码更简洁。

结论

如果你需要的是标准的事件流,并且不介意遵循 SSE 的数据格式(data: ...\n\n),优先选择 SseEmitter

如果你的数据格式非常特殊,或者想完全控制传输的每一个字节,那么 ResponseBodyEmitter 提供了更大的灵活性。

ResponseBodyEmitter vs. WebSocket
  • 方向性:Emitter 是单向的,WebSocket 是双向的。如果你需要客户端也频繁地向服务器发送消息,请直接选择 WebSocket。

  • 协议:Emitter 跑在标准的 HTTP 之上,对网络防火墙和代理更友好。WebSocket 是一个独立的协议,有时可能会被公司网络策略限制。

  • 开销:建立 WebSocket 连接的握手过程比普通 HTTP 请求更复杂。对于只需要服务器推送的简单场景,Emitter 更轻量。

结论:杀鸡焉用牛刀。对于只需要服务器向客户端单向推送信息的场景(如进度条、数据流),ResponseBodyEmitterSseEmitter 是更简单、更合适的选择。

ResponseBodyEmitter vs. 传统 @Async

一个用 @Async 注解的控制器方法,通常返回一个 CompletableFuture<V>DeferredResult<V>。它的目的是不阻塞请求处理线程,但它最终仍然只会返回一个完整的 HTTP 响应。而 ResponseBodyEmitter 的目的是在一个连接中返回多个数据块

结论@Async 用于优化单个、耗时响应的吞吐量。ResponseBodyEmitter 用于实现流式、分块的响应。两者解决的问题维度不同。

5. thinking

ResponseBodyEmitter 是 Spring 工具箱中一件被低估的利器。它为我们提供了一种优雅、高效的方式来处理服务器到客户端的流式数据推送,而无需引入 WebSocket 的复杂性。:

  • ResponseBodyEmitter 适用于进度更新、大数据流式传输等单向推送场景。

  • 其核心是利用 Servlet 3.0 的异步特性,在后台线程中向一个保持开放的 HTTP 连接发送数据。

  • 结合 Spring Boot 3 和 fetch API,我们可以轻松地实现一个完整的推流应用。

  • 在技术选型时,应根据通信方向客户端支持业务复杂度来决定使用 Emitter、SSE 还是 WebSocket。

so,下次当你的产品经理提出“我要一个实时更新的进度条”时,别急着说“好的,我需要搭个 WebSocket 服务”。先想一想,ResponseBodyEmitter 这把轻巧而锋利的“水果刀”,是不是更适合这个任务呢?

Having so funny, Happy coding!