Java, 原创

不使用WebFlux,JAVA 实现SSE(Server-Sent Events)

前面发过一篇《JAVA 实现SSE(Server-Sent Events)与Nginx转发》的博客,使用的是WebFlux。如果不想使用反应式编程,该怎么实现SSE呢?请看下面的代码。

// Copyright (C) 2024 Holding Ltd. All Rights Reserved.

package com.windigniter.monitorserver.controller;

/**
 * REST controller exposing Server-Sent Events (SSE) endpoints under {@code /sse}.
 *
 * <p>Emitter creation is delegated to the injected {@code SseManager}.
 */
@Slf4j
@RestController
@RequestMapping("/sse")
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class SseController {

    /** Message type handed to the manager for every stream opened by this controller. */
    private static final String MONITOR_MESSAGE_TYPE = "Monitor";

    private final SseManager sseManager;

    /**
     * Opens an event stream for the job number given in the request path.
     *
     * @param jobNo job number taken from the {@code /control/{jobNo}} path segment
     * @return the emitter returned by {@code SseManager.createSseByJobNo}
     */
    @GetMapping(value = "/control/{jobNo}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter sseEmitterControlMonitor(@PathVariable String jobNo) {
        log.info("SSE-Control-Monitor-JobNo: {}", jobNo);
        final SseEmitter emitter = sseManager.createSseByJobNo(jobNo, MONITOR_MESSAGE_TYPE);
        return emitter;
    }
}
// Copyright (C) 2024 Holding Ltd. All Rights Reserved.
package com.windigniter.monitorserver.sse;

/**
 * Creates {@code SseEmitter} instances and attaches their completion, timeout and error callbacks.
 *
 * <p>Session keys are built through {@code WebSocketManager.buildSessionKey}; emitter removal on
 * close is delegated to {@code WebSocketManager.delSse}.
 */
@Slf4j
@Component
@EnableScheduling
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class SseManager {

    /** Sensor type used as the third component of every session key built by this class. */
    private static final String SENSOR_TYPE = "SINGLEBOX";

    /**
     * Creates an emitter, registers its lifecycle callbacks and invokes {@link #startSse}.
     *
     * @param elevatorCode identifier used to build the session key (the controller passes a job number)
     * @param messageType  message type used to build the session key
     * @return the newly created emitter
     */
    public SseEmitter createSseByJobNo(String elevatorCode, String messageType) {
        final String sessionKey = WebSocketManager.buildSessionKey(elevatorCode, messageType, SENSOR_TYPE);

        // Original author's note: the default timeout is 30 seconds; 0L means the emitter never times out.
        final SseEmitter emitter = new SseEmitter(0L);

        // Completion callback: log, then release the session via closeSse.
        emitter.onCompletion(() -> handleCompletion(sessionKey, emitter, elevatorCode, messageType));
        // Timeout callback: log, then complete the emitter.
        emitter.onTimeout(() -> handleTimeout(sessionKey, emitter));
        // Error callback: log the stack trace, then complete the emitter.
        emitter.onError(cause -> handleError(sessionKey, emitter, cause));

        startSse(sessionKey, emitter, elevatorCode, messageType);
        log.info("[{}]创建sse连接成功!", sessionKey);
        return emitter;
    }

    /**
     * Called once an emitter has been created; currently it only logs the emitter.
     *
     * @param mapKey       session key of the connection (unused here)
     * @param sseEmitter   the emitter that was just created
     * @param elevatorCode identifier the session key was built from (unused here)
     * @param messageType  message type the session key was built from (unused here)
     */
    public void startSse(String mapKey, SseEmitter sseEmitter, String elevatorCode, String messageType) {
        final String emitterText = sseEmitter.toString();
        log.info("浏览器控梯{}创建监控-----", emitterText);
    }

    /**
     * Logs the emitter, removes it through {@code WebSocketManager.delSse} and then queries whether
     * the session list for the key is empty.
     *
     * @param mapKey       session key of the connection
     * @param sseEmitter   the emitter being closed
     * @param elevatorCode identifier the session key was built from (unused here)
     * @param messageType  message type the session key was built from (unused here)
     */
    public void closeSse(String mapKey, SseEmitter sseEmitter, String elevatorCode, String messageType) {
        final String emitterText = sseEmitter.toString();
        log.info("浏览器控梯{}关闭监控------", emitterText);
        WebSocketManager.delSse(mapKey, sseEmitter, WebSocketManager.SESSION_TYPE_SSE_CONTROL);

        // Check whether any session is still registered under this key.
        if (!WebSocketManager.sessionListIsEmpty(mapKey)) {
            return;
        }
        // Placeholder: shut the monitoring down here once no session remains.
    }

    /** Logs the end of the connection and releases it through {@link #closeSse}. */
    private void handleCompletion(String sessionKey, SseEmitter emitter, String elevatorCode, String messageType) {
        log.info("[{}]结束连接...................", sessionKey);
        closeSse(sessionKey, emitter, elevatorCode, messageType);
    }

    /** Logs the timeout and completes the emitter. */
    private void handleTimeout(String sessionKey, SseEmitter emitter) {
        log.info("[{}]连接超时...................", sessionKey);
        emitter.complete();
    }

    /** Logs the stringified stack trace of the failure and completes the emitter. */
    private void handleError(String sessionKey, SseEmitter emitter, Throwable cause) {
        log.info("[{}]连接异常--------------,{}", sessionKey, ExceptionUtil.stacktraceToString(cause));
        emitter.complete();
    }
}

仅给出了重要代码,请根据实际情况补充业务逻辑。

(0)

Related Post