大佬教程收集整理的这篇文章主要介绍了Micronaut 中使用 RxJava 的 Websocket 请求路由,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
我正在 Micronaut 中实现一个 WebSocket 路由器,它在 WebSocket“客户端”和 WebSocket“服务器”应用程序之间路由 WebSocket 文本消息。 每条消息都传达了主题 ID,这是 url 路径 /{topic} 的一部分。来自同一主题的消息必须双向转发, 例如主题 A 的消息被转发到主题 A 等。see architecture diagram
WebSocket 路由
所以客户端创建到路由器的 WebSocket 连接,然后路由器创建到服务器的连接。这意味着一旦客户端创建到路由器的连接,路由器必须创建到服务器的连接。
为了在客户端 -> 服务器或服务器 -> 客户端的每个方向发送新消息,必须重用给定主题的现有连接。
这些连接将通过一些重试重新连接逻辑长期存在,我稍后会加入。
消息发送逻辑
客户端发送消息后,必须将其转发到服务器,反向服务器 -> 客户端也是如此。
问题
我被卡住的地方是:
最初我通过使用自定义的 WebSocketSession 缓存和 ServerHandler 和 ClIEntHandler 存储和检索来实现转发逻辑 来自缓存的 CompletableFuture。这种方法有效并且还解决了异步等待连接会话的问题, 但我想通过使用纯 RxJava Java 模式以更无状态的方式实现转发。 为会话使用自定义共享缓存的另一个设计缺陷是,它对于 Netty 中现有的会话注册表来说是多余的。
到目前为止,我已经基本实现了客户端和服务器的处理程序,客户端/服务器的 WebSocket 处理程序, 但我不知道如何正确集成 Flowable 的东西。
源代码ClIEntHandler:(处理从客户端到路由器的WebSocket连接)
@ServerWebSocket("/topic/{topicID}") public class ClIEntHandler {
private static final Logger LOG = LoggerFactory.getLogger(ClIEntHandler.class);
private RxWebSocketClIEnt webSocketClIEnt;
private ConnectionPropertIEs connectionPropertIEs;
public ClIEntHandler(@ClIEnt("${Connection.url}") RxWebSocketClIEnt webSocketClIEnt,ConnectionPropertIEs connectionPropertIEs) {
this.webSocketClIEnt = webSocketClIEnt;
this.connectionPropertIEs = connectionPropertIEs;
}
@Onopen
public voID onopen(String topicID,WebSocketSession session) {
LOG.info("Open connection for clIEnt topic: {}",topicID);
}
@Onmessage
public voID onmessage(String topicID,String message,WebSocketSession session) {
LOG.info("New message from clIEnt topic: {}",topicID);
Flowable<ServerHandler> flowable = webSocketClIEnt.connect(ServerHandler.class,connectionPropertIEs.resolveURI(topicID));
//? so Now what to do with this
// the basic function works,but new connection is created with each message
//and caching flowable would not be much better than caching WebSocket sessions
//also it would be nicer to connect in onopen() and then subscribe in onmessage()
flowable.subscribe(serverHandler -> {
serverHandler.setClIEntSession(session); // register session to send messages BACk
serverHandler.send(messagE); // send message after connection
},t -> LOG.error("Error handling clIEnt topic: {}",topicID,t)); //handle exception
}
@OnClose
public voID onClose(String topicID) {
LOG.info("Close connection for clIEnt topic: {}",topicID);
}
@OnError
public voID onError(String topicID,WebSocketSession session,Throwable t) {
LOG.error("Error for clIEnt topic: {}",t);
}
}
源代码ServerHandler:(处理从路由器到服务器的WebSocket连接)
@ClIEntWebSocket("/topic/{topicID}") public class ServerHandler implements autoCloseable,WebSocketSessionAware{
private static final Logger LOG = LoggerFactory.getLogger(ServerHandler.class);
private volatile WebSocketSession clIEntSession;
private volatile WebSocketSession serverSession;
private volatile String topicID;
@Onopen
public voID onopen(String topicID,WebSocketSession session) {
this.topicID = topicID;
LOG.info("Open connection for server topic: {}",topicID);
}
@Onmessage
public Publisher<String> onmessage(String messagE) {
LOG.info("New message from server topic: {}",this.topicID);
return clIEntSession.send(messagE); //Could potentially use WebSocketbroadcaster as well and send to topic based on preDicate
}
@OnClose
public voID onClose(WebSocketSession session) {
LOG.info("Close connection for server topic: {}",this.topicID);
if (clIEntSession != null) {
clIEntSession.close();
}
}
@OverrIDe
public voID close() throws Exception {
LOG.info("Closing handler for server topic: {}",this.topicID);
}
@OverrIDe
public voID setWebSocketSession(WebSocketSession serverSession) {
this.serverSession = serverSession;
}
public voID send(String messagE) {
if (serverSession == null) {
throw new IllegalStateException("Can not send if connection not opened");
}
serverSession.sendAsync(messagE); //send message to server
}
public voID forceClose() {
if (serverSession != null) {
serverSession.close();
}
}
public voID setClIEntSession(WebSocketSession clIEntSession) {
this.CLIENtSession = clIEntSession;
}
}
类似的现有代码
我在其他项目中搜索了类似的代码,并在 Spring Cloud Gateway(WebsocketRoutIngFilter 类)中找到了类似功能的片段
在此处查看完整代码:https://github.com/spring-cloud/spring-cloud-gateway/blob/8722f4062ed21de28ebf56f69bccc5ad4ac1d29d/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/WebsocketRoutIngFilter.java
这个例子使用了稍微不同的 API 和 tomcat WebSocketSession 和 Reactor 库 但无论如何,我不知道如何在 Micronaut Netty 之上用 RxJava Flowable API 表达类似的东西。
return clIEnt.execute(url,this.headers,new WebSocketHandler() {
@OverrIDe
public Mono<VoID> handle(WebSocketSession proxySession) {
// Use retain() for Reactor Netty
Mono<VoID> proxySessionSend = proxySession
.send(session.receive().doOnNext(WebSocketmessage::retain));
// .log("proxySessionSend",Level.FINE);
Mono<VoID> serverSessionSend = session
.send(proxySession.receive().doOnNext(WebSocketmessage::retain));
// .log("sessionSend",Level.FINE);
return Mono.zip(proxySessionSend,serverSessionSend).then();
}
额外问题:
2)如何处理超时和重试与 Flowable 的连接?
如果我有这样的代码:
Flowable flowable = serverFlowable.connect(ServerHandler.class,uri);
2a) 如果在给定超时后连接尝试失败,如何连接想要在连接建立后发送消息的等待订阅者并传播错误?
2b) 如果使用 RxJava 连接尝试失败,如何以指数方式重试连接(5,20,30 秒?)?我想 repeat() 可以用于这个吗?
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)
以上是大佬教程为你收集整理的Micronaut 中使用 RxJava 的 Websocket 请求路由全部内容,希望文章能够帮你解决Micronaut 中使用 RxJava 的 Websocket 请求路由所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。