Java, 原创,

解决WebFlux使用ServerRequest流处理异常Only one connection receive subscriber allowed问题

当我们需要返回数据即有列表有需要统计数据,按常规流程可能会出现Only one connection receive subscriber allowed的错误。

先来看下错误代码:

@Slf4j
@Component
@Tag(name = "Book接口", description = "Book接口 developer: chenxue")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class BookHandler {
    private final BookService service;

    public Mono<ServerResponse> listAll(ServerRequest request) {
		// BookRequestDTO 继承 PageDTO
		var requestMono = request.bodyToMono(BookRequestDTO.class);
		// 结果为Mono<Long>
		var total = requestMono.flatMap(service::countByCondition);
		// 结果为 Mono<List<BookDTO>>
		var listMono = requestMono.flatMap(service::pageByCondition).collectList();
		// 分页大小, Mono<Long>
		var pageSize = requestMono.map(PageDTO::pageSize);
		// 当前页码, Mono<Long>
		var pageIndex = requestMono.map(PageDTO::pageIndex);
		
		return Mono.zip(listMono, total, pageSize, pageIndex)
				.flatMap(p -> ServerResponse.ok().bodyValue(PageResponseDTO.of(p.getT1(), p.getT2(), p.getT3(), p.getT4())));
		
    }
}

以上运行起来就会报Only one connection receive subscriber allowed

解题思路: request.bodyToMono(BookRequestDTO.class) 的结果为 Mono, 而且不允许多次调用,但Mono里的对象多次调用没有问题。

那这多次的请求放在后面,不使用Mono的形式 pageWithTotalByCondition 里面封装好最终数据PageResponseDTO

@Slf4j
@Component
@Tag(name = "Book接口", description = "Book接口 developer: chenxue")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class BookHandler {
    private final BookService service;

    public Mono<ServerResponse> listAll(ServerRequest request) {

        return request.bodyToMono(BookRequestDTO.class)
                .flatMap(service::pageWithTotalByCondition)
                .flatMap(ServerResponse.ok()::bodyValue)
                .log();

    }
}
@Override
    public Mono<PageResponseDTO> pageWithTotalByCondition(BookRequestDTO requestDTO) {
        return gateway.countByCondition(requestDTO)
                .zipWhen(t -> gateway.pageByCondition(requestDTO)
                        .collectList())
                .map(p -> PageResponseDTO.of(p.getT2(), p.getT1(), requestDTO.getPageSize(),
                        requestDTO.getPageIndex()));
    }

以上代码是将 结果的总数 和 分页的列表,以及分页信息整体封装成PageResponseDTO。

zipWhen 或者 zipWith 都可以成功返回, 具体zipWhen 的触发条件我也还没有搞太明白。

后期有更深入理解在更新。

(1152)

Related Post