Spring Boot中使用RSocket的示例代码
1.概述
RSocket应用层协议支持 ReactiveStreams语义,例如:用RSocket作为HTTP的一种替代方案。在本教程中,我们将看到 RSocket用在springboot中,特别是springboot如何帮助抽象出更低级别的RSocketAPI。
2.依赖
让我们从添加 spring-boot-starter-rsocket依赖开始:
org.springframework.boot spring-boot-starter-rsocket
这个依赖会传递性的拉取 RSocket相关的依赖,比如: rsocket-core和 rsocket-transport-netty
3.示例的应用程序
现在继续我们的简单应用程序。为了突出 RSocket提供的交互模式,我打算创建一个交易应用程序,交易应用程序包括客户端和服务器。
3.1.服务器设置
首先,我们设置由springboot应用程序引导的 RSocketserver服务器。因为有 spring-boot-starter-rsocketdependency依赖,所以springboot会自动配置 RSocketserver。跟平常一样,可以用属性驱动的方式修改 RSocketserver默认配置值。例如:通过增加如下配置在 application.properties中,来修改 RSocket端口
spring.rsocket.server.port=7000
也可以根据需要进一步修改服务器的其他属性
3.2.设置客户端
接下来,我们来设置客户端,也是一个springboot应用程序。虽然springboot自动配置大部分RSocket相关的组件,但还要自定义一些对象来完成设置。
@Configuration publicclassClientConfiguration{ @Bean publicRSocketrSocket(){ returnRSocketFactory .connect() .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE,MimeTypeUtils.APPLICATION_JSON_VALUE) .frameDecoder(PayloadDecoder.ZERO_COPY) .transport(TcpClientTransport.create(7000)) .start() .block(); } @Bean RSocketRequesterrSocketRequester(RSocketStrategiesrSocketStrategies){ returnRSocketRequester.wrap(rSocket(),MimeTypeUtils.APPLICATION_JSON,rSocketStrategies); } }
这儿我们正在创建 RSocket客户端并且配置TCP端口为:7000。注意:该服务端口我们在前面已经配置过。接下来我们定义了一个RSocket的装饰器对象 RSocketRequester。这个对象在我们跟 RSocketserver交互时会为我们提供帮助。定义这些对象配置后,我们还只是有了一个骨架。在接下来,我们将暴露不同的交互模式,并看看springboot在这个地方提供帮助的。
4. springbootRSocket中的 Request/Response
我们从 Request/Response开始, HTTP也使用这种通信方式,这也是最常见的、最相似的交互模式。在这种交互模式里,由客户端初始化通信并发送一个请求。之后,服务器端执行操作并返回一个响应给客户端--这时通信完成。在我们的交易应用程序里,一个客户端询问一个给定的股票的当前的市场数据。作为回复,服务器会传递请求的数据。
4.1.服务器
在服务器这边,我们首先应该创建一个 controller来持有我们的处理器方法。我们会使用 @MessageMapping注解来代替像SpringMVC中的 @RequestMapping或者 @GetMapping注解
@Controller publicclassMarketDataRSocketController{ privatefinalMarketDataRepositorymarketDataRepository; publicMarketDataRSocketController(MarketDataRepositorymarketDataRepository){ this.marketDataRepository=marketDataRepository; } @MessageMapping("currentMarketData") publicMonocurrentMarketData(MarketDataRequestmarketDataRequest){ returnmarketDataRepository.getOne(marketDataRequest.getStock()); } }
来研究下我们的控制器。我们将使用 @Controller注解来定义一个控制器来处理进入RSocket的请求。另外,注解 @MessageMapping让我们定义我们感兴趣的路由和如何响应一个请求。在这个示例中,服务器监听路由 currentMarketData,并响应一个单一的结果 Mono
4.2.客户端
接下来,我们的RSocket客户端应该询问一只股票的价格并得到一个单一的响应。为了初始化请求,我们该使用 RSocketRequester类,如下:
@RestController publicclassMarketDataRestController{ privatefinalRSocketRequesterrSocketRequester; publicMarketDataRestController(RSocketRequesterrSocketRequester){ this.rSocketRequester=rSocketRequester; } @GetMapping(value="/current/{stock}") publicPublishercurrent(@PathVariable("stock")Stringstock){ returnrSocketRequester .route("currentMarketData") .data(newMarketDataRequest(stock)) .retrieveMono(MarketData.class); } }
注意:在示例中, RSocket客户端也是一个 REST风格的 controller,以此来访问我们的 RSocket服务器。因此,我们使用 @RestController和 @GetMapping注解来定义我们的请求/响应端点。在端点方法中,我们使用的是类 RSocketRequester并指定了路由。事实上,这个是服务器端 RSocket所期望的路由,然后我们传递请求数据。最后,当调用 retrieveMono()方法时,springboot会帮我们初始化一个请求/响应交互。
5. SpringBootRSocket中的 FireAndForget模式
接下来我们将查看 FireAndForget交互模式。正如名字提示的一样,客户端发送一个请求给服务器,但是不期望服务器的返回响应回来。在我们的交易程序中,一些客户端会作为数据资源服务,并且推送市场数据给服务器端。
5.1.服务器端
我们来创建另外一个端点在我们的服务器应用程序中,如下:
@MessageMapping("collectMarketData") publicMonocollectMarketData(MarketDatamarketData){ marketDataRepository.add(marketData); returnMono.empty(); }
我们又一次定义了一个新的 @MessageMapping路由为 collectMarketData。此外,SpringBoot自动转换传入的负载为一个 MarketData实例。但是,这儿最大的不同是我们返回一个 Mono
5.2.客户端
来看看我们如何初始化我们的 fire-and-forget模式的请求。我们将创建另外一个REST风格的端点,如下:
@GetMapping(value="/collect") publicPublishercollect(){ returnrSocketRequester .route("collectMarketData") .data(getMarketData()) .send(); }
这儿我们指定路由和负载将是一个 MarketData实例。由于我们使用 send()方法来代替 retrieveMono(),所有交互模式变成了 fire-and-forget模式。
6. SpringBootRSocket中的 RequestStream
请求流是一种更复杂的交互模式,这个模式中客户端发送一个请求,但是在一段时间内从服务器端获取到多个响应。为了模拟这种交互模式,客户端会询问给定股票的所有市场数据。
6.1.服务器端
我们从服务器端开始。我们将添加另外一个消息映射方法,如下:
@MessageMapping("feedMarketData") publicFluxfeedMarketData(MarketDataRequestmarketDataRequest){ returnmarketDataRepository.getAll(marketDataRequest.getStock()); }
正如所见,这个处理器方法跟其他的处理器方法非常类似。不同的部分是我们返回一个 Flux
6.2.客户端
在客户端这边,我们该创建一个端点来初始化请求/响应通信,如下:
@GetMapping(value="/feed/{stock}",produces=MediaType.TEXT_EVENT_STREAM_VALUE) publicPublisherfeed(@PathVariable("stock")Stringstock){ returnrSocketRequester .route("feedMarketData") .data(newMarketDataRequest(stock)) .retrieveFlux(MarketData.class); }
我们来研究下RSocket请求。首先我们定义了路由和请求负载。然后,我们定义了使用 retrieveFlux()调用的响应期望。这部分决定了交互模式。另外注意:由于我们的客户端也是 REST风格的服务器,客户端也定义了响应媒介类型 MediaType.TEXT_EVENT_STREAM_VALUE。
7.异常的处理
现在让我们看看在服务器程序中,如何以声明式的方式处理异常。当处理请求/响应式,我可以简单的使用 @MessageExceptionHandler注解,如下:
@MessageExceptionHandler publicMonohandleException(Exceptione){ returnMono.just(MarketData.fromException(e)); }
这里我们给异常处理方法标记注解为 @MessageExceptionHandler。作为结果,这个方法将处理所有类型的异常,因为 Exception是所有其他类型的异常的超类。我们也可以明确地创建更多的不同类型的,不同的异常处理方法。这当然是请求/响应模式,并且我们返回的是 Mono
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。