大佬教程收集整理的这篇文章主要介绍了将Flink计算完毕后的数据Sink到Nebula,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
Flink是目前流计算的隐形王者,在国际国内有有庞大的拥趸。
Nebula是国产图数据库的后起之秀,在DBENGInes中排名也逐年上升。
将两者进行结合,可以产生很多应用场景:比如实时计算服务链路调用关系并将结果存到Nebula中、实时计算业务访问风控情况并将结果存到Nebula中、实时计算预警发生情况并将结果存到Nebula中等。
将Flink计算完毕后的结果,Sink到Nebula,Nebula官方提供了一个Flink Connector,但是很不易用。
笔者根据项目实际应用情况,写了一个更简洁直接的Sink,作为抛砖引玉,欢迎各位Flink及Nebula爱好者共同交流。
一、NebulaUtil
由于Nebula提供的Java Client是非线程安全的,所以我们首先封装一个单例的NebulaUtil,主要代码如下:
import lombok.val;
import lombok.var;
/** * Nebula工具类 */ public class NebulaUtil { // Nebula会话 private Session session = null; // Nebula连接池 private NebulaPool pool = new NebulaPool();/** * 获得Nebula工具类单例 * * @return NebulaUtil */ public static NebulaUtil geTinstance() { return NebulaUtilHolder.instance; } /** * 执行NGQL * * @param nGQL NGQL * @return 返回执行结果 */ public ResultSet execute(String nGQL) { try { if (session != null) { return session.execute(nGQL); } } catch (IOErrorException E) { e.printStackTrace(); } catch (UnsupportedEncodingException E) { e.printStackTrace(); } return null; } /** * 释放会话 */ public void releaseSession() { // 释放连接 if (session != null) { session.release(); } // 关闭连接池 pool.close(); } private static class NebulaUtilHolder { private static final NebulaUtil instance = new NebulaUtil(); } private NebulaUtil() { initSession(); } /** * 初始化会话 */ private void initSession() {// 连接地址,多个间用逗号“,”隔开 val host = "127.0.0.1"; val port = 9669; val user = "user"; val password = "password"; val space = "MySpace"; var nebulaPoolConfig = new NebulaPoolConfig(); nebulaPoolConfig.setMaxConnSize(100); var hostAddressList = new ArrayList<HostAddress>(); val hostArray = host.split(","); for (val hostAddress : hostArray) { hostAddressList.add(new HostAddress(hostAddress, port)); } try { pool.init(hostAddressList, nebulaPoolConfig); } catch (UnknownHostException E) { e.printStackTrace(); } try { session = pool.getSession(user, password, false); } catch (NotValidConnectionException E) { e.printStackTrace(); } catch (IOErrorException E) { e.printStackTrace(); } catch (AuthFailedException E) { e.printStackTrace(); } // 切换图空间 val resp = execute(String.format("USE %s;", spacE)); if (resp == null || !resp.isSucceeded()) { System.out.println("切换图空间失败!" + spacE); } } }
二、NebulaSink
有了NebulaUtil,实现NebulaSink就非常简单了,每个方法里只有几行代码:
import lombok.val;/** * Sink到Nebula数据库 */ public class NebulaSink extends RichSinkFunction<List<String>> { /** * 打开连接 * * @param parameters 配置参数 */ @Override public void open(Configuration parameters) { } /** * 调用 * * @param nGQLList NGQL列表 * @param context 上下文 */ @Override public void invoke(List<String> nGQLList, Context context) { for (val nGQL : nGQLList) { NebulaUtil.geTinstance().execute(nGQL); } } /** * 关闭连接 */ @Override public void close() throws Exception { super.close(); NebulaUtil.geTinstance().releaseSession(); } }
三、将Vertex及Edge数据组装成NGQL语句
有了NebulaUtil以及NebulaSink后,Sink到Nebula之前,我们主要的工作就是将Vertex及Edge数据,组装对应的NGQL语句即可。
以上是大佬教程为你收集整理的将Flink计算完毕后的数据Sink到Nebula全部内容,希望文章能够帮你解决将Flink计算完毕后的数据Sink到Nebula所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。