MQ系列1:消息中間件執(zhí)行原理MQ系列2:消息中間件的技術(shù)選型MQ系列3:RocketMQ 架構(gòu)分析MQ系列4:NameServer 原理解析MQ系列5:RocketMQ消息的發(fā)送模式MQ系列6:消息的消費(fèi)MQ系列7:消息通信,追求極致性能 MQ系列8:數(shù)據(jù)存儲(chǔ),消息隊(duì)列的高可用保障MQ系列9:高可用架構(gòu)分析MQ系列10:如何保證消息冪等性消費(fèi)MQ系列11:如何保證消息可靠性傳輸
(資料圖片)
消息的有序性在很多業(yè)務(wù)場(chǎng)景中占有很重要的位置。比如購(gòu)物場(chǎng)景,需要按照 創(chuàng)建訂單 --> 訂單付款 --> 完成訂單 順序執(zhí)行。又比如出行場(chǎng)景,接單 --> 接送到達(dá)目的地 --> 付款 --> 完成訂單。這種是嚴(yán)格按照順序執(zhí)行的,這樣的順序消費(fèi)才不會(huì)出問(wèn)題,而且各個(gè)訂單之間是互相獨(dú)立和并行執(zhí)行的。所以,在MQ中,如何穩(wěn)定地保證順序性消息處理,是一個(gè)不可避免的話題。
2 消息的有序性說(shuō)明消息的有序執(zhí)行,一般不是單個(gè)組件的能力。而是整個(gè)消息從生產(chǎn),排隊(duì),存儲(chǔ)到消費(fèi)都是有序的,比如上面提到的購(gòu)物和出行場(chǎng)景。這就要求我們?cè)谙㈥?duì)列(如果是Kafka,還是RocketMQ、RabbitMQ)中,保證以下前提:
消息生產(chǎn)的有序性:即生產(chǎn)者組件有序發(fā)送消息消息入出隊(duì)列的有序性:即消息是按照進(jìn)入的先后順序排隊(duì)列放的,遵循FIFO原則。消息的存儲(chǔ)的有序性:與上一點(diǎn)一致,部分場(chǎng)景下為了提高可用,就是要持久化到磁盤(pán),這時(shí)候應(yīng)該遵循有序存放,才能保證后續(xù)有序消費(fèi)消息消費(fèi)的有序性:即按照順序進(jìn)行消費(fèi)。又分為全局順序消息與部分順序消息,全局是指Topic下的所有消息都要保證順序;部分順序消息保證每一組消息被順序消費(fèi)即可。這邊還有個(gè)問(wèn)題,如果想讓全局都是順序性消費(fèi),那么只能用一個(gè)消費(fèi)者去消費(fèi)隊(duì)列(一般來(lái)說(shuō)也是單個(gè)生產(chǎn)者),這是會(huì)嚴(yán)重影響整體性能的,一般沒(méi)這個(gè),都是分組順序執(zhí)行消費(fèi)的。
2.1 消息生產(chǎn)的有序性要保證整個(gè)消息隊(duì)列的有序性執(zhí)行,首先要保證消息生產(chǎn)的有序性。RocketMQ在Broker中防止了很多Topic,主題(Topic)可以看做消息的歸類,我們將消息進(jìn)行類型劃分,相同類型的消息稱為一個(gè) Topic。比如我們?cè)谔詫毣蚓〇|上購(gòu)買商品的的過(guò)程,就可能產(chǎn)生:購(gòu)物車消息、交易消息、物流消息等,1條消息必然歸屬于1個(gè) Topic 。1個(gè) Topic可以有0 ~ n 個(gè)生產(chǎn)者向其發(fā)送消息;也可以被 0~n 個(gè)消費(fèi)者訂閱和處理,于是就有出現(xiàn)了生產(chǎn)者組和消費(fèi)者組,如下圖:
或者同一個(gè)Topic中,創(chuàng)建不同的Queue,同一個(gè)消息生產(chǎn)者將消息隔離發(fā)送到不同的Queue中:
按照上述的模式,同理,我們只需要保證一組相同的消息按照給定的順序存入同一個(gè)隊(duì)列中,就能保證生產(chǎn)者有序存儲(chǔ),比如一次完整的消費(fèi)過(guò)程:創(chuàng)建訂單、付款、完成訂單按照順序在一個(gè)隊(duì)列(Queue)中執(zhí)行那就可以了。
★ 同時(shí)我們要保證同一組的消息在消息生產(chǎn)的時(shí)候投送到一個(gè)組中。這個(gè)相對(duì)來(lái)說(shuō)不難,可以這么做:
比如一個(gè)訂單的多個(gè)子消息的父訂單號(hào)是一致,我們把這些消息按照訂單號(hào)取模,投送到對(duì)應(yīng)的Queue中就行了,比如 訂單號(hào) % 隊(duì)列數(shù)量( 163105015 % 9)發(fā)送消息自定義消息標(biāo)簽(消息標(biāo)簽可以用隊(duì)列編號(hào)命名),一組消息使用同一個(gè)標(biāo)簽,改組標(biāo)簽對(duì)應(yīng)的消息都投向標(biāo)簽所在的隊(duì)列。★ 業(yè)務(wù)程序方面,必須使用同步發(fā)送的方式,這樣才能保證生產(chǎn)者發(fā)送的消息有序,否則按照FIFO的原則,很可能 訂單完成會(huì)被先消費(fèi)。但是我們業(yè)務(wù)程序,比如Java代碼中為了提升性能,可能使用多線程的模式進(jìn)行事件觸發(fā)。多線程下保證生產(chǎn)者順序性,可以使用鎖并配合 spring的publish event(按照順序執(zhí)行的內(nèi)部隊(duì)列),持久化之后,再按照先進(jìn)先出的順序推送消息進(jìn)入MQ中??梢詤⒖枷?,大概就是將你的事件進(jìn)行順序化一下。
★ 上述方法也不能完完全全的避免順序化執(zhí)行。如果broker服務(wù)發(fā)生故障,或者消息發(fā)生丟失,都有可能導(dǎo)致事件消費(fèi)不完整,出現(xiàn)不一致的問(wèn)題。
2.2 消息有序性存儲(chǔ)Broker 存儲(chǔ)架構(gòu)采用文件存儲(chǔ)機(jī)制(類似Kafka),即直接在磁盤(pán)上使用文件來(lái)保存消息,而不是采用Redis或者M(jìn)ySQL之類的持久化工具。它會(huì)把消息存儲(chǔ)所屬相關(guān)的文件存儲(chǔ)在ROCKETMQ_HOME下,包含三個(gè)部分:
CommitLog 消息元數(shù)據(jù)ConsumeQueue 消息邏輯隊(duì)列IndexFile 索引文件存儲(chǔ)消息的元數(shù)據(jù),所有消息都會(huì)順序存入到CommitLog文件中。ConsumeQueue是指存儲(chǔ)消息在CommitLog上的索引,一個(gè)MessageQueue一個(gè)文件,記錄當(dāng)前MessageQueue被哪些消費(fèi)者組消費(fèi)到了哪一條CommitLog。所以一切都是順序性操作下來(lái)的,而且按照 MessageQueue 做了隔離了,不用擔(dān)心亂序的問(wèn)題。詳細(xì)參考 《MQ系列8:數(shù)據(jù)存儲(chǔ),消息隊(duì)列的高可用保障》
2.3 消息消費(fèi)的有序性最后一步就是消費(fèi)的有序性了,既然消息生產(chǎn)和消息持久化都可以做到有序性。那么只要保證消費(fèi)的有序性,就能保證整個(gè)消息隊(duì)列的有序執(zhí)行。這邊以RocketMQ為例子,RockerMQ采用MessageListener 回調(diào)函數(shù)進(jìn)行監(jiān)聽(tīng),監(jiān)聽(tīng)到消息之后進(jìn)行數(shù)據(jù)處理。MessageListener主要提供了兩種消費(fèi)模式,如下:
有序消費(fèi)模式MessageListenerOrderly并發(fā)消費(fèi)模式MessageListenerConcurrently其中有序消費(fèi)模式有序消費(fèi)模式MessageListenerOrderly可以保證按照順序進(jìn)行消息處理。但是消費(fèi)的業(yè)務(wù)代碼實(shí)現(xiàn)是多線程并行的,依然是無(wú)法保證的。實(shí)際上RocketMQ也是這么做的,MessageListenerConcurrently拉到消息之后會(huì)提交到線程池去消費(fèi),而MessageListenerOrderly則是通過(guò)分布式鎖和本地鎖保證同時(shí)只有一條線程去消費(fèi)一個(gè)隊(duì)列(Queue)上的數(shù)據(jù)。這種消費(fèi)模式就是使用以下3把鎖來(lái)確保順序性:
broker端的分布式鎖messageQueue的本地synchronized鎖ProcessQueue的本地consumeLock3 總結(jié)要消息的順序性消費(fèi):需要保持先后順序的消息放到同一個(gè)消息隊(duì)列中(kafka中就是partition,rabbitMq中就是queue),然后使用線程池消費(fèi)的時(shí)候使用分布式鎖和本地鎖保證同時(shí)只有一條線程去消費(fèi)一個(gè)隊(duì)列(Queue)上的數(shù)據(jù)。
關(guān)鍵詞: