程序笔记   发布时间:2022-07-19  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了将Flink计算完毕后的数据Sink到Nebula大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

Flink是目前流计算的隐形王者,在国际国内有有庞大的拥趸。

Nebula是国产图数据库的后起之秀,在DBENGInes中排名也逐年上升。

将两者进行结合,可以产生很多应用场景:比如实时计算服务链路调用关系并将结果存到Nebula中、实时计算业务访问风控情况并将结果存到Nebula中、实时计算预警发生情况并将结果存到Nebula中等。

将Flink计算完毕后的结果,Sink到Nebula,Nebula官方提供了一个Flink Connector,但是很不易用

笔者根据项目实际应用情况,写了一个更简洁直接的Sink,作为抛砖引玉,欢迎各位Flink及Nebula爱好者共同交流。

一、NebulaUtil

由于Nebula提供的Java Client是非线程安全的,所以我们首先封装一个单例的NebulaUtil,主要代码如下:

import lombok.val;
import lombok.var;
/**
 * Nebula工具类
 */
public class NebulaUtil {
    // Nebula会话
    private Session session = null;
    // Nebula连接池
    private NebulaPool pool = new NebulaPool();/**
     * 获得Nebula工具类单例
     *
     * @return NebulaUtil
     */
    public static NebulaUtil geTinstance() {
        return NebulaUtilHolder.instance;
    }

    /**
     * 执行NGQL
     *
     * @param nGQL NGQL
     * @return 返回执行结果
     */
    public ResultSet execute(String nGQL) {
        try {
            if (session != null) {
                return session.execute(nGQL);
            }
        } catch (IOErrorException E) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException E) {
            e.printStackTrace();
        }

        return null;
    }
   /**
     * 释放会话
     */
    public void releaseSession() {
        // 释放连接
        if (session != null) {
            session.release();
        }

        // 关闭连接池
        pool.close();
    }

    private static class NebulaUtilHolder {
        private static final NebulaUtil instance = new NebulaUtil();
    }

    private NebulaUtil() {
        initSession();
    }

    /**
     * 初始化会话
     */
    private void initSession() {// 连接地址,多个间用逗号“,”隔开
        val host = "127.0.0.1";
        val port = 9669;
        val user = "user";
        val password = "password";
        val space = "MySpace";

        var nebulaPoolConfig = new NebulaPoolConfig();
        nebulaPoolConfig.setMaxConnSize(100);

        var hostAddressList = new ArrayList<HostAddress>();

        val hostArray = host.split(",");

        for (val hostAddress : hostArray) {
            hostAddressList.add(new HostAddress(hostAddress, port));
        }

        try {
            pool.init(hostAddressList, nebulaPoolConfig);
        } catch (UnknownHostException E) {
           e.printStackTrace();
        }

        try {
            session = pool.getSession(user, password, false);
        } catch (NotValidConnectionException E) {
            e.printStackTrace();
        } catch (IOErrorException E) {
           e.printStackTrace();
        } catch (AuthFailedException E) {
            e.printStackTrace();
        }

        // 切换图空间
        val resp = execute(String.format("USE %s;", spacE));

        if (resp == null || !resp.isSucceeded()) {
            System.out.println("切换图空间失败!" + spacE);
        }
    }
}

 

二、NebulaSink

有了NebulaUtil,实现NebulaSink就非常简单了,每个方法里只有几行代码:

import lombok.val;/**
 * Sink到Nebula数据库
 */
public class NebulaSink extends RichSinkFunction<List<String>> {
    /**
     * 打开连接
     *
     * @param parameters 配置参数
     */
    @Override
    public void open(Configuration parameters) {
    }

    /**
     * 调用
     *
     * @param nGQLList NGQL列表
     * @param context  上下文
     */
    @Override
    public void invoke(List<String> nGQLList, Context context) {
        for (val nGQL : nGQLList) {
            NebulaUtil.geTinstance().execute(nGQL);
        }
    }

    /**
     * 关闭连接
     */
    @Override
    public void close() throws Exception {
        super.close();

        NebulaUtil.geTinstance().releaseSession();
    }
}

 

三、将Vertex及Edge数据组装成NGQL语句

有了NebulaUtil以及NebulaSink后,Sink到Nebula之前,我们主要的工作就是将Vertex及Edge数据,组装对应的NGQL语句即可。

 

大佬总结

以上是大佬教程为你收集整理的将Flink计算完毕后的数据Sink到Nebula全部内容,希望文章能够帮你解决将Flink计算完毕后的数据Sink到Nebula所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。