大佬教程收集整理的这篇文章主要介绍了FLINK实例(132):FLINK-SQL应用场景(23) CONNECTORS(23) 自定义 redis 数据维表(作为source表)(附源码),大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
来源:https://mp.weixin.qq.com/s/b_zV_tGp5QJQjgnSaxNT_Q
本文主要介绍了 flink sql redis 维表的实现过程。
如果想在本地测试下:
set a "{"score":3,"name":"namehhh","name1":"namehhh112"}"
flink.examples.sql._03.source_sink.redisLookupTest
测试类,就可以在 console 中看到结果。如果想直接在集群环境使用:
flink-examples-0.0.1-SNAPSHOT.jar
引入 flink lib 中即可,无需其它设置。2.1.啥是维表?事实表?
Dimension Table 概念多出现于数据仓库里面,维表与事实表相互对应。
给两个场景来看看:
比如需要统计分性别的 DAU:
https://blog.csdn.net/weixin_47482194/article/details/105855116?spm=1001.2014.3001.5501
比如目前想要统计整体销售额:
事实数据和维度数据的识别必须依据具体的主题问题而定。“事实表” 用来存储事实的度量及指向各个维的外键值。维表用来保存该维的元数据。
参考:https://blog.csdn.net/lindan1984/article/details/96566626
2.2.为啥需要 redis 维表?
目前在实时计算的场景中,熟悉 datastream 的同学大多数都使用过 mysqlHbaseredis 作为维表引擎存储一些维度数据,然后在 datastream api 中调用 mysqlHbaseredis 客户端去获取到维度数据进行维度扩充。
而 redis 作为 flink 实时场景中最常用的高速维表引擎,官方是没有提供 flink sql api 的 redis 维表 connector 的。如下图,基于 1.13 版本。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
阿里云 flink 是提供了这个能力的。但是这个需要使用阿里云的产品才能使用。有钱人可以直接上。
https://www.alibaBACloud.com/Help/zh/faq-detail/122722.htm?spm=a2c63.q38357.a3.7.a1227a53TBMuSY
因此本文在介绍怎样自定义一个 sql 数据维表的同时,实现一个 sql redis 来给大家使用。
redis 作为维表在 datastream 中的最常用的数据结构就是 kv、hmap 两种。本文实现主要实现 kv 结构,map 结构大家可以拿到源码之后进行自定义实现。也就多加几行代码就完事了。
预期效果就如阿里云的 flink redis:
下面是我在本地跑的结果,先看看 redis 中存储的数据,只有这一条数据,是 json 字符串:
下面是预期 flink sql:
CREATE TABLE dimTable ( name StriNG, name1 StriNG, score BIGINT -- redis 中存储数据的 scheR_441_11845@a ) WITH ( 'connector' = 'redis', -- 指定 connector 是 redis 类型的 'hostname' = '127.0.0.1', -- redis server ip 'port' = '6379', -- redis server 端口 'format' = 'json' -- 指定 format 解析格式 'lookup.cache.max-rows' = '500', -- guava local cache 最大条目 'lookup.cache.ttl' = '3600', -- guava local cache ttl 'lookup.max-retries' = '1' -- redis 命令执行失败后重复次数 )
SELECT o.f0, o.f1, c.name, c.name1, c.score FROM leftTable AS o -- 维表 join LEFT JOIN dimTable FOR SYSTEM_TIME AS OF o.proctime AS c ON o.f0 = c.nam
结果如下,后面三列就对应到 c.name, c.name1, c.score
:
+I[a, b, namehhh, namehhh112, 3] +I[a, b, namehhh, namehhh112, 3] +I[a, b, namehhh, namehhh112, 3] +I[a, b, namehhh, namehhh112, 3] +I[a, b, namehhh, namehhh112, 3] +I[a, b, namehhh, namehhh112, 3] +I[a, b, namehhh, namehhh112, 3] +I[a, b, namehhh, namehhh112, 3] +I[a, b, namehhh, namehhh112, 3]
因此博主在实现时,就定了一个基调。
在实现 redis 维表之前,不得不谈谈 flink 维表加载和使用机制。
5.1.flink 维表原理
其实上节已经详细描述了 flink sql 对于 sourcesink 的加载机制,维表属于 source 的中的 lookup 表,在具体 flink 程序运行的过程之中可以简单的理解为一个 map,在 map 中调用 redis-client 接口访问 redis 进行扩充维度的过程。
Factory
如图 source 和 sink 是通过 FactoryUtil.createTablesource
和 FactoryUtil.createTableSink
创建的
所有通过 SPI 的 sourcesinkformt 插件都继承自 Factory
。
整体创建 source 方法的调用链如下图。
5.2.flink 维表实现方案
先看下博主的最终实现。
总重要的三个实现类:
具体流程:
redisDynamicTableFactory implements DynamicTablesourceFactory
,并且在 resourceMETA-INF 下创建 SPI 的插件文件redis
redisDynamicTableFactory#createDynamicTablesource
来创建对应的 source redisDynamicTablesource
redisDynamicTablesource implements LookupTablesource
redisDynamicTableFactory#getLookupRuntimeProvider
方法,创建具体的维表 UDF TableFunction<T>
,定义为 redisRowDataLookupFunction
redisRowDataLookupFunction
的 eval 方法,这个方法就是用于访问 redis 扩充维度的。介绍完流程,进入具体实现方案细节:
redisDynamicTableFactory
主要创建 source 的逻辑:
public class redisDynamicTableFactory implements DynamicTablesourceFactory { ... @Override public String factoryIdentifier() { // 标识 redis return "redis"; } @Override public DynamicTablesource createDynamicTablesource(Context context) { // either implement your custom validation logic here ... // or use the provided Helper utility final FactoryUtil.TableFactoryHelper Helper = FactoryUtil.createTableFactoryHelper(this, context); // discover a suitable decoding format // format 实现 final DecodingFormat<DeserializationscheR_441_11845@a<RowData>> decodingFormat = Helper.discoverDecodingFormat( DeserializationFormatFactory.class, FactoryUtil.FORMAT); // validate all options // 所有 option 配置的校验,比如 cache 类参数 Helper.validate(); // get the validated options final ReadableConfig options = Helper.getOptions(); final redisLookupOptions redisLookupOptions = redisOptions.getredisLookupOptions(options); TablescheR_441_11845@a scheR_441_11845@a = context.getCatalogTable().getscheR_441_11845@a(); // 创建 redisDynamicTablesource return new redisDynamicTablesource( scheR_441_11845@a.toPhysicalRowDataType() , decodingFormat , redisLookupOptions); } }
resourcesMETA-INF 文件:
redisDynamicTablesource
主要创建 table udf 的逻辑:
public class redisDynamicTablesource implements LookupTablesource { ... @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { // 初始化 redis 客户端配置 FlinkjedisConfigBase flinkjedisConfigBase = new FlinkjedisPoolConfig.builder() .setHost(this.redisLookupOptions.getHostname()) .setPort(this.redisLookupOptions.getPort()) .build(); // redis key,value 序列化器 LookupredisR_441_11845@apper lookupredisR_441_11845@apper = new LookupredisR_441_11845@apper( this.createDeserialization(context, this.decodingFormat, createValueFormatProjection(this.physicalDataTypE))); // 创建 table udf return TableFunctionProvider.of(new redisRowDataLookupFunction( flinkjedisConfigBase , lookupredisR_441_11845@apper , this.redisLookupOptions)); } }
redisRowDataLookupFunction
table udf 执行维表关联的主要流程:
public class redisRowDataLookupFunction extends TableFunction<RowData> { ... /** * 具体 redis 执行方法 */ public void eval(Object... objects) throws IOException { for (int retry = 0; retry <= maxRetryTimes; retry++) { try { // fetch result this.evaler.accept(objects); break; } catch (Exception E) { LOG.error(String.format("HBase lookup error, retry times = %d", retry), E); if (retry >= maxRetryTimes) { throw new RuntimeException("Execution of redis lookup failed.", E); } try { Thread.sleep(1000 * retry); } catch (InterruptedException e1) { throw new RuntimeException(e1); } } } } @Override public void open(FunctionContext context) { LOG.info("start open ..."); // redis 命令执行器,初始化 redis 链接 try { this.redisCommandsContainer = redisCommandsContainerBuilder .build(this.flinkjedisConfigBasE); this.redisCommandsContainer.open(); } catch (Exception E) { LOG.error("redis has not been properly initialized: ", E); throw new RuntimeException(E); } // 初始化 local cache this.cache = cacheMaxSize <= 0 || cacheExpireMs <= 0 ? null : CacheBuilder.newBuilder() .recordStats() .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS) .maximumSize(cacheMaxSizE) .build(); if (cache != null) { context.getMetricGroup() .gauge("lookupCacheHitRate", (Gauge<Double>) () -> cache.stats().hitRate()); this.evaler = in -> { RowData cacheRowData = cache.getIfPresent(in); if (cacheRowData != null) { collect(cacheRowData); } else { // fetch result byte[] key = lookupredisR_441_11845@apper.serialize(in); byte[] value = null; switch (redisCommand) { case GET: value = this.redisCommandsContainer.get(key); break; case HGET: value = this.redisCommandsContainer.hget(key, this.additionalKey.getBytes()); break; default: throw new IllegalArgumentexception("CAnnot process such data type: " + redisCommand); } RowData rowData = this.lookupredisR_441_11845@apper.deserialize(value); collect(rowData); cache.put(key, rowData); } }; } ... } }
5.2.1.复用 bahir connector
如图是 bahir redis connector 的实现。
可以看到目录结构是与 bahir redis connector 一致的。
其中 redis 客户端及其配置
是直接复用了 bahir redis 的。由于 bahir redis 基本都是 sink 实现,某些实现没法继承复用,所以这里我单独开辟了目录,redis 命令执行器
和 redis 命令定义器
,但是也基本和 bahir 一致。如果你想要在生产环境中进行使用,可以直接将两部分代码合并,成本很低。
5.2.2.复用 format
博主直接复用了 flink 本身自带的 format 机制来作为维表反序列化机制。参考 HBase connector 实现将 cache 命中率添加到 metric 中。
public class redisDynamicTableFactory implements DynamicTablesourceFactory { ... @Override public DynamicTablesource createDynamicTablesource(Context context) { ... // discover a suitable decoding format // 复用 format 实现 final DecodingFormat<DeserializationscheR_441_11845@a<RowData>> decodingFormat = Helper.discoverDecodingFormat( DeserializationFormatFactory.class, FactoryUtil.FORMAT); ... } }
format 同样也是 SPI 机制加载。
源码公众号后台回复flink sql 知其所以然(二)| sql 自定义 redis 数据维表获取。
5.2.3.维表 local cache
local cache 在初始化时可以指定 cache 大小,缓存时长等。
this.evaler = in -> { RowData cacheRowData = cache.getIfPresent(in); if (cacheRowData != null) { collect(cacheRowData); } else { // fetch result byte[] key = lookupredisR_441_11845@apper.serialize(in); byte[] value = null; switch (redisCommand) { case GET: value = this.redisCommandsContainer.get(key); break; case HGET: value = this.redisCommandsContainer.hget(key, this.additionalKey.getBytes()); break; default: throw new IllegalArgumentexception("CAnnot process such data type: " + redisCommand); } RowData rowData = this.lookupredisR_441_11845@apper.deserialize(value); collect(rowData); cache.put(key, rowData); } };
以上是大佬教程为你收集整理的FLINK实例(132):FLINK-SQL应用场景(23) CONNECTORS(23) 自定义 redis 数据维表(作为source表)(附源码)全部内容,希望文章能够帮你解决FLINK实例(132):FLINK-SQL应用场景(23) CONNECTORS(23) 自定义 redis 数据维表(作为source表)(附源码)所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。