Java, 原创, , ,

JAVA 实现SSE(Server-Sent Events)与Nginx转发

最近看到SSE相关文章,主要是考虑替换 前端定时调接口,以及替换单项通信的WebSocket,下面给出实现的例子,扩展原来的MVC项目,使用的 WebFlux实现,这样项目就可以支持普通MVC,又兼容WebFlux 。

POM文件增加webflux


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

JAVA类 MyController.java,列出了普通接口和 SSE接口, POST的实现方式完全一样

package com.shmashine.api.controller.my;


import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.shmashine.api.service.my.ElevatorService;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.enums.SecuritySchemeType;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.security.SecurityScheme;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * 默认说明
 *
 * @author chenx
 */

@Slf4j
@RestController
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
@RequestMapping("/my")
@SecurityScheme(name = "token", scheme = "bearer", type = SecuritySchemeType.HTTP)
@Tag(name = "坐台相关接口", description = "坐台相关接口 - powered by ChenXue")
public class FujitecController extends BaseRequestEntity {
    private final ElevatorService elevatorService;

    /**
     * 获取座台列表 获取所有座台状态
     */
    @Operation(summary = "获取座台列表 获取所有座台状态", security = {@SecurityRequirement(name = "token")})
    @GetMapping("/get-disposition-stations")
    public ResponseResultDTO getDispositionStations() {
        return ResponseResultDTO.successObj(elevatorService.getDispositionStations());
    }

    @Operation(summary = "获取座台列表 获取所有座台状态-SSE接口", security = {@SecurityRequirement(name = "token")})
    @GetMapping(value = "/get-disposition-stations/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ResponseResultDTO> getDispositionStationsSSE() {
        // 创建一个持续时间为指定秒数的 Flux 流,并确保当客户端断开连接时流能及时终止
        var clientConnected = new AtomicBoolean(true);
        return Flux.interval(Duration.ofSeconds(4))
                .doFirst(() -> {
                    log.info("getDispositionStationsSSE 客户端连接成功");
                })
                .filter(p -> clientConnected.get())
                .flatMap(sequence -> {
                    log.info("getDispositionStationsSSE 发送回复消息,sequence: {}", sequence);
                    var res = elevatorService.getDispositionStations();
                    log.info("res {}", res == null ? "null" : res.size());
                    return Mono.just(ResponseResultDTO.successObj(res));
                })
                .doOnCancel(() -> {
                    clientConnected.set(false);
                    log.info("getDispositionStationsSSE 客户端断开连接");
                })
                .onErrorResume(e -> {
                    clientConnected.set(false);
                    log.error("getDispositionStationsSSE 发生错误: ", e);
                    return Mono.just(ResponseResultDTO.error(e.getMessage()));
                });
    }
}

前端调用时 请求参数, 测试发现不需要传 header Content-Type: text/event-stream

GET http://localhost:8080/my/get-disposition-stations/sse
Cache-Control: no-cache
Connection: keep-alive
Authorization: Bearer aa0c56faec5b4a9dad724f73d5e4a2f9

如果需要Nginx转发, 只需要匹配 sse结尾的地址

    location ~* ^/my/(.+)\/sse {
        #设置允许跨域
        add_header 'Access-Control-Allow-Origin' '*';
        add_header 'Access-Control-Allow-Credentials' "true";
        add_header 'Access-Control-Max-Age' 86400;
        add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS, DELETE';
        add_header 'Access-Control-Allow-Headers' 'reqid, nid, host, x-real-ip, x-forwarded-ip, event-type, event-id, accept, content-type, user_id, authorization';
        if ($request_method = 'OPTIONS') {
            return 204;
        }

        proxy_http_version  1.1;
        proxy_set_header  Upgrade $http_upgrade;
        proxy_set_header  Connection "upgrade";
	proxy_set_header Host $host;

	proxy_pass  http://10.0.0.2:8003;
	proxy_buffering off;
	proxy_cache off;
	chunked_transfer_encoding off;
	proxy_read_timeout 86400s;
    }

    location ~* ^/my/ {
        #设置允许跨域
        add_header 'Access-Control-Allow-Origin' '*';
        add_header 'Access-Control-Allow-Credentials' "true";
        add_header 'Access-Control-Max-Age' 86400;
        add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS, DELETE';
        add_header 'Access-Control-Allow-Headers' 'reqid, nid, host, x-real-ip, x-forwarded-ip, event-type, event-id, accept, content-type, user_id, authorization';
        if ($request_method = 'OPTIONS') {
            return 204;
        }
        proxy_pass  http://10.0.0.2:8003;
    }

上面的Nginx配置不是必要的, 只需要简单补充下面的代码就可以通过nginx转发SSE

proxy_buffering off;
proxy_cache off;
chunked_transfer_encoding off;

无其他特别要求。

(2)

Related Post