Spring WebFlux 基础教程:WebSocket 使用

WebSocket 协议简介

WebSocket 协议提供了一种标准化的方式,在客户端和服务端建立在一个TCP 连接之上的全双工,双向通信的协议。

WebSocket 交互开始于 HTTP 请求,使用 HTTP 请求的 header 中的 Upgrade 进行切换到 WebSocket 协议。

HTTP 和 WebSocket 对比

即使 WebSocket 的设计兼容 HTTP 协议和开始于 HTTP 请求,了解两个协议是不同架构和应用模型是非常重要的。

在 HTTP 和 REST, 一个应用有多个URL, 对于交互的应用,客户端可以访问这些 URL,请求和响应的风格。服务端会根据 URL、http method、header 等等信息进行路由处理。

相比之下, WebSocket 协议是使用一个URL 初始化连接。应用所有的消息通过同一个 TCP 通信,这一点是完全不同的异步、事件驱动的消息传递体系结构。

WebSocket API 的使用

Webflux 依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>

定义一个Handler,用于处理数据的请求和响应

import java.time.Duration; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class MyWebSocketHandler implements WebSocketHandler { // 每间隔 1 秒,数字加 1 private Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1L), Duration.ofSeconds(1L)); @Override public Mono<Void> handle(WebSocketSession session) { return session.send(intervalFlux.map(item -> session.textMessage(item + ""))) .and(session.receive().map(WebSocketMessage::getPayloadAsText).log()); } }

配置类,配置 WebSocket 的 URL 信息

@Configuration public class WebConfig { @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } @Bean public HandlerMapping handlerMapping() { Map<String, WebSocketHandler> urlMap = new HashMap<>(); urlMap.put("/path", new MyWebSocketHandler()); int order = -1; return new SimpleUrlHandlerMapping(urlMap, order); } }

客户端

import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; import org.springframework.web.reactive.socket.client.WebSocketClient; import reactor.core.publisher.Mono; public class Client { public static void main(String[] args) throws URISyntaxException { WebSocketClient client = new ReactorNettyWebSocketClient(); URI url = new URI("ws://localhost:8080/path"); client.execute(url, session -> session.send(Mono.just(session.textMessage("hello world"))) .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).log()) .then()) .block(Duration.ofSeconds(10)); } }

服务端日志

2021-08-10 22:42:36.162 INFO 85792 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080 2021-08-10 22:42:36.168 INFO 85792 --- [ main] c.e.s.SpringBootDemoApplication : Started SpringBootDemoApplication in 3.583 seconds (JVM running for 4.462) 2021-08-10 22:42:51.518 INFO 85792 --- [ctor-http-nio-2] reactor.Flux.Map.1 : onSubscribe(FluxMap.MapSubscriber) 2021-08-10 22:42:51.522 INFO 85792 --- [ctor-http-nio-2] reactor.Flux.Map.1 : request(unbounded) 2021-08-10 22:42:51.534 INFO 85792 --- [ctor-http-nio-2] reactor.Flux.Map.1 : onNext(hello world) 2021-08-10 22:43:00.956 INFO 85792 --- [ctor-http-nio-2] reactor.Flux.Map.1 : onComplete()

客户端日志

22:42:51.527 [reactor-http-nio-1] DEBUG reactor.netty.channel.FluxReceive - [id: 0xd95be56c, L:/127.0.0.1:50773 - R:localhost/127.0.0.1:8080] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false] 22:42:51.530 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber) 22:42:51.531 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - request(unbounded) 22:42:52.518 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(0) 22:42:53.513 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(1) 22:42:54.518 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(2) 22:42:55.513 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(3) 22:42:56.514 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(4) 22:42:57.516 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(5) 22:42:58.518 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(6) 22:42:59.512 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(7) 22:43:00.513 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(8) 22:43:00.947 [main] INFO reactor.Flux.Map.1 - cancel() 22:43:00.948 [reactor-http-nio-1] DEBUG reactor.netty.http.client.HttpClientOperations - [id: 0xd95be56c, L:/127.0.0.1:50773 - R:localhost/127.0.0.1:8080] Cancelling Websocket inbound. Closing Websocket Exception in thread "main" java.lang.IllegalStateException: Timeout on blocking read for 10000 MILLISECONDS

至此,Spring WebFlux 中 WebSocket 协议使用的结束了。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyszfs.html