本日的先容会环绕下面五点展开:

字节基于 Hudi 的批流一体存储实践_数据_离线 科技快讯

1. 背景与寻衅

2. 设计方案

3. 落地场景

4. 运用案例

5. Q&A环节

分享高朋|耿筱喻 字节跳动 火山引擎 LAS 技能专家

编辑整理|阿东同学

内容校正|李瑶

出品社区|DataFun

01

背景与寻衅

首先来先容一下干系背景。

传统数仓存在实时和离线两条链路,来知足业务对付时效数据的时效性和数据量的不同需求。
离线会掩护历史的全量视图,实时会掩护增量视图,末了在做事层去进行数据的汇总,从而支持后续的在线的serving、 OLAP 查询以及看板的运用等等。

由于处理场景的差异,在实时和离线数仓的详细实现上,依赖的底层存储打算引擎基本上是完备隔离的,实时依赖的紧张因此 Flink 为代表的流式打算引擎来做打算,而离线依赖紧张因此 Spark 为代表的引擎,实时紧张依赖 KV 或 MQ 这样的多种存储选型。
离线则常常采取 Hive 为代表的存储引擎,传统的数仓架构,它实质上结合了流打算和 批打算的上风,通过两套代码来兼容实时数据和离线数据的上风,填补各自的缺陷。
但是这两套架构或代码也带来了两倍的资源本钱,并且由于底层打算引擎的不同,对付相同算子的处理的语义不是完备同等的,它们的打算结果就会存在差异。
以是对付研发同学来说,这部分差异也会给数据校验等其它一些事情带来额外的包袱。

总结下来,传统数仓的 Lambda 架构紧张存在三个问题要办理:第一个是一套打算逻辑,但须要写离线和实时两套代码,带来了两倍的运维本钱;第二是离线实时两条链路带来了资源冗余的问题,双倍的资源的本钱;第三是两套引擎打算口径不完备对齐,导致数据校准方面会有比较大的困难。
以是业内提出了希望通过批流一体来办理当前传统 Lambda 架构的问题。

基于上述问题,再结合内部的实际场景,批流一体的诉求可以分为两种,第一种是打算的批流一体,第二种是存储的批流一体。
打算的批流一体指的是希望通过底层一套系统,业务层的一套代码同时知足离线和实时的开拓需求,从而办理两套系统带来的研发效率、人工本钱、运维本钱等资源成本相关的问题。
其余我们希望一套系统能够对齐打算指标的口径来办理数据同等性的问题。

存储的流批一体包括,第一是实时和离线所在的存储统一,第二是实时和离线的数据能够复用。
实时和离线存储统一是指我们希望实时和离线能够利用统一选型的存储,这就哀求存储能够知足大规模、全量和增量数据的读写诉求。
批流数据复用指的是批处理能够利用流处理的结果数据,提升全体离线数仓的产出韶光。
范例的 case 便是 ODS 层的数据复用,其余流处理也能够复用批处理的数据来办理链路冷启动的时候,须要将离线的数据回灌到实时存储中的额外本钱,我们通过 LAS 去实现了批流一体的能力。

在打算层,我们的办理思路是对外暴露统一的 SQL,底层根据 SQL 处理的场景选择不同的实行引擎去实行,对用户屏蔽以底层实行引擎的差异性。
之以是这样做的缘故原由是我们认为不同引擎适用于不同场景,很难找到引擎能够同时知足实时和离线,不同时效性和数据规模的哀求,在 SQL 层,我们对齐了底层实行引擎的差异性来实现打算口径的对齐,办理了上面说到的同等性问题。

在存储层,我们基于湖仓一体的架构,通过数据湖实现了批流一体存储的能力。
除了能够支持流式的增量和批式的全量读写之外,我们还支持了高效的 OLAP 查询能力以及维表 join 的能力。

既然提到了LAS,我们看一下 LAS 的整体构造, LAS全称是 Lakehouse Analysis Service。
湖仓一体的数据剖析做事,领悟了湖与仓的上风,既能够利用湖的上风将所有的数据都存储到廉价存储中,供积极学习、数据剖析等场景利用,又能够基于数据湖构建数仓,让BI、报表等业务场景去利用。

LAS具有如下一些特性:首先是能够支持统一的元数据,避免在数据湖中存在数据孤岛的问题,每个数据都是可追溯的。
第二个是依托数据湖供应ACID 的能力。
第三点是机器支持企业级的权限管控。
第四点是支持资源的极致弹性扩缩容,降落用户的利用本钱。
末了是引擎的内核的极致优化,供应高效的读写性能。

LAS 整体架构,最上面一层是湖仓开拓工具,为数据运用处景供应能力支撑。
下面一层是数据剖析引擎,支持流批一体SQL,办理打算批流一体的问题,并且支持根据 SQL 的特点去做引擎的智能选择和加速。
针对 OLAP 剖析,我们会将 SQL 路由到不同的实行引擎去实行,比如对 Ad-hoc我们会用 Presto 去进行查询。
再往下一层是统一元数据层,末了一层是基于 Hudi 去实现的流批存储层。

本文会聚焦在流批一体存储的细节实现上。

我们须要剖析现有的离线数仓和实时数仓的详细需求,来考虑流批一体存储的实现办法。
离线数仓的整体构造分层相对来说还是比较清晰的,利用的存储也会比较单一,紧张是Spark 加 Hive 的形式,供应高效的数据处理和吞吐能力,能够支持离线数据回溯场景下的并发更新。
但是实时数仓的利用存储相对来说会繁芜一些,一样平常会依托 Kafka 或 MQ 进行每一层数据表的构建,为了支持高效的 join 的性能,在维表的存储选型上,我们每每会根据数据量的差异选择KV、Hbase 或者是 MySQL 去进行存储。

在整体的 DWS 数仓链路梳理完之后,到了数据运用层会对接 ClickHouse、Doris这样的高效的 OLAP 引擎,去对外供应计时的数据看板报表等等。
数据运用层还会有一些 serving 的功能,做事层会将数据写到 KV 或者 MySQL 或者 ES 这些存储里,对外供应 serving 的做事。

在构建实时数仓新链路的时候,对付链路冷启动,须要利用历史分区的数据,以是我们须要将离线的数据回灌到实时链路的 MQ 里面,受限于 MQ 带宽的限定,整体的回溯周期可能会非常的长,并且操作很繁芜。
其余当打算指标有问题,或者是整体的打算口径须要调度的时候,也须要利用离线的数据去对实时数据进行回刷,同样它也会碰着回溯周期长,操作繁芜等一系列问题。

通过先容可以看出实时数据仓库整体相比拟较繁芜,存储办法和构建标准没有完备统一。
为了更风雅地剖析实时数据仓库对付流批一体存储的需求,我们基于数据量延迟、数据同等性哀求和打算周期等维度,将场景划分成了三类:日志打算、长周期打算和全量打算。

日志打算的场景特点在于数据量比较大,但是可以接管少量数据丢失。
大部分数据哀求在分钟内打算,并进行分组聚合。
但该场景的痛点在于希望通过批流数据复用和统一以提升数据时效性和降落资源本钱。

对付长周期打算场景,数据量相对中等。
须要对指标进行复合打算,但整体数据周期可能较长。
直播类业务场景可能持续一个月,数据哀求在秒级。
该场景的痛点在于冷启动和回溯过程繁芜、周期长、本钱高。

全量打算场景数据量不大,会将全量数据存储到Flink state中进行打算,哀求强同等性,时效性哀求在秒级别。
但该场景碰着的最大问题在于因数据存储到Flink state中,未进行分层构造,回溯的中间结果可能不透出,不太利于开拓职员进行调试操作。

总结来看,数仓对付存储的紧张需求可以概括为以下几点:其一,实时存储分歧一,运维繁芜;其二,实时离线存储分歧一,资源本钱高;其三,冷启动或回溯过程繁芜耗时;其四,无法查询中间数据。

因此,我们的批流一体存储方案,不仅要办理上述痛点,还须要具备以下基本能力:支持离线回溯场景的分区并发更新,且数据读写吞吐量不低于Hive。

对付流式场景的流批处理,须要知足低延迟的哀求,数据延迟约为几秒钟,并能够供应高吞吐量以支持千万级RPS。
此外,我们还须要供应支持Exactly-once和At-least-once数据同等性语义的功能。
为了实现整体的流批一体的目标,还须要支持多引擎,例如Spark、Flink的读写,同时也须要支持多种OLAP引擎进行查询。

02

设计方案

接下来看一下我们的流批一体存储方案,结合刚刚谈论的流批一体存储目标,我们创造现有的基于数据湖仓一体的架构,实际上已经可以知足大多数哀求了。
当前数据湖仓一体的架构已经支持所有数据入湖,并支持Spark、Flink引擎,同时也可以进行离线和实时的数据操作。
不才游数据运用方面,数据湖仓一体的架构还支持ihook、metastore、adhoc等OLAP查询办法。

在字节内部利用的场景中,业务会通过Flink实时将数据入湖,利用Spark批量回溯更新湖内的数据,并且下贱会利用Presto查询做事来触及下贱的看板。
因此,我们信息湖仓库紧张利用了Hudi这样的开源方案。
在功能方面,数据湖仓库基本符合了实时和离线数仓对付流批一体存储的需求,而这紧张是由于Hudi本身供应了事务支持,我们在内部还进行了桶索引机制的优化以进一步提高入湖的性能,并且通过metastore的元数据做事来支持并发写入功能。
此外,Hudi原生支持多引擎,因此既可以对批流进行读写消费,也可以利用Presto进行交互式剖析。

在内部,湖仓一体架构大规模地落地了离线的数仓场景和部分近实时的数仓场景。
但是由于 Hudi 本身的或者数据库本身分钟级别的可见性,它还是没有办法做到实时数仓存储的标准方案。

为理解决时效性的问题,供应低延迟的能力,我们内部自研了基于内存的做事,它构建于数据湖之上,形成了一套整体的高吞吐、高并发、低延迟的实时数据做事方案。
底层方案的整体架构如图所示。
底层是持久化数据层,会复用Hudi当前的能力持久化数据,文件分布跟 Hudi同等,通过 log 的行存文件和 base 的列存文件进行数据存储,会通过 file slice 这种基于韶光戳的办法去掩护数据的版本信息,通过 file group 这样的办法去对文件进行分组,相同组件的数据会存储在同文件组内。
这种文件变分组的办法,再结合索引的能力,能够有效地提升数据入湖的性能和查询的性能。

上层做事层紧张分为两个组件, BTS 和 Table Service Management。
BTS 是基于内存构建的做事层,它紧张是来办理实时场景下数据读写的时效性的问题,通过内存去对数据读写进行加速,TSM (Table Service Management),是表优化的做事,它会异步地去实行一些表优化的操作,从而实现对查询的加速。

这里的表优化操作指的包括社区原生的压缩聚合 clustering,以及一些索引异步构建,视图异步构建的一些操作。
压缩聚合指的是对日志文件和根本文件进一步合并以天生新的列式存储文件,这样对整体查询效率而言更优。
而 clustering 则是合并小文件以减少文件开销。
当前 TSM 只支持这两种能力以及清理能力,我们操持结合社区现有的 MDT 能力来异步构建多级索引,以提升交互式查询的性能。

表优化操作是一个完备异步的过程。
这部分是我们自主开拓的做事,由于一些社区原生实现并没有做到完备异步。
为什么要异步呢?由于 compaction 和 clustering 的实行韶光比较长,同步操作会影响数据湖的写入速率,特殊是在实时场景下不可接管。
而社区的异步操作仅指写入时不壅塞,但是 compaction 会共享写入资源在同一个运用程序内实行。
这可能会影响写入作业的稳定性,因此我们在内部落地过程中创造了这个问题,末了实现了一个完备异步的调度实行,同时不共享写入资源的做事。
在详细的实行层面上,我们还利用混部的资源以降落本钱。

1. 数据组织形式

基于这样的新的流批存储架构,我们新增了中间的做事层,特殊是BTS这样的实时元数据加速层,整体的数据组织形式如下图所示。

数据组织形式在逻辑层分为表分区、文件组和文件大小等观点。
数据写入时,先写入对应分区,再根据主键写入对应文件组。
文件组的底层文件存储分为内存数据和分布式文件系统数据两种类型。
在内存中,数据由块构成,而在分布式文件系统中,数据的组织形式与Hudi相似,采取根本文件和日志文件的模式。
值得把稳的是,我们引入了日志存储层来存储WAL文件,以确保数据在写入过程中的有效性,并办理内存数据丢失的问题。
因此,在写入内存数据之前,我们会先写入WAL文件,确保这部分数据已被持久化存储,实际操作才被认为是成功的。

对付内存文件块与持久化存储文件之间的映射关系,每个块对应一个WAL文件以担保数据的容错性。
每个内存中的块都不会永久存储在内存中,而是定期地刷到持久化存储文件中。
在实际操作中,多个块常日对应一个日志文件。

2. 数据读写办法

再来看一下全体数据读写的交互办法。
首先,做了流批繁芜的分离,由于流场景和批场景对付数据的可见性和时效性的哀求是不完备同等的。
对付批量回溯的场景,用户并不是希望能够立时可见,只是希望把这部分数据做好更新和校准而已。

在批量数据更新的场景中,数据会直接写入持久性存储中,也便是会写入到HDFS上,而不是通过内存。
这种方法可以极大地提高我们的读写吞吐量。
对付流式读写的情形,会首先访问BTS之类的内存做事进行读写。
这里的紧张履行细节是,在写入数据时,我们会优先写入内存中,会首先写WAL文件以保障容错。
在读取数据时,由于内存中的数据不会一贯存在,由于cache会定期清理,以是读取时会优先访问内存中的数据。

如果创造内存中没有数据,我们会先加载WAL文件并对其进行预加载,以尽可能地将数据优先加载到内存中,以担保流式读取的时效性。

如果创造WAL文件也不存在,或者被清理了,那么我们就会转而去读取持久性存储中的日志。

在大多数情形下,这是流式读写,全体周期相对较短。
因此,在内存中,WAL文件的存储能够做到一周之内的时效性。
一周之内的用户都可以正常从内存中消费这部分数据。
如果用户希望存储长周期的数据,那么他可能须要承担更多的存储本钱。
我们须要尽可能避免从HDFS上加载日志文件,这是整体的数据读写办法。

3. BTS架构

刚刚提到的流读的场景,我们也做了读写的负载分离,会有单独的读集群去承接整体的读流量,来避免它影响写入节点的性能。

我们来看一下整体的BTS架构,BTS首先是Master-Slave架构。
Master紧张卖力一些元数据信息的管理,它的构造与HDFS相似。
Table Server以Slave的形式存在,卖力数据的读写,并在其上存储由多少个block组织而成的文件。
对付Master,它管理的元数据包括Table Server的信息以及block的元数据管理。
由于元数据管理的须要,它一定会引入一定的负载均衡机制。
因此,我们目前实现了比较大略的负载均衡机制,旨在避免某一Table Server内存被打爆的情形。

Table Server紧张供应数据读写能力,掩护本地的块并定期进行块清理。
它异步将某些块刷新到HDFS上,全体数据读写流程是客户端要求master,获取须要写入的块,然后找到对应的Table Server进行数据写入。
在写入时,它优先写WAL文件,再写内存文件。
当数据全部写入并ACK返回后,表示这批数据已经成功写入,不会再丢失。
此时涉及到数据提交问题。

这部分统一由主节点master去管理事务,其整体的事务机制与Hudi目前的实现相似,即依托于引擎进行提交。
对付Flink来说,每次checkpoint都会触发主节点进行提交。
因此,当下贱消费这批数据时,如果我们须要达到秒级数据,则不太可能进行秒级信息源数据的提交。
因此,在这部分下贱可能会读取一些基于read on committed的数据,以是须要进行去重操作,以确保At least once的语义。
以上便是整体的BTS构造的先容。
接下来先容落地场景。

03

落地场景

我们的紧张落地场景是流式数据打算,它类似于离线数仓,须要进行一些ETL清理和大略的聚合打算。
左侧是整体架构方案,我们利用了基于Hudi加BTS的数据湖方案来更换MQ,从而实现每一层数据表的存储。
在维表上,我们目前仍利用KV存储。
我们当前的目标是更换MQ场景。

原来离线须要将实时表从 MQ dump 成 Hive,去进行后续的离线数仓的干系事情。
切换成这套方案后,dump 操作就可以省却,能够做到流批的数据复用的能力来减少整体的中间存储的本钱。

第二个场景是多维剖析场景,其特点是实时数据洗濯后直接支持看板等实时的OLAP查询。
基于批流一体方案结合 Presto 查询来知足业务侧分钟级时延诉求,和秒级查询相应诉求。
我们团队针对Presto进行了许多优化,包括native engine等干系技能,以实现高性能查询。
目前,该场景也在现场落地,并取得了不错的收益。
其余,由于整体的流批是数据表,存储是统一的,以是不须要额外将其转储为Hive表,也不须要掩护离线存储的快照。

第三个场景便是批流复用的日志场景,一样平常大家直觉上会从 ODS 层切换做批流复用,字节内部,在实时场景会先对接埋点数据, Flink 端去做洗濯,落到实时的存储里面,然后对接看板等下贱。
在离线数仓上,也会将所有埋点信息存储下来,根据详细的业务场景落身分歧的 ODS 表,再去构建离线数仓的任务。

当我们整体把存储换成 LAS 这套方案之后,只须要掩护 ODS 层的数据,就能支持离线和实时两个的场景去进行剖析。

末了是飞书数仓的场景落地,整体链路比较清晰,分为实时和离线两个链路,这里离线实时链路紧张是去做一些人事信息变更之类的业务。
离线链路紧张是对一些长周期的问题去做一些数据的改动,把这部分改动的数据回补到我们的实时链路里面,让下贱的看板数据变得更准确。

在实时数据传输和离线数据处理两个环节中,我们都采取了LAS之类的存储来更换底层存储。
离线数据处理哀求数据的处理韶光在10-15分钟内完成,因此用户更惯用Spark处理离线数据处理环节。
针对此,我们供应了基于Spark Thrift Server的办理方案,以减少离线数据处理中每个环节的资源申请开销,坚持常驻资源,让用户运行SQL来构建离线数据处理模块。

在实时数据传输环节中,用户原来采取Kafka构建根本表并利用Hbase进行维表构建。
由于Hbase对联合主键的支持并不友好,用户每次在读写时都须要去序列化和反序列化主键列,并将复合主键拼装成单一主键,末了将其写入Hbase中,然后再从Hbase中读取。
这样的流程很繁芜,且韶光耗时较长。

那么除了更换之外,就像之条件到的一样,我们会更换掉MQ或Kafka这样的组件,并且我们还用其他存储替代了Hbase。
我们的紧张目标是实现基于Flink的lookup join功能,并将小表直接加载到内存中,以提高lookup join的性能。
因此,在实时链路这一块,我们已经更换了所有的存储组件。

04

未来方案

末了分享一下未来方案。
首先,我们会探索更多业务场景,大规模落地更多模式。
其次,在技能迭代方面,我们会对负载均衡和分离做出优化。
负载分离在BTS内部(即内存做事内部)会针对一些小组件进行优化,比如将WAL文件刷入持久化存储(如HDFS),这部分资源占用比较高,须要分离处理。
另一方面,我们会更加细致化地对读写负载分离和内存做事负载均衡进行优化。
此外,我们还会实现更风雅的流批负载分离。
第三点是查询优化,我们会结合索引的能力,在内存层和整体存储方案中通过构建索引优化块的数据构造来加速查询性能,包括点查和联合查询等。
末了一点是与native engine的集成,以提升整体的读写速率。
在这方面,我们须要对底层的log、block和parquet文件进行向量化处理。

05

Q&A环节

Q:Hudi支持流式的写入与更新,那Kafka 是否可以被取代了?

A:我不愿定是指社区的Hudi还是我分享的Hudi。
以是我谈一下我分享的整套方案,但我认为我们目前在某些方面还有欠缺。
比较困难的是如何实现Kafka的exactly once语义。

Q:LAS支持的组件索引和二级索引是什么样的索引构造?

A:实际上,这部分紧张是社区原生实现,包括我们内部的实现也大部分已经贡献到社区了。
就组件而言,社区实现是基于哈希去进行分桶,并同时记录一些布隆过滤器(Bloom filter)。
关于二级索引,社区目前正在进行迭代,但并没有完备合入。

Q:利用 BTS 可以加速 Flink 写入 Hudi 的性能吗?

A:会。
时效性提升很多。
由于第一个是我们写内存,第二个是全体数据构造会相对 Hudi 来说会比较轻量一些。

Q:BTS 与 Hudi分别适用的运用处景。
BTS 与Hudi的详细关系。

A:首先对付我们来说,我们整体的这套方案叫LAS, LAS 底层是基于 Hudi 去做的实现,为了支持流批一体存储,我们在Hudi上加了一层内存缓存层BTS,结合查询引擎,一整套方案称之为 LAS。
由于 BTS 是基于 Hudi 上架了一层,以是 BTS 整体的逻辑会跟 Hudi 强干系,它我两者之间的交互紧张便是内存文件到持久化存储中间的一些交互,其他的大部分的设计会沿用Hudi的部分的逻辑,比方说我们会通过TSM, Table Service Management.去做一些比较优化的做事,比方说 Hudi 的compaction, BTS 的 WAL 的clean,就这些操作。

Q:LAS怎么担保查询数据的准确性?

A:我们当前支持的语义是 At least once,便是提及首用户能够在数据里面加业务字段来判断哪条数据是最新的At least once,但是这条数据有可能会被写入多次,以是用户下贱须要做一些去重的操作。
BTS 数据怎么担保一定是写进来的?首先我们会写 WAL 文件,便是 WAL 文件是持久化存储上的文件,当文件写完之后我们才会,我们会一边写 WAL文件一边写内存的数据,只有写到 WAL 文件才会写内存的数据,那整体都完成了之后才会返回给 client ACK,否则, client 会认为这次提交失落败了,会重新去往里写,以是整体上同等性是不会有太大的问题的。

本日的分享就到这里,感激大家