『壹』 4.一文搞定:Flink與Kafka之間的精準一次性
在上一篇文章當中,也算是比較詳細且通俗的聊了聊Flink是如何通過checkpoint機制來完成數據精準一次性的實現的。並且也在上一章的結尾表示,要在接下來聊一聊Flink與它的鐵哥們Kafaka之間,是如何實現數據的精準一次性消費的。
本次的聊法,還是要通過以kafka(source)->Flink,Flink(source)->Kafka來分別展開討論。
kafka是一個具有數據保存、數據回放能力的消息隊列,說白了就是kafka中的每一個數據,都有一個專門的標記作為標識。而在Flink消費kafka傳入的數據的時候,source任務就能夠將這個偏移量以運算元狀態的角色進行保存,寫入到設定好的檢查點中。這樣一旦發生故障,Flink中的FlinkKafkaProce連接器就i能夠按照自己保存的偏移量,自己去Kafka中重新拉取數據,也正是通過這種方式,就能夠確保Kafka到Flink之間的精準一次性。
在上一篇文章當中,已經表明了,如果想要讓輸出端能夠進行精準一次性消費,就需要使用到冪等性或者是事務。而事務中的兩階段提交是所有方案裡面最好的實現。
其實Flink到Kafak之間也是採用了這種方式,具體的可以看一下ctrl進到FlinkKafkaProce連接器內部去看一看:
這也就表明了,當數據通過Flink發送給sink端Kafka的時候,是經歷了兩個階段的處理的。第一階段就是Flink向Kafka中插入數據,進入預提交階段。當JobManager發送的Ckeckpoint保存成功信號過來之後,才會提交事務進行正式的數據發送,也就是讓原來不可用的數據可以被使用了。
這個實現過程到目前階段就很清晰了,它的主體流程無非就是在開啟檢查點之後,由JobManager向各個階段的處理邏輯發送有關於檢查點的barrier。所有的計算任務接收到之後,就會根據自己當前的狀態做一個檢查點保存。而當這個barrier來到sink任務的時候,sink就會開啟一個事務,然後通過這個事務向外預寫數據。直到Jobmanager來告訴它這一次的檢查點已經保存完成了,sink就會進行第二次提交,數據也就算是成功寫出了。
1.必須要保證檢查點被打開了,如果檢查點沒有打開,那麼之前說的一切話都是空談。因為Flink默認檢查點是關著的。
2.在FlinkKafakProcer連接器的構造函數中要傳入參數,這個參數就是用來保證狀態一致性的。就是在構造函數的最後一個參數輸入如下:
3.配置Kafka讀取數據的隔離級別
在kafka中有個配置,這個配置用來管理Kafka讀取數據的級別。而這個配置默認是能夠讀取預提交階段的數據的,所以如果你沒改這個配置,那兩階段提交的第一階段就是白費了。所以需要改一下這個配置,來更換一下隔離級別:
4.事務超時時間
這個配置也很有意思,大家試想一下。如果要進行兩階段提交,就要保證sink端支持事務,Kafka是支持事務的,但是像這個組件對於很多機制都有一個超時時間的概念,也就是說如果時間到了這個界限還沒完成工作,那就會默認這個工作失敗。Kafka中由這個概念,Flink中同樣由這個概念。但是flink默認超時時間是1小時,而Kafka默認是15分鍾,這就有可能出現檢查點保存東西的時間大於15分鍾,假如說是16分鍾保存完成然後給sink發送檢查點保存陳功可以提交事務的信號,但是這個時候Kafka已經認為事務失敗,把之前的數據都扔了。那數據不就是丟失了么。所以說Kafka的超時時間要大於Flink的超時時間才好。
截止到目前為止,基本上把有關於狀態維護的一些東西都說完了,有狀態後端、有檢查點。還通過檢查點完成可端到端的數據精準一次性消費。但是想到這我又感覺,如果有學習進度比我差一些的,萬一沒辦法很好的理解怎麼辦。所以在下一篇文章當中我就聊聊Flink中的「狀態」到底是個什麼東西,都有什麼類型,都怎麼去用。
『貳』 基於Flink的實時計算平台的構建
一、系統架構
1. 接入層
Canal、Flume、Kafka
針對業務系統數據,Canal監控Binlog日誌,發送至kafka;
針對日誌數據,由Flume來進行統一收集,並發送至kafka。
消息隊列的數據既是離線數倉的原始數據,也是實時計算的原始數據,這樣可以保證實時和離線的原始數據是統一的。
2. 計算層
Flink
有了源數據,在 計算層 經過Flink實時計算引擎做一些加工處理,然後落地到存儲層中不同存儲介質當中。
3. 存儲層
HBase、Kafka、ES、Mysql、Hive、Redis
不同的 存儲介質 是通過不同的應用場景來選擇。
4. 數據應用層
風控、模型、圖譜、大屏展示
通過存儲層應用於不同的 數據應用 ,數據應用可能是我們的正式產品或者直接的業務系統
二、技術實現
1. 計算引擎
實時計算引擎的功能要求
提供高級 API,支持常見的數據操作比如關聯聚合,最好是能支持 SQL
具有狀態管理和自動支持久化方案,減少對存儲的依賴
可靠的容錯機制,低延時,最好能夠保證Exactly-once
Flink的優勢
Flink的API、容錯機制與狀態管理都滿足實時數倉計算引擎的需求
Flink高吞吐、低延時的特性
端到端的Exactly-once
WaterMark&Event Time的支持
Flink 不僅支持了大量常用的 SQL 語句,還有豐富的數據類型、內置函數以及靈活的自定義函數,基本覆蓋了我們的開發場景
2. 存儲引擎
根據不同的業務場景,使用最適合的存儲引擎:
Kafka主要用於中間數據表的存儲
ES主要針對日誌數據的存儲和分析
HBase、Redis可用於維表存儲
Hive用於數據校驗
Mysql可以用於指標計算結果的存儲
三、數據分層
數據源:目前數據源主要是Binlog,通過Canal監控各個業務系統的Mysql,將binlog發送至kafka。
ODS層:主要將Binlog數據存儲至Kafka,這一層不對數據進行任何操作,存儲最原始的數據,Binlog 日誌在這一層為庫級別,即:一個庫的變更數據存放在同一個 Kafka Topic 中。
DWD層:主要對數據進行簡單的清洗。拆分主題,將庫級別的主題拆分為表級別;打平數據,將data數組格式打平。
DWS層:主要根據不同的業務的需求,將該需求所涉及到的表進行join所得。
APP層:根據指標計算需求,對數據進行處理後,存儲HBase,為了方便模型查詢,主要將表存儲為索引表和明細表,直接對數據進行指標計算後,將計算結果存儲到HBase。
四、數據監控及校驗
1. 數據監控
目前數據的監控的架構是pushgateway + Prometheus + Grafana
數據監控主要是接入Flink的Metric,通過Grafana對Flink系統指標及自定義指標進行圖形化界面的展示,對關鍵指標進行監控報警
2. 數據校驗
目前數據的監控的架構是Grafana + Mysql
Grafana用於監控指標的展示及相關閾值數據的報警,Mysql主要用於監控數據的存儲
將每個服務的source收到的數據、sink發出的數據,根據表的不同將數據關鍵欄位寫入mysql中,通過統計各個階段各個表中的數據條數,對數據完整性進行監控校驗,若出現數據缺時,先查找原因,然後指定時間戳重啟服務
五、系統管理
元數據管理
表,欄位元數據管理,實時感知元數據的變化,大幅度降低使用數據的成本。
系統配置
對應用啟動參數及相關配置參數的管理,對任務進行靈活配置及管理。
血緣管理
主要是梳理實時計算平台中數據依賴關系,以及實時任務的依賴關系,從底層ODS到DWD再到DWS,以及APP層用到哪些數據,將整個鏈度串聯起來。
六、問題及解決方案
1. 數據傾斜
由於要拆分主題,要以table為key對數據進行keyBy,但是由於每個表的數據量相差較大,會出現數據傾斜
解決方案:
加鹽,給key加前綴
前綴不能隨便加,為了保證同一id的數據在相同的分區中,所以根據id_table進行keyBy
2. 數據重復
任務在進行自動或手動重啟時,為了保證數據不丟失,數據會出現重復計算的問題,如果下游只是對數據進行HBase存儲的話,由於冪等性,這種重復可以解。但是,如果下游要對數據進行聚合,這樣會導致數據被計算多次,影響計算結果的准確性
解決方案:
上游在對數據進行發送時,對kafka procer 進行 exactly once的設置
在對數據統計時進行數據去重
3. 數據延時
由於所處理的數據表的大小不一樣,處理大表時,會出現數據延時的問題。
解決方案:
針對大表數據增加並行度
4.數據亂序
由於Flink kafka procer默認是根據hash對數據進行隨機分區,kafka consumer在對數據進行消費時,每個分區消費速度不同,這樣最終在存儲數據時,就會出現亂序即相同的id會出現老數據覆蓋新數據的問題
解決方案:
對kafka每個階段進行自定義分區,將id相同的數據分到同一個分區,保證同一id的數據的有序性
由於整個數據處理過程中可能會出現shuffle,導數數據重新亂序,所以在對數據存儲前對數據進行排序
對數據進行排序的關鍵點時要保證每條數據的唯一性,即要有標記數據先後順序的欄位
5 . 數據唯一標記(很重要)
由於要對數據進行去重或者排序,所以要保證數據的唯一性
解決辦法:
使用時間戳不可以,因為數據量很大的情況下,同一時間會處理上百條數據
在最初發出數據的時候,為數據打上標記,使用 partition + offset + idx 的組合來確認數據的唯一性及順序性
6. 數據可靠性
我們對服務重啟或對服務升級時,可能會出現數據的丟失
解決方案:
結合Flink 的checkpoint及savepoint機制保證數據的可靠性
開啟Flink的checkpoint機制,服務進行自動重啟時,會自動讀取上次保存在checkpoint中offset,或者我們指定offset進行數據消費
對服務進行升級時,先將服務的狀態保存至savepoint中,重啟時指定savepoint進行服務啟動,保證數據不丟失
7. 無感升級
由於我們目前數據量比較龐大,且在對服務進行升級時,耗時較長,會影響調用方的使用。
解決辦法:
在對服務進行升級時,將數據寫入備用庫,等數據追上且服務穩定運行後,再將存儲庫進行切換