Java, 原创JAVA, Nginx, Spring Boot, SSE
JAVA 实现SSE(Server-Sent Events)与Nginx转发
- by chenxue4076
- 15 hours ago
最近看到 SSE 相关文章,主要是考虑用它替换前端定时调接口(轮询)的做法,以及替换只做单向通信的 WebSocket。下面给出实现的例子:在原来的 MVC 项目上扩展,使用 WebFlux 实现,这样项目既可以支持普通 MVC,又兼容 WebFlux。
POM文件增加webflux
<!-- Adds Spring WebFlux (and with it Project Reactor's Flux/Mono) to the existing project.
     No <version> is given here; presumably it is managed by the Spring Boot parent/BOM
     already used by the project (confirm against the project's pom.xml). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
JAVA类 FujitecController.java,列出了普通接口和 SSE 接口;POST 接口的实现方式完全一样。
package com.shmashine.api.controller.my;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.shmashine.api.service.my.ElevatorService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.enums.SecuritySchemeType;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.security.SecurityScheme;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
 * REST controller mounted at {@code /my} that exposes the disposition-station list in two
 * forms: a plain request/response endpoint and a Server-Sent Events (SSE) stream that pushes
 * the same payload periodically.
 *
 * <p>NOTE(review): {@code BaseRequestEntity} and {@code ResponseResultDTO} are not imported in
 * this listing; presumably they live in this package or their imports were omitted.
 *
 * @author chenx
 */
@Slf4j
@RestController
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
@RequestMapping("/my")
@SecurityScheme(name = "token", scheme = "bearer", type = SecuritySchemeType.HTTP)
@Tag(name = "坐台相关接口", description = "坐台相关接口 - powered by ChenXue")
public class FujitecController extends BaseRequestEntity {

    /** Time between two consecutive pushes on the SSE stream. */
    private static final Duration SSE_PUSH_INTERVAL = Duration.ofSeconds(4);

    private final ElevatorService elevatorService;

    /**
     * Returns the disposition-station list (the status of all stations) in a single response.
     *
     * @return a success response wrapping the result of
     *         {@code elevatorService.getDispositionStations()}
     */
    @Operation(summary = "获取座台列表 获取所有座台状态", security = {@SecurityRequirement(name = "token")})
    @GetMapping("/get-disposition-stations")
    public ResponseResultDTO getDispositionStations() {
        return ResponseResultDTO.successObj(elevatorService.getDispositionStations());
    }

    /**
     * SSE variant of {@link #getDispositionStations()}: emits the station list every
     * {@link #SSE_PUSH_INTERVAL} until the client disconnects or an error occurs.
     *
     * <p>On error, one error response carrying the exception message is emitted and the
     * stream then completes.
     *
     * @return a stream of responses, rendered as {@code text/event-stream}
     */
    @Operation(summary = "获取座台列表 获取所有座台状态-SSE接口", security = {@SecurityRequirement(name = "token")})
    @GetMapping(value = "/get-disposition-stations/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ResponseResultDTO> getDispositionStationsSSE() {
        // No "client connected" flag is needed: when the subscriber cancels, the cancellation
        // propagates upstream and the interval stops emitting.
        return Flux.interval(SSE_PUSH_INTERVAL)
                .doFirst(() -> log.info("getDispositionStationsSSE 客户端连接成功"))
                // Flux.interval signals an overflow error when a tick finds no demand; drop
                // such ticks instead so a slow client or a slow poll cannot kill the stream.
                .onBackpressureDrop()
                // The service call is synchronous (presumably blocking I/O), so run it on the
                // bounded-elastic scheduler rather than on the interval's timer thread.
                // Concurrency 1 keeps the polls sequential and the events in order.
                .flatMap(sequence -> Mono.fromCallable(() -> pollStations(sequence))
                        .subscribeOn(Schedulers.boundedElastic()), 1)
                .doOnCancel(() -> log.info("getDispositionStationsSSE 客户端断开连接"))
                .onErrorResume(e -> {
                    log.error("getDispositionStationsSSE 发生错误: ", e);
                    return Mono.just(ResponseResultDTO.error(e.getMessage()));
                });
    }

    /** Fetches the station list once and wraps it in a success response for SSE tick {@code sequence}. */
    private ResponseResultDTO pollStations(long sequence) {
        log.info("getDispositionStationsSSE 发送回复消息,sequence: {}", sequence);
        var res = elevatorService.getDispositionStations();
        log.info("res {}", res == null ? "null" : res.size());
        return ResponseResultDTO.successObj(res);
    }
}
前端调用时的请求参数如下。测试发现请求头中不需要传 Content-Type: text/event-stream(该头由服务端在响应中返回;客户端如需声明期望的响应类型,应使用 Accept: text/event-stream)。
GET http://localhost:8080/my/get-disposition-stations/sse
Cache-Control: no-cache
Connection: keep-alive
Authorization: Bearer aa0c56faec5b4a9dad724f73d5e4a2f9
如果需要Nginx转发, 只需要匹配 sse结尾的地址
# SSE endpoints under /my/: matches only URIs that END in "/sse" (case-insensitive).
# The trailing "$" keeps paths that merely contain "/sse" out of this unbuffered block.
location ~* ^/my/(.+)/sse$ {
    # Allow cross-origin access.
    # NOTE(review): browsers reject credentialed requests when Allow-Origin is "*";
    # if cookies/credentials are really needed, send a concrete origin instead.
    add_header 'Access-Control-Allow-Origin' '*';
    add_header 'Access-Control-Allow-Credentials' "true";
    add_header 'Access-Control-Max-Age' 86400;
    add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS, DELETE';
    add_header 'Access-Control-Allow-Headers' 'reqid, nid, host, x-real-ip, x-forwarded-ip, event-type, event-id, accept, content-type, user_id, authorization';

    # Answer CORS preflight requests directly, without contacting the upstream.
    if ($request_method = 'OPTIONS') {
        return 204;
    }

    # SSE is an ordinary long-lived HTTP response, not a protocol upgrade like WebSocket:
    # use HTTP/1.1 to the upstream and clear the Connection header instead of sending
    # "Upgrade"/"Connection: upgrade".
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_set_header Host $host;
    proxy_pass http://10.0.0.2:8003;

    # Forward each event to the client as soon as the upstream writes it.
    proxy_buffering off;
    proxy_cache off;
    chunked_transfer_encoding off;

    # Keep the connection open for up to 24h without data from the upstream.
    proxy_read_timeout 86400s;
}
# Reverse proxy for every other URI starting with /my/ (case-insensitive regex match).
location ~* ^/my/ {
    # Allow cross-origin access
    add_header Access-Control-Allow-Origin      '*';
    add_header Access-Control-Allow-Credentials 'true';
    add_header Access-Control-Max-Age           86400;
    add_header Access-Control-Allow-Methods     'GET, POST, OPTIONS, DELETE';
    add_header Access-Control-Allow-Headers     'reqid, nid, host, x-real-ip, x-forwarded-ip, event-type, event-id, accept, content-type, user_id, authorization';

    # CORS preflight: reply with an empty 204 instead of proxying the request.
    if ($request_method = OPTIONS) {
        return 204;
    }

    proxy_pass http://10.0.0.2:8003;
}
上面完整的 Nginx 配置并不是必需的,只需要在转发该接口的 location 中简单补充下面三行配置,就可以通过 Nginx 转发 SSE:
# Minimal additions for proxying SSE; place them in the location that proxies the SSE endpoint.
# Do not buffer the upstream response, so each event is passed to the client immediately.
proxy_buffering off;
# Do not cache the proxied response.
proxy_cache off;
# Disable chunked transfer encoding for the HTTP/1.1 response.
chunked_transfer_encoding off;
无其他特别要求。
(2)