程序问答   发布时间:2022-06-01  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了Micronaut 中使用 RxJava 的 Websocket 请求路由大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决Micronaut 中使用 RxJava 的 Websocket 请求路由?

开发过程中遇到Micronaut 中使用 RxJava 的 Websocket 请求路由的问题如何解决?下面主要结合日常开发的经验,给出你关于Micronaut 中使用 RxJava 的 Websocket 请求路由的解决方法建议,希望对你解决Micronaut 中使用 RxJava 的 Websocket 请求路由有所启发或帮助;

我正在 Micronaut 中实现一个 WebSocket 路由器,它在 WebSocket“客户端”和 WebSocket“服务器”应用程序之间路由 WebSocket 文本消息。 每条消息都传达了主题 ID,这是 url 路径 /{topic} 的一部分。来自同一主题的消息必须双向转发, 例如主题 A 的消息被转发到主题 A 等。see architecture diagram

WebSocket 路由
所以客户端创建到路由器的 WebSocket 连接,然后路由器创建到服务器的连接。这意味着一旦客户端创建到路由器的连接,路由器必须创建到服务器的连接。 为了在客户端 -> 服务器或服务器 -> 客户端的每个方向发送新消息,必须重用给定主题的现有连接。 这些连接将通过一些重试重新连接逻辑长期存在,我稍后会加入。

消息发送逻辑
客户端发送消息后,必须将其转发到服务器,反向服务器 -> 客户端也是如此。

问题
我被卡住的地方是:

  1. 如何使用 Micronaut 的 Flowable API 将消息从 ServerHandler 转发到 ClIEntHandler,反之亦然? 我需要的是一旦建立连接,只需保留连接建立的信息,并使用 Rx Java API 简单地转发 WebSocket 处理程序之间的消息。

最初我通过使用自定义的 WebSocketSession 缓存和 ServerHandler 和 ClIEntHandler 存储和检索来实现转发逻辑 来自缓存的 CompletableFuture。这种方法有效并且还解决了异步等待连接会话的问题, 但我想通过使用纯 RxJava Java 模式以更无状态的方式实现转发。 为会话使用自定义共享缓存的另一个设计缺陷是,它对于 Netty 中现有的会话注册表来说是多余的。

到目前为止,我已经基本实现了客户端和服务器的处理程序,客户端/服务器的 WebSocket 处理程序, 但我不知道如何正确集成 Flowable 的东西。

源代码ClIEntHandler:(处理从客户端到路由器的WebSocket连接)

@ServerWebSocket("/topic/{topicID}") public class ClIEntHandler {

private static final Logger LOG = LoggerFactory.getLogger(ClIEntHandler.class);

private RxWebSocketClIEnt webSocketClIEnt;

private ConnectionPropertIEs connectionPropertIEs;

public ClIEntHandler(@ClIEnt("${Connection.url}") RxWebSocketClIEnt webSocketClIEnt,ConnectionPropertIEs connectionPropertIEs) {
    this.webSocketClIEnt = webSocketClIEnt;
    this.connectionPropertIEs = connectionPropertIEs;
}

@Onopen
public voID onopen(String topicID,WebSocketSession session) {
    LOG.info("Open connection for clIEnt topic: {}",topicID);
}

@Onmessage
public voID onmessage(String topicID,String message,WebSocketSession session) {
    LOG.info("New message from clIEnt topic: {}",topicID);
    
    Flowable<ServerHandler> flowable = webSocketClIEnt.connect(ServerHandler.class,connectionPropertIEs.resolveURI(topicID));
    //? so Now what to do with this
    // the basic function works,but new connection is created with each message
    //and caching flowable would not be much better than caching WebSocket sessions
    //also it would be nicer to connect in onopen() and then subscribe in onmessage()

    flowable.subscribe(serverHandler -> {
        serverHandler.setClIEntSession(session); // register session to send messages BACk
        serverHandler.send(messagE); // send message after connection           
    },t -> LOG.error("Error handling clIEnt topic: {}",topicID,t)); //handle exception
}

@OnClose
public voID onClose(String topicID) {
    LOG.info("Close connection for clIEnt topic: {}",topicID);     
}

@OnError
public voID onError(String topicID,WebSocketSession session,Throwable t) {
    LOG.error("Error for clIEnt topic: {}",t);
}

}

源代码ServerHandler:(处理从路由器到服务器的WebSocket连接)

@ClIEntWebSocket("/topic/{topicID}") public class ServerHandler implements autoCloseable,WebSocketSessionAware{

private static final Logger LOG = LoggerFactory.getLogger(ServerHandler.class);

private volatile WebSocketSession clIEntSession;

private volatile WebSocketSession serverSession;

private volatile String topicID;

@Onopen
public voID onopen(String topicID,WebSocketSession session) {
    this.topicID = topicID;
    LOG.info("Open connection for server topic: {}",topicID);
}

@Onmessage
public Publisher<String> onmessage(String messagE) {
    LOG.info("New message from server topic: {}",this.topicID);        
    return clIEntSession.send(messagE); //Could potentially use WebSocketbroadcaster as well and send to topic based on preDicate
}

@OnClose
public voID onClose(WebSocketSession session) {
    LOG.info("Close connection for server topic: {}",this.topicID);
    if (clIEntSession != null) {            
        clIEntSession.close();
    }
}

@OverrIDe
public voID close() throws Exception {
    LOG.info("Closing handler for server topic: {}",this.topicID);
}

@OverrIDe
public voID setWebSocketSession(WebSocketSession serverSession) {
    this.serverSession = serverSession;
}

public voID send(String messagE) {
    if (serverSession == null) {
        throw new IllegalStateException("Can not send if connection not opened");
    }
    serverSession.sendAsync(messagE); //send message to server
}

public voID forceClose() {
    if (serverSession != null) {
        serverSession.close();
    }
}

public voID setClIEntSession(WebSocketSession clIEntSession) {
    this.CLIENtSession = clIEntSession;
}

}

类似的现有代码
我在其他项目中搜索了类似的代码,并在 Spring Cloud Gateway(WebsocketRoutIngFilter 类)中找到了类似功能的片段 在此处查看完整代码:https://github.com/spring-cloud/spring-cloud-gateway/blob/8722f4062ed21de28ebf56f69bccc5ad4ac1d29d/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/WebsocketRoutIngFilter.java

这个例子使用了稍微不同的 API 和 tomcat WebSocketSession 和 Reactor 库 但无论如何,我不知道如何在 Micronaut Netty 之上用 RxJava Flowable API 表达类似的东西。

        return clIEnt.execute(url,this.headers,new WebSocketHandler() {
            @OverrIDe
            public Mono<VoID> handle(WebSocketSession proxySession) {
                // Use retain() for Reactor Netty
                Mono<VoID> proxySessionSend = proxySession
                        .send(session.receive().doOnNext(WebSocketmessage::retain));
                // .log("proxySessionSend",Level.FINE);
                Mono<VoID> serverSessionSend = session
                        .send(proxySession.receive().doOnNext(WebSocketmessage::retain));
                // .log("sessionSend",Level.FINE);
                return Mono.zip(proxySessionSend,serverSessionSend).then();
            } 

额外问题:
2)如何处理超时和重试与 Flowable 的连接? 如果我有这样的代码:

Flowable flowable = serverFlowable.connect(ServerHandler.class,uri);

2a) 如果在给定超时后连接尝试失败,如何连接想要在连接建立后发送消息的等待订阅者并传播错误?
2b) 如果使用 RxJava 连接尝试失败,如何以指数方式重试连接(5,20,30 秒?)?我想 repeat() 可以用于这个吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

大佬总结

以上是大佬教程为你收集整理的Micronaut 中使用 RxJava 的 Websocket 请求路由全部内容,希望文章能够帮你解决Micronaut 中使用 RxJava 的 Websocket 请求路由所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。