大佬教程收集整理的这篇文章主要介绍了实时数据湖在字节跳动的实践,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
数据湖的概念是比较宽泛的,不同的人可能有着不同的解读。这个名词诞生以来,在不同的阶段被赋予了不同的含义。
数据湖的概念最早是在 Hadoop World 大会上提出的。当时的提出者给数据湖赋予了一个非常抽象的含义,他认为它能解决数据集市面临的一些重要问题。
其中最主要的两个问题是:首先,数据集市只保留了部分属性,只能解决预先定义好的问题;另外,数据集市中反映细节的原始数据丢失了,限制了通过数据解决问题。从解决问题的角度出发,希望有一个合适的存储来保存这些明细的、未加工的数据。因此在这个阶段,人们对数据湖的解读更多的是聚焦在中心化的存储之上。
不同的云厂商也把自己的对象产存储产品称为数据湖。比如 AWS 在那个阶段就强调数据湖的存储属性,对应的就是自家的对象存储 S3。在 Wiki 的定义中也是强调数据湖是一个中心化存储,可以存海量的不同种类的数据。但是当对象存储满足了大家对存储海量数据的诉求之后,人们对数据湖的解读又发生了变化。
第二阶段,对数据湖的解读更多的是从开源社区和背后的商业公司发起的。比如 Databricks 作为一个云中立的产品,它将云厂商的这个对象存储称为 data lakes storage,然后把自己的重心聚焦在如何基于一个中心化的存储构建一个数据分析、数据科学和机器学习的数据湖解决方案,并且把这个方案称之为 lake。他们认为在这个中心化的存储之上构建事务层、索引层,元数据层,可以去解决数据湖上的可靠性、性能和安全的问题。
与此同时,Uber 最初也将 Hudi 对外称为一个事务型的数据湖,名字实际上也是由 Hadoop updates and Incrementals 缩写而来,最早也是被用于解决 Uber 内部离线数据的合规问题。现在他们更倾向的定义是一个流式数据湖平台,Iceberg 也常常被人们纳入数据湖的讨论。尽管 Ryan Blue 一直宣称 Iceberg 是一个 Open Table Format。这三者有一些共同点,一个是对 ACID 的支持,引入了一个事务层,第二是对 streaming 和 batch 的同等支持,第三就是聚焦在如何能更快的查询数据。国内也有人将 Hudi、Iceberg、Delta Lake 称为数据湖的三剑客。
讲完了业界的解读,来看一下字节跳动对数据湖的解读。我们是结合字节的业务场景来解读的。通过实践总结,我们发现数据湖需要具备六大能力:
第二是智能的查询加速。 用户使用数据湖的时候,不希望感知到数据湖的底层实现细节,数据湖的解决方案应该能够自动地优化数据分布,提供稳定的产品性能。
第三是批流一体的存储。 数据湖这个技术出现以来,被数仓行业给予了厚望,他们认为数据湖可以最终去解决一份存储流批两种使用方式的问题,从而从根本上提升开发效率和数据质量。
第四是统一的元数据和权限。 在一个企业级的数据湖当中,元数据和权限肯定是不能少的。同时在湖仓共存的情况下,用户不希望元数据和权限在湖仓两种情况下是割裂的。
第五是极致的查询性能。 用户对于数据湖的期望就是能够在数据实时入湖的同时还能做到数据的秒级可视化。
第六是 AI + BI。 数据湖数据的对外输出,不只局限于 BI,同时 AI 也是数据湖的一等公民,数据湖也被应用在了字节的整个推荐体系,尤其是特征工程当中。实时数据湖其实是数据湖之上,更加注重数据的实时属性或者说流属性的一个数据湖发展方向。当然,正如业界对于数据湖的解读一直在演变,我们对数据湖的解读也不会局限于以上场景和功能。
接下来介绍数据湖落地的挑战和应对。字节内部的数据湖最初是基于开源的数据湖框架 Hudi 构建的,选择 Hudi,最简单的一个原因就是因为相比于 Iceberg 和 Delta Lake,Hudi 原生支持可扩展的索引系统,能够帮助数据快速定位到所在的位置,达到高效更新的效果。
在尝试规模化落地的过程中,我们主要遇到了四个挑战:数据难管理,并发更新弱,更新性能差,以及日志难入湖。
接下来会一一介绍这些挑战背后出现的原因以及我们应对的策略。
下图是一个典型的基于中心化存储构建数仓机器学习和数据科学的架构。这里将加工过后的数据保存在数仓中,通过数仓的元数据进行组织。数据科学家和机器学习框架都会直接去这个中心化的存储中获取原始数据。因此在这个中心化存储之上的数据对用户来说是完全分散的,没有一个全局的视图。
我们在数据湖和数仓之上,构建了一层统一的元数据层,这层元数据层屏蔽了下层各个系统的元数据的异构性,由统一的元数据层去对接 BI 工具,对接计算引擎,以及数据开发、治理和权限管控的一系列数据工具。而这一层对外暴露的 API 是与 Hive 兼容的。尽管 Hive 这个引擎已经逐渐被其他的更新的计算引擎代替了,比如 Spark、Presto、Flink,但是它的源数据管理依旧是业界的事实标准。另外一些云厂商即使选择构建了自己的元数据服务,也都同时提供了和 HMS 兼容的元数据查询接口,各个计算引擎也都内置了 Hive Catalog 这一层。
这就@R_22_9446@湖管理元数据的特殊性。以 Hudi 为例,作为一个典型的事务型数据湖,Hudi 使用时间线 Timeline 来追踪针对表的各种操作。比如 commit compaction clean, Timeline 类似于数据湖里的事务管理器,记录对表的更改情况。而这些更改或事务记录了每次更新的操作是发生在哪些文件当中,哪些文件为新增,哪些文件失效,哪些数据新增,哪些数据更新。
第一个问题就是分区的元数据是分散在两个系统当中的,缺乏 single source of true。第二个是分区的元数据的获取需要从 HDFS 拉取多个文件,没有办法给出类似于 HMS 这样的秒级访问响应。服务在线的数据应用和开发工具时,这个延迟是没有办法满足需求的。第三个是读表的时候需要拉取大量的目录和 Timeline 上记录的表操作对应的元数据进行比对,找出最新的这个版本包含的文件。元数据读取本身就很重,并且缺乏裁剪能力,这在近实时的场景下带来了比较大的 overhead。
Hudi Metastore Server 融合了 Hive Metastore 和 Hudi MetaData 管理的优势。首先,Hudi Metastore Server 提供了多租户的、中心化的元数据管理服务,将文件一级的元数据保存在适合随机读写的存储中,让数据湖的元数据不再分散在多个文件当中,满足了 single source of true。其次,Hudi Metastore Server 针对元数据的查询,尤其是一些变更操作。比如 Job position 提供了与 Hive Metastore 完全兼容的接口,用户在使用一张数据湖上的表的时候,享受到这些增加的高效更新、删除、增量消费等能力的同时,也能享受到一张 Hive 表所具备的功能,例如通过 Spark、Flink、Presto 查询,以及在一些数据开发工具上在线的去获取到元数据以及一些分区 TTL 清理的能力。此外,Hudi Metastore Server 还解决了一个关键性的问题,就是多任务并发更新弱的问题。
我们最早是基于 Hudi 社区的 0.7 版本的内核进行研发的,当时 Hudi 的 Timeline 中的操作必须是完全顺序的,每一个新的事务都会去回滚之前未完成的事务,因此无法支持并发写入。后续社区也实现了一个并发写入的方案,整体是基于分布式锁实现的,并且只支持了 Spark COW 表的并发写,并不适用于 Flink 或者实时的 MOR 表。但是多任务的并发写入是我们内部实践当中一个非常通用的诉求。因此我们在 Hudi Metastore Server 的 Timeline 之上,使用乐观锁去重新实现了这个并发的更新能力。同时我们这个并发控制模块还能支持更灵活的行列级别并发写策略,为后续要介绍到的实时数据关联的场景的落地提供了一个可能。
除了多任务的并发写入之外,我们在单个 Flink 任务的并发写入也遇到了瓶颈。由于 Hudi 设计之初严重依赖 Spark。0.7.0 的版本才刚刚支持 Flink。不管是在稳定性还是在功能上都和 Spark On Hudi 有非常大的差距。因此在进行高 QPS 入湖的情况下,我们就遇到了单个 Flink 任务的扩展性问题。
我们通过在 Flink 的 embedding term server 上支持对当前进行中的事务元信息进行一下缓存,大幅提升了单个任务能够并发写入的文件量级,基本上是在 80 倍的量级。结合分区级别的并发写入,我们整体支撑了近千万 QPS 的数据量的增量入湖。
下一步的并发问题是批流并发冲突的问题。批流并发冲突问题类似于一个我们在传统数据湖中遇到的场景,就是有一连串的小事务和一个周期比较长的长事务,如果这两者发生冲突,应该如何处理。
为了解决批流冲突的问题,我们的思路是提供更灵活的冲突检查和数据合并策略。最基础的就是行级并发,首先两个独立的 writer 写入的数据在物理上就是隔离的,借助文件系统的租约机制也能够保证对于一个文件同时只有一个 writer。所以这个冲突实际上不是发生在数据层面的,而是发生在元数据层面。那数据的冲突与否,就可以交由用户来定义。很多时候入湖的数据实际上并不是一个现实中正在发生的事情,而是一个现实操作的回放。比如图中的这个场景,我们假设删除的作业是针对一个特定的 Snapshot。即使有冲突,我们可以认为整个删除的过程是瞬时完成的,后续的新事物可以追加的发生在这次删除作业之后。
最后就是冲突合并。假如两个数据真的是在行级别和列级别都发生了冲突,那真的只能通过 fail 掉一个事务才能完成吗?我觉得是不一定的,这里我们受到了 git 的启发。假如两次 commit 冲突了,我们是不是可以提供 merge 值的策略,比如数据中带有时间戳,在合并时就可以按照时间戳的先后顺序来做合并。
我们最早选择基于 Hudi 也是因为可扩展的索引系统,通过这个索引系统可以快速地定位到需要跟新的文件。这带来了三点好处,一个是避免读取不需要的文件;二是避免更新不必要的文件;三是避免将更新的数据和历史的数据做分布式关联,而是通过提前将文件分好组的方式直接在文件组内进行合并。
根本原因是 Bloom Filter 存在假阳性,一旦命中假阳性的 case,我们就需要把整个文件组中的主键链读取上来,再进一步地去判断这个数据是否已经存在。通过这种方式来区分这个到底是 update 还是 insert。upsert 本身就是 update 和 insert 两个操作的结合,如果发现相同组件数据不存在,就进行 insert。如果存在,我们就进行 update。而 Bloom Filter 由于假阳性的存在,只能加速数据的 insert 而没有办法去加速 update。这就和我们观察到的现象很一致。因为这个 pipeline 在运行初期,大部分数据都是第一次入湖,是 insert 操作,因此可以被索引加速。但是规模达到一定量级之后,大部分数据都是更新操作,没有办法再被索引加速。为了解决这个问题,我们急需一个更稳定更高效的索引。
Bloom Filter 索引的问题,根因是读取历史数据进行定位,导致定位的时间越来越长。那有没有什么办法是无需读历史数据,也可以快速定位到数据所在位置呢?很自然的,我们就想到了类似于 Hive 的 bucket,也就是哈希的方法来解决这个问题。
当然这种索引方式我们也遇到了一些扩展性的问题,用户需要提前一步做桶数的容量规划,给一个比较安全的值,避免单个桶扩大,以便应对接下来的数据增长。在数据倾斜的场景下,为了让倾斜值尽可能分散在不同的 bucket,会将 bucket 的数量调到很大。而每个 bucket 平均大小很小,会带来大量的小文件,给文件系统带来冲击的同时也会带来查询侧性能下滑和写入侧的资源浪费。同时在一线快速增长的业务,很难对容量有一个精准的预估。如果估算少了,数据量飞速增长,单个的 bucket 的平均大小就会很大,这就会导致写入和查询的并发度不足,影响性能。如果估算多了,就会和倾斜的场景一样出现大量的小文件。整体的 rehash 又是一个很重的运维操作,会直接影响业务侧对数据的生产和使用。因此不管从业务的易用性出发,还是考虑到资源的使用率和查询的效率,我们认为兼具高效导入和查询性能,也能支持弹性扩展的索引系统是一个重要的方向。
这时我们就想到了可扩展 hash 这个数据结构。利用这个结构,我们可以很自然地去做桶的分裂和合并,让整个 bucket 的索引从手动驾驶进化到自动驾驶。 在数据写入的时候,我们也可以快速地根据现有的总数,推断出最深的有效哈希值的长度,通过不断地对 2 的桶深度次方进行取余的方式,匹配到最接近的分桶写入。我们将 Bucket Index 这个索引贡献到了社区,已在 Hudi 的 0.11 版本对外发布。
本质原因也是因为 Hudi 的索引系统。因为这个索引系统要求数据按照组件聚集,一个最简单的方式就是把这个组件设成 UUID。但这样就会带来性能上的问题以及资源上的浪费。因此我们在 Hudi 之内实现了一套新的机制,我们认为是无索引。就是绕过 Hudi 的索引机制,去做到数据的实时入湖。同时因为没有主键, Upsert 的能力也失效了。我们提供了用更通用的 update 能力,通过 shuffle hash join 和 broadcast join 去完数据实时更新。
接下来详细介绍实时数据湖在字节的实践场景。电商是字节发展非常快速的业务之一,数据增长非常快,这也对数仓的建设提出了较高的要求。目前电商业务数据还是典型的 lambda 架构,分为是离线数仓和实时数仓建设。在实际场景中, lambda 架构的问题相信大家都已经比较了解了,我就不多做赘述了。这次的场景介绍是围绕一个主题,通过数据湖来构建实时数仓,使实时数据湖切入到实时数仓的建设当中。这不是一蹴而就的,是分阶段一步一步渗透到实时数仓的建设当中。而实时数据湖的终极目标也是在存储侧形成一个真正意义上的批流一体的架构。
在采用数据湖的方案之前,数据湖的验证环节需要将结果导入到 Kafka 然后再 dump 到 Hive。进行全量数据校验。这里存在的一个问题就是数据无法实时或者近实时可见可检的,基本上都是一个小时级的延迟。在很多紧急上线的场景下,因为延时的问题,只能去抽测数据进行测试验证,就会影响数据质量。实时数据湖的方案,是通过将实时数据低成本的增量导入到数据湖中,然后通过 Presto 进行查询,然后进行实时计算汇总,计算的结果做到近实时的全面的可见可测。
我们借助了 Hudi Metastore Server 的能力,封装了大量的参数。同时使用 Flink Catalog 的能力,对 Meta Server 进一步封装,让用户在配置一个 Fink SQL 任务的时候,从最初的写 DDL 配置十多个参数,到现在只要写一条 create table like 的语句,配置一张临时表,用户对这种方式的接受度普遍是比较高的。
第二个阶段,也就是第二个应用场景是数据的实时入湖和实时分析。数据湖可以同时满足高效的实时数据增量导入和交互式分析的需求,让数据分析师可以自助地去搭建看板,同时也可以进行低成本的数据回刷,真正做到一份数据批流两种使用方式。在这个阶段,由于数据实际上已经开始生产了,用户对于数据入湖的稳定性和查询性能都有很高的要求。我们通过将 Compaction 任务与实时导入任务拆分,首先解决了资源抢占导致的入湖时效性比较低的问题,同时设计了 compaction @R_874_9260@ce,负责 compaction 任务的调度,整个过程对业务侧同学完全屏蔽。我们在服务层面也对报警和监控进行了加强,能够做到先于业务去发现问题,处理问题,进一步提升了任务的稳定性,也让我们的使用方能够更有信心地去使用实时数据湖。
第三个阶段的应用场景是数据的实时多维汇总。在这个阶短最重要的目标是实时数据的普惠。因为很多的实时数据使用方都是通过可视化查询或者是数据服务去消费一个特定的汇总数据。而这些重度汇总过后的实时数据使用率相对来说是比较低的。因此我们和数仓的同学共同推进了一个实时多维汇总的方案落地。数仓的同学通过实时计算引擎完成数据的多维度的轻度汇总,并且实时地更新入湖。下游可以灵活地按需获取重度汇总的数据,这种方式可以缩短数据链路,提升研发效能。
下面一个问题,多维重度汇总的多维计算结果是从我们湖里批量读出来,然后定时地去写入 KV 存储,由存储去直接对接数据产品。从长期来看,我们下一步计划就是对实时数据湖之上的表去进行自动地构建物化视图,并且加载进缓存,以此来兼顾灵活性和查询性能,让用户在享受这种低运维成本的同时,又能满足低延低查询延迟、高查询频率和灵活使用的诉求。
第四个典型的场景是实时数据关联,数据的关联在数仓中是一个非常基础的诉求。数仓的同学需要将多个流的指标和维度列进行关联,形成一张宽表。但是使用维表 join,尤其是通过缓存加速的方式,数据准确性往往很难保障。而使用多流 join 的方式又需要维持一个大状态,尤其是对于一些关联周期不太确定的场景,稳定性和准确性之间往往很难取舍。
最后一个阶段。我们认为是实时数据湖的终极阶段,目前仍在探索中。我们只在部分场景开启了验证。在这个架构里面,数据可以从外部的不同数据源中实时或者批量的入湖和出湖,而流批作业完成湖内的数据实时流转,形成真正意义上的存储层批流一体。
最后来看一下未来规划。主要聚焦于三个维度:功能层面的规划,开源层面的规划,以及商业化输出相关的一些规划。
首先是功能维度,我们认为一个更智能的实时数据湖的加速系统是我们最重要的目标之一。
其次就是数据的加速。当前的实时数据湖由于其 serverless 架构对文件系统的重度依赖,在生产实践中还是处于分钟级,秒级依旧处于验证阶段。那我们接下来计划将这个数据湖加速服务不断地去打磨成熟,用来做实时数据的交换和热数据的存储,以解决分钟级到秒级的最后一公里问题。智能加速层面临的最大的挑战是批流数据写入的一致性问题,这也是我们接下来重点要解决的问题。例如在这种端到端的实时生产链路中,如何在提供秒级延时的前提下解决类似于跨表事务的问题。
第三是索引加速。通过 bucket, zorder 等一系列的主键索引,来进一步地提升数据湖之上的数据的查询性能,过滤掉大量的原始数据,避免无效的数据交换。同时我们接下来也会非常注重二级索引的支持,因为二级索引的支持可以延伸湖上数据的更新能力,从而去加速非主线更新的效率。
第四是智能优化。我们接下来会通过一套表优化服务来实现智能优化,因为对于两个类似的查询能否去提供一个稳定的查询性能,表的数据分布是一个关键因素。而从用户的角度来看,用户只要查询快、写入快,像类似于 compaction 或 clustering、索引构建等一系列的表优化的方式,只会提升用户的使用门槛。而我们的计划是通过一个智能的表优化服务分析用户的查询特征,去同时监听这个数据湖上数据的变化,自适应的去触发这个表的一系列的优化操作,可以做到在用户不需要了解过多细节的情况下,做到智能的互加速。 2. 开源层面
第二个维度是开源贡献。我们现在一直在积极地投入到 Hudi 的社区贡献当中。参与了多个 Hudi 的核心 feature 的开发和设计。其中 Bucket index 是我们合入到社区的第一个核心功能,而当下我们也在同时贡献着多个重要的功能,比如最早提到的解决数据难管理的 Hudi MetaStore Server,我们已经贡献到社区了,去普惠到开源社区。因为我们发现 Hudi MetaStore Server 不止解决我们在生产实践中遇到的问题,也是业界普遍遇到的一个问题。现在也在跟 Hudi 社区的 PMC 共同探讨数据湖的元数据管理系统制定标准。
另外一个产品 EMR 是一个 Stateless 的云原生数仓,100%开源兼容,在这个产品当中也会包含字节数据湖实践中一些开源兼容的优化,以及一些引擎的企业级增强,以及云上便捷的运维能力。
最后,欢迎大家关注字节跳动数据平台公众号,在这里有非常多的技术干货、产品动态和招聘信息。 立即跳转了解:火山引擎LAS或 火山引擎EMR 产品
以上是大佬教程为你收集整理的实时数据湖在字节跳动的实践全部内容,希望文章能够帮你解决实时数据湖在字节跳动的实践所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。