前面发过一篇 JAVA 实现SSE(Server-Sent Events)与Nginx转 的blog,使用的是WebFlux, 如果不想使用反应式编程,怎么实现SSE呢?看下面代码。
// Copyright (C) 2024 Holding Ltd. All Rights Reserved.
package com.windigniter.monitorserver.controller;
@Slf4j
@RestController
@RequestMapping("/sse")
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class SseController {
private final SseManager sseManager;
@GetMapping(value = "/control/{jobNo}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter sseEmitterControlMonitor(@PathVariable String jobNo) {
log.info("SSE-Control-Monitor-JobNo: {}", jobNo);
return sseManager.createSseByJobNo(jobNo, "Monitor");
}
}
// Copyright (C) 2024 Holding Ltd. All Rights Reserved.
package com.windigniter.monitorserver.sse;
@Slf4j
@Component
@EnableScheduling
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class SseManager {
private final String sensorType = "SINGLEBOX";
public SseEmitter createSseByJobNo(String elevatorCode, String messageType) {
var mapKey = WebSocketManager.buildSessionKey(elevatorCode, messageType, sensorType);
//默认30秒超时,设置为0L则永不超时
var sseEmitter = new SseEmitter(0L);
//完成后回调
sseEmitter.onCompletion(() -> {
log.info("[{}]结束连接...................", mapKey);
closeSse(mapKey, sseEmitter, elevatorCode, messageType);
});
//超时回调
sseEmitter.onTimeout(() -> {
log.info("[{}]连接超时...................", mapKey);
sseEmitter.complete();
});
//异常回调
sseEmitter.onError(
throwable -> {
log.info("[{}]连接异常--------------,{}", mapKey, ExceptionUtil.stacktraceToString(throwable));
sseEmitter.complete();
}
);
startSse(mapKey, sseEmitter, elevatorCode, messageType);
log.info("[{}]创建sse连接成功!", mapKey);
return sseEmitter;
}
public void startSse(String mapKey, SseEmitter sseEmitter, String elevatorCode, String messageType) {
log.info("浏览器控梯{}创建监控-----", sseEmitter.toString());
}
public void closeSse(String mapKey, SseEmitter sseEmitter, String elevatorCode, String messageType) {
log.info("浏览器控梯{}关闭监控------", sseEmitter.toString());
WebSocketManager.delSse(mapKey, sseEmitter, WebSocketManager.SESSION_TYPE_SSE_CONTROL);
// 判断sessionList是否为空
boolean isEmpty = WebSocketManager.sessionListIsEmpty(mapKey);
if (isEmpty) {
// 关闭监控
}
}
}
仅给出了重要代码,请根据实际情况补充业务逻辑。
(0)