Apache Iceberg在小紅書(shū)的探索與實(shí)踐
以下文章來(lái)源于DataFunTalk ,作者孫超
目前小紅書(shū)對數據湖技術(shù)的探索主要分為三個(gè)方向,第一個(gè)方向是在小紅書(shū)云原生架構下,對于大規模日志實(shí)時(shí)入湖的實(shí)踐,第二個(gè)方向是業(yè)務(wù)數據的CDC實(shí)時(shí)入湖實(shí)踐,第三個(gè)方向是對實(shí)時(shí)數據湖分析的探索。
今天的分享也主要圍繞這三個(gè)方向展開(kāi),并在最后介紹我們對未來(lái)工作的規劃:
- 日志數據入湖
- CDC實(shí)時(shí)入湖
- 實(shí)時(shí)湖分析探索
- 未來(lái)規劃
01 日志數據入湖
1. 小紅書(shū)數據平臺架構
在進(jìn)入主題之前先介紹一下小紅書(shū)數據平臺的基本架構。
總體來(lái)說(shuō),小紅書(shū)數據平臺與其他互聯(lián)網(wǎng)公司大同小異,主要不同在于小紅書(shū)的基礎架構是“長(cháng)”在多朵公有云之上的。在數據采集層,日志和RDBMS的數據源來(lái)自不同的公有云;在數據存儲加工層,絕大多數數據會(huì )存儲于A(yíng)WS S3對象存儲;同時(shí),數倉體系也是圍繞著(zhù)S3來(lái)建設的,實(shí)時(shí)ETL鏈路基于Kafka、Flink,離線(xiàn)分析鏈路基于A(yíng)WS EMR上的Spark、Hive、Presto等;在數據共享層,諸如Clickhouse、StarRocks、TiDB等OLAP引擎,為上層報表提供一些近實(shí)時(shí)的查詢(xún)。以上就是小紅書(shū)數據平臺整體的架構組成。
2. APM日志數據入湖
接下來(lái)我們用APM(Application Performance Monitor)的例子來(lái)介紹Iceberg如何在當前架構體系下運轉。
(1)使用Iceberg之前的APM鏈路
APM主要記錄小紅書(shū)APP前端和客戶(hù)端性能相關(guān)的埋點(diǎn)日志,可以達到百萬(wàn)每秒的RPS。以前的離線(xiàn)鏈路是先將埋點(diǎn)數據發(fā)送到阿里云的Kafka,通過(guò)Flink作業(yè)落到阿里云的OSS對象存儲,然后通過(guò)Distcp搬到AWS S3上,之后通過(guò)Add Partition落地到Hive表里,接下來(lái)下游的EMR集群會(huì )對落地的數據做一些離線(xiàn)的ETL作業(yè)調度和Adhoc的查詢(xún)。整條鏈路中,數倉同學(xué)的痛點(diǎn)是Flink ETL作業(yè)上數據需要按業(yè)務(wù)分區動(dòng)態(tài)寫(xiě)入,但是各點(diǎn)位分區之間的流量非常不均勻。這就涉及到動(dòng)態(tài)寫(xiě)分區時(shí)候是否要加Keyby,如果加Keyby就會(huì )發(fā)生數據傾斜,不加Keyby每個(gè)寫(xiě)算子的Subtask都會(huì )為每個(gè)分區創(chuàng )建一個(gè)Writer,而分區Writer又至少創(chuàng )建一個(gè)文件,同時(shí) Flink Checkpoint 又會(huì )放大這個(gè)寫(xiě)放大,最終導致小文件數爆炸。
小文件數多后會(huì )導致以下幾個(gè)后果:
- Distcp會(huì )變得非常慢,導致數據延遲在小時(shí)級以上。
- 流量小的很多文件集中在一個(gè)Task,導致查詢(xún)性能差。
(2)基于Iceberg的改良鏈路
Iceberg支持事務(wù),我們可以利用這個(gè)特性來(lái)異步合并小文件,這樣既不影響主流的寫(xiě)入又可以保障一致性,基于此想法我們可以得到以上的架構圖。
該架構簡(jiǎn)化了落OSS 的步驟,Kafka數據可以直接通過(guò)Flink落到S3的Iceberg,之后異步執行合并小文件作業(yè),此后下游就可以直接基于Iceberg做ETL調度。這個(gè)鏈路的問(wèn)題在于:
- 異步的小文件合并為周期調度,但是Iceberg在commit之后,下游ETL讀文件作業(yè)會(huì )立即執行,在這之后再掛異步合并作業(yè)的意義就不大了。
- 如果同步合并小文件,即在Flink入湖作業(yè)中掛一個(gè)合并算子,這樣會(huì )引入跨云IO,并增加Flink作業(yè)的OOM風(fēng)險。
所以我們還是決定通過(guò)加入Shuffle,從源頭解決數據傾斜的問(wèn)題。我們自主設計了一個(gè)EvenPartitionShuffle的算法做數據Shuffle。Iceberg支持將分區級別的統計信息寫(xiě)入到元數據中,這樣就可以拿到不同分區的流量分布,再根據下游的并行度,就可以將問(wèn)題轉化為一個(gè)類(lèi)背包問(wèn)題,類(lèi)似于Spark的AQE。
對于評估這個(gè)算法可以抽象出以下兩個(gè)指標:
- Fanout:下游Subtask的分區個(gè)數。
- Residual:下游Subtask的分配流量和與目標流量差距。
這兩個(gè)指標反映出小文件的個(gè)數以及數據傾斜的均勻程度,我們也在這兩個(gè)指標的評估下來(lái)不斷調整背包算法。從最終的效果來(lái)看,線(xiàn)上作業(yè)IcebergStreamWriter各Subtask數據負載還是比較均勻的,也極大減少了小文件數。
以上方案的優(yōu)缺點(diǎn)如下:
優(yōu)點(diǎn):
- 小文件的問(wèn)題得到了解決。
- Writer算子內存占用減少。
缺點(diǎn):
- 引入了Shuffle。
- 流量動(dòng)態(tài)變化。暫時(shí)還不能根據流量變化動(dòng)態(tài)調整分區分布,因為當前是在Flink 作業(yè)啟動(dòng)的時(shí)候讀取Iceberg的元數據。
(3)將基于Iceberg的鏈路應用于小紅書(shū)多云架構
當解決以上問(wèn)題之后,讓我們來(lái)看看如何將以上鏈路應用在小紅書(shū)的多云架構上。有兩個(gè)問(wèn)題需要解決:跨云流式讀寫(xiě)的問(wèn)題,以及Iceberg與下游系統的集成。
①跨云流式讀寫(xiě)
關(guān)于Iceberg多云架構下讀寫(xiě)的問(wèn)題,我們先來(lái)看以上架構圖的組件與數據流。在上面的架構圖中高亮標出了Iceberg兩個(gè)比較重要的抽象:Catalog與FileIO。
Catalog保存了Iceberg最新的元數據的指針,并且需要保證指針變更的原子性。Iceberg提供了HiveCatalog和HadoopCatalog兩種實(shí)現。HadoopCatalog依賴(lài)于文件系統rename接口的原子性,而rename在對象存儲上并不是原子操作(對于最新版本的HadoopCatalog,加一個(gè)顯式的鎖可以保證原子性,但是當時(shí)還沒(méi)有這方面的實(shí)現)。所以我們選用了HiveCatalog,對于HiveMetastore,離線(xiàn)數倉包括Iceberg都是讀寫(xiě)一個(gè)RDS庫,所以通過(guò)EMR集群的HMS也能直接訪(fǎng)問(wèn)到Flink寫(xiě)進(jìn)來(lái)的Iceberg表。
FileIO是Iceberg讀寫(xiě)存儲系統的接口。HiveCatalog默認是HadoopFileIO,我們可以在中間封裝一層S3AFileSystem來(lái)讀寫(xiě)S3。當我們走完這條鏈路時(shí)發(fā)現Flink讀寫(xiě)都是正常的,但是離線(xiàn)所依賴(lài)的EMRFS不支持S3A的Schema。于是我們調研了Iceberg原生的S3FileIO,發(fā)現它的實(shí)現非常簡(jiǎn)單直接,且可控性非常高,于是在經(jīng)過(guò)了一些大規模的壓測,并解決了一些問(wèn)題后就選擇了S3FileIO。
首先Flink TaskWriter在接收數據向下游寫(xiě)到S3OutputStream。用戶(hù)可設置一個(gè)MPU閾值,當大于閾值時(shí),會(huì )有一個(gè)線(xiàn)程池異步地使用MPU上傳文件到S3,否則就會(huì )走另一條路徑,將StagingFiles串在一起,通過(guò)PutObject請求寫(xiě)到S3。
對于以上鏈路,我們也對S3FileIO做了一些優(yōu)化以支持大流量的作業(yè)。
(1)S3Client上的優(yōu)化:
- HttpsClients,我們將S3原生的HttpsClients(Java8自帶的HTTP URL Connection)更換為了Apache HttpClient,其在Socket鏈接以及易用性上有一些提升。在寫(xiě)的過(guò)程中我們也遇到了一些問(wèn)題,多云機器帶來(lái)的問(wèn)題是每個(gè)廠(chǎng)商機器的內核是不太一樣的,例如在某云上發(fā)現有寫(xiě)S3超時(shí)的問(wèn)題,我們與廠(chǎng)商一起抓包發(fā)現是內核參數的問(wèn)題。
- API Call Timeout,將S3的Timeout配置項暴露給Iceberg。
- Credential Provider,S3 SDK從FlinkConf中讀取密鑰。
(2)MPU Threshold
Flink做Checkpoint的時(shí)候,所有的Writer都會(huì )將數據刷到S3,這時(shí)候的毛刺會(huì )非常大。我們的方案是降低MPU的閾值以及ParquetWriter的RowGroup。降低Parquet的RowGroup就意味著(zhù)它刷到S3OutputStream可以更早一點(diǎn),降低MPU閾值就可以更早地上傳StagingFile。通過(guò)以上優(yōu)化我們把CheckPoint在上傳到S3的延遲中從2分鐘降到了幾十秒。
(3)ResetException
當S3OutputStream通過(guò)BufferedInputStream把兩個(gè)StagingFile合并到一起并上傳時(shí),當遇到諸如網(wǎng)絡(luò )問(wèn)題時(shí)會(huì )重試,它重試的機制是通過(guò)InputStreaming的mark和reset來(lái)做的,但是默認的mark limit是128KB,BufferedInputStream超過(guò)128KB之后就會(huì )丟數據,重試時(shí)就會(huì )出現ResetException。我們將mark limit改成 StagingFiles Size +1,保證所有的數據都會(huì )緩存避免以上問(wèn)題。
②下游系統集成
接下來(lái)要解決的是跟下游生態(tài)系統集成的問(wèn)題。
- 第一個(gè)問(wèn)題是Batch Read
Iceberg與Hive最明顯的區別就是分區的可見(jiàn)性語(yǔ)義,Hive在整個(gè)分區寫(xiě)完后可見(jiàn),而Iceberg在commit后就立即可見(jiàn)。但是下游離線(xiàn)調度的小時(shí)級任務(wù)比較依賴(lài)于HivePartition的可見(jiàn)性。
在此我們做了一個(gè)Sensor,其原理是Flink在寫(xiě)的時(shí)候將Watermark寫(xiě)進(jìn)Iceberg表的Table Property。下游的離線(xiàn)調度就可以使用我們基于A(yíng)irflow的Watermark Sensor去定期的輪詢(xún)HMS,查詢(xún)Watermark是否已經(jīng)達到分區時(shí)間,條件滿(mǎn)足之后就會(huì )觸發(fā)Spark的調度。
- 第二個(gè)問(wèn)題是Adhoc查詢(xún)
Adhoc查詢(xún)使用了Kyuubi這樣一個(gè)多租戶(hù)的SQL Gateway通過(guò)Spark去讀Iceberg表。用戶(hù)可以直接通過(guò)三段式的表名去查詢(xún)Iceberg 表,例如:
hive_prod.Iceberg_test.table
總結:
我們目前在生產(chǎn)環(huán)境已經(jīng)落地了幾個(gè)比較大的作業(yè),單作業(yè)的吞吐達到了GB/S以及百萬(wàn)級別的RPS,數據的就緒時(shí)間大概在五分鐘左右,由Flink Checkpoint來(lái)控制。下游的讀耗時(shí)得益于小文件問(wèn)題的解決以及Iceberg基于文件的Planning,使下游讀耗時(shí)減少了30%~50%。
02 CDC實(shí)時(shí)入湖
1. Mysql全量入倉
小紅書(shū)數倉數據的另一重要來(lái)源是MySQL,目前的Mysql2Hive鏈路是全量入倉這種比較傳統的模式,主要通過(guò)Airflow定時(shí)調度,使用Sqoop去小時(shí)級別或天級別從MySQL拉數據寫(xiě)到Hive表相應的分區里面。
其中比較特殊的一點(diǎn)是為了解決Schema Evolution,每次拉取數據的時(shí)候都會(huì )生成一個(gè)Avro Shema,對應的Hive表選用了行存儲的Avro表,而不是通常會(huì )使用的基于列存的Parquet文件的表。它的缺點(diǎn)是不如列存高效,但是它解決了一個(gè)問(wèn)題——下游的用戶(hù)不需要考慮schema變化的情況。這條鏈路的好處是簡(jiǎn)單實(shí)用直接,缺點(diǎn)是MySQL壓力大,下游查詢(xún)不夠高效。
2. CDC增量入倉
關(guān)于CDC如何增量入離線(xiàn)數倉的問(wèn)題,大廠(chǎng)都有一些比較成熟穩定的方案。
如上圖, ODS一般有兩張表,一張增量表一張全量表,開(kāi)始會(huì )有一個(gè)全量表的導入,之后會(huì )通過(guò)實(shí)時(shí)流進(jìn)增量表,然后通過(guò)Merge任務(wù)進(jìn)行周期性的合并操作。這個(gè)鏈路已經(jīng)在很多廠(chǎng)都有了成熟穩定的實(shí)踐,缺點(diǎn)是鏈路比較長(cháng)。
3. CDC實(shí)時(shí)入湖
我們最終的鏈路如上圖,將MySQL的上游數據庫通過(guò)全增量數據發(fā)送到Kafka,然后使用Flink將數據Upsert到Iceberg里面,同時(shí)會(huì )處理一些Schema Evolution的情況,這條鏈路就非常簡(jiǎn)潔。
整條鏈路中我們需要特別注意,同?主鍵(業(yè)務(wù)主鍵+ Shard Key)的Binlog應該保序。以下是在整條鏈路中保持Exactly-Once語(yǔ)義所做的事情:
①Binlog
- 全增量,先發(fā)全量再發(fā)增量。
- At-Least-Once,保證重復發(fā)送時(shí)保證有序(最終?致性)。
- MQ Producer根據主鍵Hash(且分桶數固定,不受擴容影響)。
②Flink
- Shuffle Key 只能是主鍵的?集 + Immutable Columns。
③ Iceberg sink
- Upsert Mode。
(1)Merge on Read
這個(gè)方案我們在實(shí)踐中也發(fā)現一些問(wèn)題,最核心的就是DeleteFile多導致的MOR查詢(xún)性能差。
Iceberg查詢(xún)時(shí),每個(gè)DataFile都需要讀取相應的DeleteFile進(jìn)內存進(jìn)行過(guò)濾,會(huì )使得Task的IO負載很重,這樣我們的優(yōu)化思路就轉換為如何減少DeleteFile。而出現DeleteFile過(guò)多的原因是,Update的實(shí)現要先把當前行刪掉再I(mǎi)nsert,刪掉這行就至少會(huì )生成一個(gè)DeleteFile。我們對此所作的優(yōu)化是去除重復的Insert事件,這樣只需要對Update做Delete。當下游Insert很多,Update很少的時(shí)候就會(huì )有比較大的收益。
(2)Hidden Partition
Iceberg的分區與Hive不同的是它的分區信息可以被隱藏起來(lái),不需要用戶(hù)去感知,在建表或者修改分區策略之后,新插入的數據自動(dòng)計算所屬分區。
利用隱藏分區我們可以做到以下優(yōu)化:
- 在讀數據時(shí)可以只查詢(xún)關(guān)聯(lián)分區,忽略其他分區。
- 錯峰做File Compaction,減少沖突。例如在寫(xiě)當前小時(shí)分區時(shí)我們可以對之前的分區做File Compaction。
對于FlinkSQL原生不支持隱藏分區的問(wèn)題,我們通過(guò)Table Property去定義隱藏分區,在建表的時(shí)候去建相應的分區。
(3)Auto Schema Evolution
在實(shí)時(shí)流處理Binlog,一個(gè)繞不開(kāi)的問(wèn)題是上游的Schema變更了下游怎么及時(shí)的檢測到,再去做相應的Writer的變更,下游表的變更。有一種解決方案是當消費到上游變更的Event事件時(shí),我們會(huì )在平臺把作業(yè)重新改掉重啟,也就是先變更下游的Iceberg的Table Schema,再變更Flink SQL,之后重新啟動(dòng)作業(yè)。但在平臺化之前,對于一些常用的場(chǎng)景,比如加列,已經(jīng)能覆蓋線(xiàn)上很多Schema Evolution的場(chǎng)景。為了讓Flink作業(yè)能自動(dòng)監測到加列并且有序的正確的提交到Iceberg,我們將Binlog中的Schema隨著(zhù)每條數據記錄一起發(fā)送,當數據往下發(fā)到Iceberg的Dynamic Streaming Writer時(shí),就可以和Writer里面保存的上一個(gè)Schema去做比較,假設只是加列,那么我們就會(huì )做兩件事情:
- 關(guān)掉當前的Writer,以新的Schema去建立新的Writer寫(xiě)數據。
- 以Schema變更的時(shí)間點(diǎn)為分割,對Schema變更前的數據先提交,再對Schema 進(jìn)行Update,之后再提交 Schema變更后的文件。
(4)CDC實(shí)時(shí)入湖其他工作
除此之外,CDC與實(shí)時(shí)鏈路我們還做了其它一些工作:
- Binlog Format。支持解析Canal PB格式。
- Progressive Compaction。Compaction是我們接下來(lái)工作的重點(diǎn),尤其在MySQL的量比較小的時(shí)候,如果想維持五分鐘級別的CheckPoint,小文件問(wèn)題就會(huì )非常突出。如何避開(kāi)流式任務(wù)正在寫(xiě)的Partition去做Compaction 也是目前在做的事情。
以上就是我們目前正在做的CDC入湖的一些工作。
03 實(shí)時(shí)湖分析探索
我們想用Iceberg 來(lái)做一些更面向未來(lái)的事情。
1. 實(shí)時(shí)分析鏈路
首先介紹一下目前分析的實(shí)時(shí)鏈路。
Kafka通過(guò)Flink做一些Join和聚合操作之后,最后會(huì )生成一張大寬表存儲到ClickHouse中以提供秒級或者毫秒級的返回功能,Kafka在其中也用做了事實(shí)表的存儲。以上架構圖來(lái)自FLIP-188,FLIP-188要做的事情就是如何實(shí)現流批一體的存儲。我們數倉同學(xué)的需求是要對中間結果進(jìn)行一些查詢(xún)操作或者利用其進(jìn)一步生成下游的表,這些操作只利用Kafka是做不了的。常見(jiàn)的做法是利用Kafka再接一個(gè)任務(wù),將中間結果寫(xiě)到Iceberg或者Hudi表里面。
2. 流批一體存儲
我們實(shí)現流批一體存儲是通過(guò)直接在Kafka里雙寫(xiě)一份數據到Iceberg的列存儲上。這除了讓Kafka做擴容更簡(jiǎn)單,更重要的是支持一些離線(xiàn)數倉的用法,我們不必再啟動(dòng)一個(gè)Flink的作業(yè)去寫(xiě)到S3。要實(shí)現這樣的功能首先需要一個(gè)Schema的概念,也就是如何把Kafka的Schema映射到下游表的Schema,對此我們讓用戶(hù)在我們的平臺上來(lái)自定義,同時(shí)有一個(gè)Remote Fetcher模塊來(lái)拿到這個(gè)Schema,之后通過(guò)Iceberg寫(xiě)到下游。真正的寫(xiě)線(xiàn)程是在Broker里面,可以根據Leader去動(dòng)態(tài)遷移。之后集群中的Controller節點(diǎn)上啟動(dòng)一個(gè)單獨的Commiter進(jìn)程,接受Fetcher傳來(lái)的數據文件列表,定期commit。
3. Iceberg外表
ClickHouse社區版是存算耦合的,離線(xiàn)數倉想用這部分的數據就比較困難。我們公司內部的ClickHouse已經(jīng)實(shí)現了存算分離的架構,數據是存儲于對象存儲的。在此基礎上,我們和ClickHouse團隊合作做了Iceberg的外表。Iceberg外表沒(méi)有使用Paruqet這種開(kāi)放式的文件格式,而是使用了MergeTree的格式。上圖是一張Iceberg傳統的數據文件組織形式圖,它的Metadata層分成了Manifest List和Manifest File,之后會(huì )指向一些DataFile。這些DataFile與ClickHouse里面的part概念很像,所以我們就將Manifest File指向了一個(gè)part.ck文件,part.ck其實(shí)也是一層衍生的元數據文件,它的下游會(huì )再去讀一些bin/mark的文件,這樣就可以完成對ClickHouse數據的讀取。
04 未來(lái)規劃
未來(lái)規劃主要有存、算、管三個(gè)方向。
- 首先在存儲方面,我們需要對CloudNative FileIO持續優(yōu)化,比如進(jìn)一步減少Checkpoint的毛刺、進(jìn)一步提高吞吐、提高跨云讀寫(xiě)的穩定性。
- 關(guān)于計算,我們會(huì )跟更多引擎去集成,目前已經(jīng)集成了Spark引擎,同時(shí)正在集成ClickHouse。另外StarRocks社區已經(jīng)集成了Iceberg外表的Connector,我們以后也會(huì )在上面做一些應用。在查詢(xún)方面,計劃通過(guò)改變數據的組織形式,或者添加一些二級索引來(lái)做Data Skipping去加速查詢(xún)。
- 管理方面,讓Iceberg持續穩定的運行下去還是需要外掛表維護作業(yè)的,這對下游數倉同學(xué)來(lái)說(shuō)還是引入了運維壓力。我們接下來(lái)會(huì )將其服務(wù)化,思考如何智能地拉起一些作業(yè),以及運用什么策略可以減少沖突的概率。
這就是我們正在做的和將來(lái)準備做的一些事情。
編輯:王菁
*博客內容為網(wǎng)友個(gè)人發(fā)布,僅代表博主個(gè)人觀(guān)點(diǎn),如有侵權請聯(lián)系工作人員刪除。
linux操作系統文章專(zhuān)題:linux操作系統詳解(linux不再難懂)