⑴ 一起討論下,消息冪等(去重)通用解決方案
消息中間件是分布式系統常用的組件,無論是非同步化、解耦、削峰等都有廣泛的應用價值。我們通常會認為,消息中間件是一個可靠的組件——這里所謂的可靠是指,只要我把消息成功投遞到了消息中間件,消息就不會丟失,即消息肯定會至少保證消息能被消費者成功消費一次,這是消息中間件最基本的特性之一,也就是我們常說的「AT LEAST ONCE」,即消息至少會被「成功消費一遍」。
舉個例子,一個消息M發送到了消息中間件,消息投遞到了消費程序A,A接受到了消息,然後進行消費,但在消費到一半的時候程序重啟了,這時候這個消息並沒有標記為消費成功,這個消息還會繼續投遞給這個消費者,直到其消費成功了,消息中間件才會停止投遞。
然而這種可靠的特性導致,消息可能被多次地投遞。舉個例子,還是剛剛這個例子,程序A接受到這個消息M並完成消費邏輯之後,正想通知消息中間件「我已經消費成功了」的時候,程序就重啟了,那麼對於消息中間件來說,這個消息並沒有成功消費過,所以他還會繼續投遞。這時候對於應用程序A來說,看起來就是這個消息明明消費成功了,但是消息中間件還在重復投遞。
這在RockectMQ的場景來看,就是同一個messageId的消息重復投遞下來了。
基於消息的投遞可靠(消息不丟)是優先順序更高的,所以消息不重的任務就會轉移到應用程序自我實現,這也是為什麼RocketMQ的文檔里強調的,消費邏輯需要自我實現冪等。背後的邏輯其實就是:不丟和不重是矛盾的(在分布式場景下),但消息重復是有解決方案的,而消息丟失是很麻煩的。
例如:假設我們業務的消息消費邏輯是:插入某張訂單表的數據,然後更新庫存:
要實現消息的冪等,我們可能會採取這樣的方案:
這對於很多情況下,的確能起到不錯的效果,但是在並發場景下,還是會有問題。
假設這個消費的所有代碼加起來需要1秒,有重復的消息在這1秒內(假設100毫秒)內到達(例如生產者快速重發,Broker重啟等),那麼很可能,上面去重代碼裡面會發現,數據依然是空的(因為上一條消息還沒消費完,還沒成功更新訂單狀態),
那麼就會穿透掉檢查的擋板,最後導致重復的消息消費邏輯進入到非冪等安全的業務代碼中,從而引發重復消費的問題(如主鍵沖突拋出異常、庫存被重復扣減而沒釋放等)
要解決上面並發場景下的消息冪等問題,一個可取的方案是開啟事務把select 改成 select for update語句,把記錄進行鎖定。
但這樣消費的邏輯會因為引入了事務包裹而導致整個消息消費可能變長,並發度下降。
當然還有其他更高級的解決方案,例如更新訂單狀態採取樂觀鎖,更新失敗則消息重新消費之類的。但這需要針對具體業務場景做更復雜和細致的代碼開發、庫表設計,不在本文討論的范圍。
但無論是select for update, 還是樂觀鎖這種解決方案,實際上都是基於業務表本身做去重,這無疑增加了業務開發的復雜度, 一個業務系統裡面很大部分的請求處理都是依賴MQ的,如果每個消費邏輯本身都需要基於業務本身而做去重/冪等的開發的話,這是繁瑣的工作量。本文希望 探索 出一個通用的消息冪等處理的方法,從而抽象出一定的工具類用以適用各個業務場景。
在消息中間件里,有一個投遞語義的概念,而這個語義里有一個叫」Exactly Once」,即消息肯定會被成功消費,並且只會被消費一次。以下是阿里雲里對Exactly Once的解釋:
在我們業務消息冪等處理的領域內,可以認為業務消息的代碼肯定會被執行,並且只被執行一次,那麼我們可以認為是Exactly Once。
但這在分布式的場景下想找一個通用的方案幾乎是不可能的。不過如果是針對基於資料庫事務的消費邏輯,實際上是可行的。
假設我們業務的消息消費邏輯是:更新MySQL資料庫的某張訂單表的狀態:
要實現Exaclty Once即這個消息只被消費一次(並且肯定要保證能消費一次),我們可以這樣做:在這個資料庫中增加一個消息消費記錄表,把消息插入到這個表,並且把原來的訂單更新和這個插入的動作放到同一個事務中一起提交,就能保證消息只會被消費一遍了。
1、開啟事務
2、插入消息表(處理好主鍵沖突的問題)
3、更新訂單表(原消費邏輯)
4、提交事務
說明:
1、這時候如果消息消費成功並且事務提交了,那麼消息表就插入成功了,這時候就算RocketMQ還沒有收到消費位點的更新再次投遞,也會插入消息失敗而視為已經消費過,後續就直接更新消費位點了。這保證我們消費代碼只會執行一次。2、如果事務提交之前服務掛了(例如重啟),對於本地事務並沒有執行所以訂單沒有更新,消息表也沒插入成功;而對於RocketMQ服務端來說,消費位點也沒更新,所以消息還會繼續投遞下來,投遞下來發現這個消息插入消息表也是成功的,所以可以繼續消費。這保證了消息不丟失。
事實上,阿里雲ONS的EXACTLY-ONCE語義的實現上,就是類似這個方案基於資料庫的事務特性實現的。更多詳情可參考:https://help.aliyun.com/document_detail/102777.html
基於這種方式,的確這是有能力拓展到不同的應用場景,因為他的實現方案與具體業務本身無關——而是依賴一個消息表。
但是這里有它的局限性
1、消息的消費邏輯必須是依賴於關系型資料庫事務。如果消費的消費過程中還涉及其他數據的修改,例如Redis這種不支持事務特性的數據源,則這些數據是不可回滾的。
2、資料庫的數據必須是在一個庫,跨庫無法解決
註:業務上,消息表的設計不應該以消息ID作為標識,而應該以業務的業務主鍵作為標識更為合理,以應對生產者的重發。阿里雲上的消息去重只是RocketMQ的messageId,在生產者因為某些原因手動重發(例如上游針對一個交易重復請求了)的場景下起不到去重/冪等的效果(因消息id不同)。
如上所述,這種方式Exactly Once語義的實現,實際上有很多局限性,這種局限性使得這個方案基本不具備廣泛應用的價值。並且由於基於事務,可能導致鎖表時間過長等性能問題。
例如我們以一個比較常見的一個訂單申請的消息來舉例,可能有以下幾步(以下統稱為步驟X):
1、 檢查庫存(RPC)
2、 鎖庫存(RPC)
3、 開啟事務,插入訂單表(MySQL)
4、 調用某些其他下游服務(RPC)
5、 更新訂單狀態
6、 commit 事務(MySQL)
這種情況下,我們如果採取消息表+本地事務的實現方式,消息消費過程中很多子過程是不支持回滾的,也就是說就算我們加了事務,實際上這背後的操作並不是原子性的。怎麼說呢,就是說有可能第一條小在經歷了第二步鎖庫存的時候,服務重啟了,這時候實際上庫存是已經在另外的服務里被鎖定了,這並不能被回滾。當然消息還會再次投遞下來,要保證消息能至少消費一遍,換句話說,鎖庫存的這個RPC介面本身依舊要支持「冪等」。
再者,如果在這個比較耗時的長鏈條場景下加入事務的包裹,將大大的降低系統的並發。所以通常情況下,我們處理這種場景的消息去重的方法還是會使用一開始說的業務自己實現去重邏輯的方式,如前面加select for update,或者使用樂觀鎖。
那我們有沒有方法抽取出一個公共的解決方案,能兼顧去重、通用、高性能呢?
其中一個思路是把上面的幾步,拆解成幾個不同的子消息,例如:
1、庫存系統消費A:檢查庫存並做鎖庫存,發送消息B給訂單服務
2、訂單系統消費消息B:插入訂單表(MySQL),發送消息C給自己(下游系統)消費
3、下游系統消費消息C:處理部分邏輯,發送消息D給訂單系統
4、訂單系統消費消息D:更新訂單狀態
註:上述步驟需要保證本地事務和消息是一個事務的(至少是最終一致性的),這其中涉及到分布式事務消息相關的話題,不在本文論述。
可以看到這樣的處理方法會使得每一步的操作都比較原子,而原子則意味著是小事務,小事務則意味著使用消息表+事務的方案顯得可行。
然而,這太復雜了!這把一個本來連續的代碼邏輯割裂成多個系統多次消息交互!那還不如業務代碼層面上加鎖實現呢。
上面消息表+本地事務的方案之所以有其局限性和並發的短板,究其根本是因為它依賴於關系型資料庫的事務,且必須要把事務包裹於整個消息消費的環節。
如果我們能不依賴事務而實現消息的去重,那麼方案就能推廣到更復雜的場景例如:RPC、跨庫等。
例如,我們依舊使用消息表,但是不依賴事務,而是針對消息表增加消費狀態,是否可以解決問題呢?
67_1.png
以上是去事務化後的消息冪等方案的流程,可以看到,此方案是無事務的,而是針對消息表本身做了狀態的區分:消費中、消費完成。只有消費完成的消息才會被冪等處理掉。而對於已有消費中的消息,後面重復的消息會觸發延遲消費(在RocketMQ的場景下即發送到RETRY TOPIC),之所以觸發延遲消費是為了控制並發場景下,第二條消息在第一條消息沒完成的過程中,去控制消息不丟(如果直接冪等,那麼會丟失消息(同一個消息id的話),因為上一條消息如果沒有消費完成的時候,第二條消息你已經告訴broker成功了,那麼第一條消息這時候失敗broker也不會重新投遞了)
上面的流程不再細說,後文有github源碼的地址,讀者可以參考源碼的實現,這里我們回頭看看我們一開始想解決的問題是否解決了:
1、 消息已經消費成功了,第二條消息將被直接冪等處理掉(消費成功)。
2、 並發場景下的消息,依舊能滿足不會出現消息重復,即穿透冪等擋板的問題。
3、 支持上游業務生產者重發的業務重復的消息冪等問題。
關於第一個問題已經很明顯已經解決了,在此就不討論了。
關於第二個問題是如何解決的?主要是依靠插入消息表的這個動作做控制的,假設我們用MySQL作為消息表的存儲媒介(設置消息的唯一ID為主鍵),那麼插入的動作只有一條消息會成功,後面的消息插入會由於主鍵沖突而失敗,走向延遲消費的分支,然後後面延遲消費的時候就會變成上面第一個場景的問題。
關於第三個問題,只要我們設計去重的消息鍵讓其支持業務的主鍵(例如訂單號、請求流水號等),而不僅僅是messageId即可。所以也不是問題。
如果細心的讀者可能會發現這里實際上是有邏輯漏洞的,問題出在上面聊到的個三問題中的第2個問題(並發場景),在並發場景下我們依賴於消息狀態是做並發控制使得第2條消息重復的消息會不斷延遲消費(重試)。但如果這時候第1條消息也由於一些異常原因(例如機器重啟了、外部異常導致消費失敗)沒有成功消費成功呢?也就是說這時候延遲消費實際上每次下來看到的都是 消費中 的狀態,最後消費就會被視為消費失敗而被投遞到死信Topic中(RocketMQ默認可以重復消費16次)。
有這種顧慮是正確的!對於此,我們解決的方法是,插入的消息表必須要帶一個最長消費過期時間,例如10分鍾,意思是如果一個消息處於 消費中 超過10分鍾,就需要從消息表中刪除(需要程序自行實現)。所以最後這個消息的流程會是這樣的:
67_2.png
我們這個方案實際上沒有事務的,只需要一個存儲的中心媒介,那麼自然我們可以選擇更靈活的存儲媒介,例如Redis。使用Redis有兩個好處:
1、性能上損耗更低
2、上面我們講到的超時時間可以直接利用Redis本身的ttl實現
當然Redis存儲的數據可靠性、一致性等方面是不如MySQL的,需要用戶自己取捨。
以上方案針對RocketMQ的Java實現已經開源放到Github中,具體的使用文檔可以參考https://github.com/Jaskey/RocketMQDepListener ,
以下僅貼一個Readme中利用Redis去重的使用樣例,用以意業務中如果使用此工具加入消息去重冪等的是多麼簡單:
以上代碼大部分是原始RocketMQ的必須代碼,唯一需要修改的僅僅是創建一個 DepConcurrentListener 示例,在這個示例中指明你的消費邏輯和去重的業務鍵(默認是messageId)。
更多使用詳情請參考Github上的說明。
實現到這里,似乎方案挺完美的,所有的消息都能快速的接入去重,且與具體業務實現也完全解耦。那麼這樣是否就完美的完成去重的所有任務呢?
很可惜,其實不是的。原因很簡單:因為要保證消息至少被成功消費一遍,那麼消息就有機會消費到一半的時候失敗觸發消息重試的可能。還是以上面的訂單流程X:
1、 檢查庫存(RPC)
2、 鎖庫存(RPC)
3、 開啟事務,插入訂單表(MySQL)
4、 調用某些其他下游服務(RPC)
5、 更新訂單狀態
6、 commit 事務(MySQL)
當消息消費到步驟3的時候,我們假設MySQL異常導致失敗了,觸發消息重試。因為在重試前我們會刪除冪等表的記錄,所以消息重試的時候就會重新進入消費代碼,那麼步驟1和步驟2就會重新再執行一遍。如果步驟2本身不是冪等的,那麼這個業務消息消費依舊沒有做好完整的冪等處理。
那麼既然這個並不能完整的完成消息冪等,還有什麼價值呢?價值可就大了!雖然這不是解決消息冪等的銀彈(事實上,軟體工程領域里基本沒有銀彈),但是他能以便捷的手段解決:
1、各種由於Broker、負載均衡等原因導致的消息重投遞的重復問題
2、各種上游生產者導致的業務級別消息重復問題
3、重復消息並發消費的控制窗口問題,就算重復,重復也不可能同一時間進入消費邏輯
也就是說,使用這個方法能保證正常的消費邏輯場景下(無異常,無異常退出),消息的冪等工作全部都能解決,無論是業務重復,還是rocketmq特性帶來的重復。
事實上,這已經能解決99%的消息重復問題了,畢竟異常的場景肯定是少數的。那麼如果希望異常場景下也能處理好冪等的問題,可以做以下工作降低問題率:
1、消息消費失敗做好回滾處理。如果消息消費失敗本身是帶回滾機制的,那麼消息重試自然就沒有副作用了。
2、消費者做好優雅退出處理。這是為了盡可能避免消息消費到一半程序退出導致的消息重試。
3、一些無法做到冪等的操作,至少要做到終止消費並告警。例如鎖庫存的操作,如果統一的業務流水鎖成功了一次庫存,再觸發鎖庫存,如果做不到冪等的處理,至少要做到消息消費觸發異常(例如主鍵沖突導致消費異常等)
4、在#3做好的前提下,做好消息的消費監控,發現消息重試不斷失敗的時候,手動做好#1的回滾,使得下次重試消費成功
⑵ 雙十一是怎麼保證高並發,分布式系統中,數據一致性
前言 在系統開發過程中,經常遇到數據重復插入、重復更新、消息重發發送等等問題,因為應用系統的復雜邏輯以及網路交互存在的不確定性,會導致這一重復現象,但是有些邏輯是需要有冪等特性的,否則造成的後果會比較嚴重,例如訂單重復創建,這時候帶來的問題可是非同一般啊。 什麼是系統的冪等性 冪等是數據中得一個概念,表示N次變換和1次變換的結果相同。 高並發的系統如何保證冪等性? 1.查詢 查詢的API,可以說是天然的冪等性,因為你查詢一次和查詢兩次,對於系統來講,沒有任何數據的變更,所以,查詢一次和查詢多次一樣的。 2.MVCC方案 多版本並發控制,update with condition,更新帶條件,這也是在系統設計的時候,合理的選擇樂觀鎖,通過version或者其他條件,來做樂觀鎖,這樣保證更新及時在並發的情況下,也不會有太大的問題。 例如:update table_xxx set name=#name#,version=version+1 where version=#version# ,或者是 update table_xxx set quality=quality-#subQuality# where quality-#subQuality# >= 0 。 3.單獨的去重表 如果涉及到的去重的地方特別多,例如ERP系統中有各種各樣的業務單據,每一種業務單據都需要去重,這時候,可以單獨搞一張去重表,在插入數據的時候,插入去重表,利用資料庫的唯一索引特性,保證唯一的邏輯。 4.分布式鎖 還是拿插入數據的例子,如果是分布是系統,構建唯一索引比較困難,例如唯一性的欄位沒法確定,這時候可以引入分布式鎖,通過第三方的系統,在業務系統插入數據或者更新數據,獲取分布式鎖,然後做操作,之後釋放鎖,這樣其實是把多線程並發的鎖的思路,引入多多個系統,也就是分布式系統中得解決思路。 5.刪除數據 刪除數據,僅僅第一次刪除是真正的操作數據,第二次甚至第三次刪除,直接返回成功,這樣保證了冪等。 6.插入數據的唯一索引 插入數據的唯一性,可以通過業務主鍵來進行約束,例如一個特定的業務場景,三個欄位肯定確定唯一性,那麼,可以在資料庫表添加唯一索引來進行標示。 這里有一個場景,API層面的冪等,例如提交數據,如何控制重復提交,這里可以在提交數據的form表單或者客戶端軟體,增加一個唯一標示,然後服務端,根據這個UUID來進行去重,這樣就能比較好的做到API層面的唯一標識。 7.狀態機冪等 在設計單據相關的業務,或者是任務相關的業務,肯定會涉及到狀態機,就是業務單據上面有個狀態,狀態在不同的情況下會發生變更,一般情況下存在有限狀態機,這時候,如果狀態機已經處於下一個狀態,這時候來了一個上一個狀態的變更,理論上是不能夠變更的,這樣的話,保證了有限狀態機的冪等。 以上就是高並發系統數據冪等的解決方案的資料整理,後續繼續補充相關知識,謝謝大家對本站的支持!