當前位置:首頁 » 電腦資訊 » 為什麼kafka啟動時文件過大

為什麼kafka啟動時文件過大

發布時間: 2023-05-29 10:59:00

① Kafka partition的數量問題

kafka的每個topic都可以創建多個partition,partition的數量無上限,並不局數會像replica一樣受限於broker的數量,因此partition的數量可以隨心所欲的設置。那確定partition的數量就需要思考一些權衡因素。

越多的partition可以提供更高的吞吐量

在kafka中,單個partition是kafka並行操作的最小單元。每個partition可以獨立接收推送的消息以及被consumer消費,相當於topic的一個子通道,partition和topic的關系就像高速公路的車道和高速公路的關系一樣,起始點和終點相同,每個車道都可以獨立實現運輸,不同的是kafka中不存在車輛變道的說法,入口時選擇的車道需要從一而終。而kafka的吞吐量顯而易見,在資源足夠的情況下,partition越多速度越快。

這里提到的資源充足解釋一下,假設我現在一個partition的最大傳輸速度為p,目前kafka集群共有三個broker,每個broker的資源足夠支撐三個partition最大速度傳輸,那我的集群最大傳輸速度為3*3*p=9p,假設在不增加資源的情況下將partition增加到18個,每個partition只能以p/2的速度傳輸數據,因此傳輸速度上限還是9p,並不能再提升,因此吞吐量的設計需要考慮broker的資源上限。當然,kafka跟其他集群一樣,可以橫向擴展,再增加三個相同資源的broker,那傳輸速度即可達到18p。

越多的分區需要打開更多的文件句柄

在kafka的broker中,每個分區都會對照著文件系統的一個目錄。

在kafka的數據日誌文件目錄中,每個日誌數據段都會分配兩個文件,一個索引文件和一個數據文件。因此,隨著partition的增多,需要的文件句柄數急劇增加,必要時需要調整操作系統允許打開的文件句柄數。

更多的分區會導致端對端的延遲

kafka端對端的延遲為procer端發布消息到consumer端消費消息所需的時間,即consumer接收消息的時間減去proce發布消息的時間。kafka在消息正確接收後才會暴露給消費者,即在保證in-sync副本復製成功之後才會暴露,瓶頸則來自於此。在一個broker上的副本從其他broker的leader上復制數據的時候只會開啟一個線程,假設partition數量為n,每個副本同步的時間為1ms,那in-sync操作完成所需的時間即n*1ms,若n為10000,則需要10秒才能返回同步狀態,數據才能暴露給消費者,這就導致了較大的端對端的延遲。

越多的partition意味著需要更多的內存

在新版本的kafka中可以支持批量提交和批量消費,而設置了批量提交和批量消費後,每個partition都會需要一定的內存空間。假設為100k,當partition為100時,procer端和consumer端都需要10M的內存;當partition為100000時,procer端和consumer端則都需要10G內存。無限的partition數量很快就會占據大量的內存,造成性能瓶頸。

越多的partition會導致更長時間的恢復期

kafka通過多副本復制技術,實現kafka的高可用性和穩定性。每個partition都會有多個副本存在於多個broker中,其中一個副本為leader,其餘的為follower。當kafka集群其中一個broker出現故障時,在這個broker上的leader會需要在其他broker上重新選擇一個副本啟動為leader,這個過程由kafka controller來完成,主要是從Zookeeper讀取和修改受影響partition的一些元數據信息。

通常情況下,當一個broker有計劃的停機上,該broker上的partition leader會在broker停機前有次序頃稿的一一移走,假設移走雀臘孝一個需要1ms,10個partition leader則需要10ms,這影響很小,並且在移動其中一個leader的時候,其他九個leader是可用的,因此實際上每個partition leader的不可用時間為1ms。但是在宕機情況下,所有的10個partition

leader同時無法使用,需要依次移走,最長的leader則需要10ms的不可用時間窗口,平均不可用時間窗口為5.5ms,假設有10000個leader在此宕機的broker上,平均的不可用時間窗口則為5.5s。

更極端的情況是,當時的broker是kafka controller所在的節點,那需要等待新的kafka leader節點在投票中產生並啟用,之後新啟動的kafka leader還需要從zookeeper中讀取每一個partition的元數據信息用於初始化數據。在這之前partition leader的遷移一直處於等待狀態。

總結

通常情況下,越多的partition會帶來越高的吞吐量,但是同時也會給broker節點帶來相應的性能損耗和潛在風險,雖然這些影響很小,但不可忽略,因此需要根據自身broker節點的實際情況來設置partition的數量以及replica的數量。

② 什麼是kafka

Kafka最初由Linkedin公司開發,是一個分布式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分布式消息系統,它的最大特性就是可以實時處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低時延的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會並成為頂級開源項目。

消息隊列的性能好壞,其文件存儲機制設計是衡量一個消息隊列服務水平和最關鍵指標之一。

基本工作流程如上圖所示,其中:

我們看上面的架構圖中,procer就是生產者,是數據的入口。注意看圖中的紅色箭頭,Procer在寫入數據的時候 永遠的找leader ,不會直接將數據寫入follower!那leader怎麼找呢?寫入的流程又是什麼樣的呢?我們看下圖:

發送的流程就在圖中已經說明了,就不單獨在文字列出來了!需要注意的一點是,消息寫入leader後,follower是主動的去leader進行神世伏同步的!procer採用push模式將數據發布到broker,每條消息追加到分區中,順序寫入磁碟,所以保證 同一分區 內的數據是有序的!寫入示意圖如下:

上面說到數據會寫入到不同的分區,那kafka為什麼要做分區呢?相信大家應該也能猜到,分區的主要目的是:

熟悉負載均衡的朋友應該知道,當我們向某個伺服器發送請求的時候,服務端可能會對請求做一個負載,將流量分發到不同的伺服器,那在kafka中,如果某個topic有多個partition,procer又怎麼知道該將數據發往哪個partition呢?kafka中有幾個原則:

保證消息不丟失是一個消息隊列中間件的基本保證,那procer在向kafka寫入消息的時候,怎麼保證消息不丟失呢?其實上面的寫入流程圖中有描述出來,那就是通過ACK應答機制!在生產者向隊列寫入數據的時候可以設置參數來確定是否確認kafka接收到數據,這個參數可設置的值為 0 1 all

最後要注意的是,如果往不存在的topic寫數據,能不能寫入成功呢?kafka會自動創建topic,分區和副本的數量根據默認配置都是1。

Procer將數據寫入kafka後,集群就需要對數據進行保存了!kafka將數據保存在磁碟,可能在我們的一般的認知里,寫入磁碟是比較耗時的操作,不適合這種高並發的組件。Kafka初始會單獨開辟一塊磁碟空間,順序寫入數據(效率比隨機寫入高)。

前面說過了每個topic都可以分為一個或多個partition,游攜如果你覺得topic比較抽象,那partition就是比較具體的東西了!Partition在伺服器上的表現形式就是一個一個的文件夾,每個partition的文件夾下面會有多組segment文件,每組segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中沒有)三個文件, log文件就實際是存儲message的地方,而index和timeindex文件為索引文件,用於檢索消息。

上面說到log文件就實際是存儲message的地方,我們在procer往kafka寫入的也是一條一條的message,那存儲在log中的message是什麼樣子的呢?消息主要包含消息體、消息大小、offset、壓縮類型……等等!我們重點需要知道的是下面三個:

無論消息是否被消費,kafka都會保存所有的消息。那對於舊數據有什麼刪除策略呢?

需要注意的是,kafka讀取特定消息的時間復雜度是O(1),所以這里刪除過期的文件並不會提高返絕kafka的性能!

消息存儲在log文件後,消費者就可以進行消費了。在講消息隊列通信的兩種模式的時候講到過點對點模式和發布訂閱模式。Kafka採用的是點對點的模式,消費者主動的去kafka集群拉取消息,與procer相同的是,消費者在拉取消息的時候也是 找leader 去拉取。

多個消費者可以組成一個消費者組(consumer group),每個消費者組都有一個組id!同一個消費組者的消費者可以消費同一topic下不同分區的數據,但是不會組內多個消費者消費同一分區的數據!!!如下圖:

圖示是消費者組內的消費者小於partition數量的情況,所以會出現某個消費者消費多個partition數據的情況,消費的速度也就不及只處理一個partition的消費者的處理速度!如果是消費者組的消費者多於partition的數量,那會不會出現多個消費者消費同一個partition的數據呢?上面已經提到過不會出現這種情況!多出來的消費者不消費任何partition的數據。所以在實際的應用中,建議 消費者組的consumer的數量與partition的數量一致

kafka使用文件存儲消息(append only log),這就直接決定kafka在性能上嚴重依賴文件系統的本身特性.且無論任何OS下,對文件系統本身的優化是非常艱難的.文件緩存/直接內存映射等是常用的手段.因為kafka是對日誌文件進行append操作,因此磁碟檢索的開支是較小的;同時為了減少磁碟寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO調用的次數.對於kafka而言,較高性能的磁碟,將會帶來更加直接的性能提升.

除磁碟IO之外,我們還需要考慮網路IO,這直接關繫到kafka的吞吐量問題.kafka並沒有提供太多高超的技巧;對於procer端,可以將消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對於consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置文件來指定.對於kafka broker端,似乎有個sendfile系統調用可以潛在的提升網路IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域即可,而無需進程再次和交換(這里涉及到"磁碟IO數據"/"內核內存"/"進程內存"/"網路緩沖區",多者之間的數據).

其實對於procer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網路IO更應該需要考慮.可以將任何在網路上傳輸的消息都經過壓縮.kafka支持gzip/snappy等多種壓縮方式

kafka集群中的任何一個broker,都可以向procer提供metadata信息,這些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(請參看zookeeper中的節點信息). 當procer獲取到metadata信息之後, procer將會和Topic下所有partition leader保持socket連接;消息由procer直接通過socket發送到broker,中間不會經過任何"路由層".

非同步發送,將多條消息暫且在客戶端buffer起來,並將他們批量發送到broker;小數據IO太多,會拖慢整體的網路延遲,批量延遲發送事實上提升了網路效率;不過這也有一定的隱患,比如當procer失效時,那些尚未發送的消息將會丟失。

其他JMS實現,消息消費的位置是有prodiver保留,以便避免重復發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態.這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有復雜的消息確認機制,可見kafka broker端是相當輕量級的.當消息被consumer接收之後,consumer可以在本地保存最後消息的offset,並間歇性的向zookeeper注冊offset.由此可見,consumer客戶端也很輕量級。

kafka中consumer負責維護消息的消費記錄,而broker則不關心這些,這種設計不僅提高了consumer端的靈活性,也適度的減輕了broker端設計的復雜度;這是和眾多JMS prodiver的區別.此外,kafka中消息ACK的設計也和JMS有很大不同,kafka中的消息是批量(通常以消息的條數或者chunk的尺寸為單位)發送給consumer,當消息消費成功後,向zookeeper提交消息的offset,而不會向broker交付ACK.或許你已經意識到,這種"寬松"的設計,將會有"丟失"消息/"消息重發"的危險.

Kafka提供3種消息傳輸一致性語義:最多1次,最少1次,恰好1次。

最少1次:可能會重傳數據,有可能出現數據被重復處理的情況;

最多1次:可能會出現數據丟失情況;

恰好1次:並不是指真正只傳輸1次,只不過有一個機制。確保不會出現「數據被重復處理」和「數據丟失」的情況。

at most once: 消費者fetch消息,然後保存offset,然後處理消息;當client保存offset之後,但是在消息處理過程中consumer進程失效(crash),導致部分消息未能繼續處理.那麼此後可能其他consumer會接管,但是因為offset已經提前保存,那麼新的consumer將不能fetch到offset之前的消息(盡管它們尚沒有被處理),這就是"at most once".

at least once: 消費者fetch消息,然後處理消息,然後保存offset.如果消息處理成功之後,但是在保存offset階段zookeeper異常或者consumer失效,導致保存offset操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once".

"Kafka Cluster"到消費者的場景中可以採取以下方案來得到「恰好1次」的一致性語義:

最少1次+消費者的輸出中額外增加已處理消息最大編號:由於已處理消息最大編號的存在,不會出現重復處理消息的情況。

kafka中,replication策略是基於partition,而不是topic;kafka將每個partition數據復制到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數可以通過broker配置文件來設定。leader處理所有的read-write請求,follower需要和leader保持同步.Follower就像一個"consumer",消費消息並保存在本地日誌中;leader負責跟蹤所有的follower狀態,如果follower"落後"太多或者失效,leader將會把它從replicas同步列表中刪除.當所有的follower都將一條消息保存成功,此消息才被認為是"committed",那麼此時consumer才能消費它,這種同步策略,就要求follower和leader之間必須具有良好的網路環境.即使只有一個replicas實例存活,仍然可以保證消息的正常發送和接收,只要zookeeper集群存活即可.

選擇follower時需要兼顧一個問題,就是新leader server上所已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要考慮到"負載均衡",partition leader較少的broker將會更有可能成為新的leader.

每個log entry格式為"4個位元組的數字N表示消息的長度" + "N個位元組的消息內容";每個日誌都有一個offset來唯一的標記一條消息,offset的值為8個位元組的數字,表示此消息在此partition中所處的起始位置..每個partition在物理存儲層面,有多個log file組成(稱為segment).segment file的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

獲取消息時,需要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大獲取消息的總長度(間接的表示消息的條數).根據offset,可以找到此消息所在segment文件,然後根據segment的最小offset取差值,得到它在file中的相對位置,直接讀取輸出即可.

kafka使用zookeeper來存儲一些meta信息,並使用了zookeeper watch機制來發現meta信息的變更並作出相應的動作(比如consumer失效,觸發負載均衡等)

Broker node registry: 當一個kafka broker啟動後,首先會向zookeeper注冊自己的節點信息(臨時znode),同時當broker和zookeeper斷開連接時,此znode也會被刪除.

Broker Topic Registry: 當一個broker啟動時,會向zookeeper注冊自己持有的topic和partitions信息,仍然是一個臨時znode.

Consumer and Consumer group: 每個consumer客戶端被創建時,會向zookeeper注冊自己的信息;此作用主要是為了"負載均衡".一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了性能考慮,讓partition相對均衡的分散到每個consumer上.

Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置文件指定,也可以由系統生成),此id用來標記消費者信息.

Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費.

Partition Owner registry: 用來標記partition正在被哪個consumer消費.臨時znode。此節點表達了"一個partition"只能被group下一個consumer消費,同時當group下某個consumer失效,那麼將會觸發負載均衡(即:讓partitions在多個consumer間均衡消費,接管那些"游離"的partitions)

當consumer啟動時,所觸發的操作:

A) 首先進行"Consumer id Registry";

B) 然後在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那麼其他consumer接管partitions).

C) 在"Broker id registry"節點下,注冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.

總結:

Kafka的核心是日誌文件,日誌文件在集群中的同步是分布式數據系統最基礎的要素。

如果leaders永遠不會down的話我們就不需要followers了!一旦leader down掉了,需要在followers中選擇一個新的leader.但是followers本身有可能延時太久或者crash,所以必須選擇高質量的follower作為leader.必須保證,一旦一個消息被提交了,但是leader down掉了,新選出的leader必須可以提供這條消息。大部分的分布式系統採用了多數投票法則選擇新的leader,對於多數投票法則,就是根據所有副本節點的狀況動態的選擇最適合的作為leader.Kafka並不是使用這種方法。

Kafka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每個節點讀取並追加到日誌中了,才回通知外部這個消息已經被提交了。因此這個集合中的任何一個節點隨時都可以被選為leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就可以允許在f個節點down掉的情況下不會丟失消息並正常提供服。ISR的成員是動態的,如果一個節點被淘汰了,當它重新達到「同步中」的狀態時,他可以重新加入ISR.這種leader的選擇方式是非常快速的,適合kafka的應用場景。

一個邪惡的想法:如果所有節點都down掉了怎麼辦?Kafka對於數據不會丟失的保證,是基於至少一個節點是存活的,一旦所有節點都down了,這個就不能保證了。

實際應用中,當所有的副本都down掉時,必須及時作出反應。可以有以下兩種選擇:

這是一個在可用性和連續性之間的權衡。如果等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集群就永遠恢復不了了。如果等待ISR意外的節點恢復,這個節點的數據就會被作為線上數據,有可能和真實的數據有所出入,因為有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在未來的版本中將使這個策略的選擇可配置,可以根據場景靈活的選擇。

這種窘境不只Kafka會遇到,幾乎所有的分布式數據系統都會遇到。

以上僅僅以一個topic一個分區為例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分區.Kafka盡量的使所有分區均勻的分布到集群所有的節點上而不是集中在某些節點上,另外主從關系也盡量均衡這樣每個幾點都會擔任一定比例的分區的leader.

優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點作為「controller」,當發現有節點down掉的時候它負責在游泳分區的所有節點中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區節點的主從關系。如果controller down掉了,活著的節點中的一個會備切換為新的controller.

對於某個分區來說,保存正分區的"broker"為該分區的"leader",保存備份分區的"broker"為該分區的"follower"。備份分區會完全復制正分區的消息,包括消息的編號等附加屬性值。為了保持正分區和備份分區的內容一致,Kafka採取的方案是在保存備份分區的"broker"上開啟一個消費者進程進行消費,從而使得正分區的內容與備份分區的內容保持一致。一般情況下,一個分區有一個「正分區」和零到多個「備份分區」。可以配置「正分區+備份分區」的總數量,關於這個配置,不同主題可以有不同的配置值。注意,生產者,消費者只與保存正分區的"leader"進行通信。

Kafka允許topic的分區擁有若干副本,這個數量是可以配置的,你可以為每個topic配置副本的數量。Kafka會自動在每個副本上備份數據,所以當一個節點down掉時數據依然是可用的。

Kafka的副本功能不是必須的,你可以配置只有一個副本,這樣其實就相當於只有一份數據。

創建副本的單位是topic的分區,每個分區都有一個leader和零或多個followers.所有的讀寫操作都由leader處理,一般分區的數量都比broker的數量多的多,各分區的leader均勻的分布在brokers中。所有的followers都復制leader的日誌,日誌中的消息和順序都和leader中的一致。followers向普通的consumer那樣從leader那裡拉取消息並保存在自己的日誌文件中。

許多分布式的消息系統自動的處理失敗的請求,它們對一個節點是否著(alive)」有著清晰的定義。Kafka判斷一個節點是否活著有兩個條件:

符合以上條件的節點准確的說應該是「同步中的(in sync)」,而不是模糊的說是「活著的」或是「失敗的」。Leader會追蹤所有「同步中」的節點,一旦一個down掉了,或是卡住了,或是延時太久,leader就會把它移除。至於延時多久算是「太久」,是由參數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由參數replica.lag.time.max.ms決定的。

只有當消息被所有的副本加入到日誌中時,才算是「committed」,只有committed的消息才會發送給consumer,這樣就不用擔心一旦leader down掉了消息會丟失。Procer也可以選擇是否等待消息被提交的通知,這個是由參數acks決定的。

Kafka保證只要有一個「同步中」的節點,「committed」的消息就不會丟失。

③ windows系統下啟動kafka CMD報錯:輸入行太長,語法錯誤

        在windows系孝和伍統用cmd輸命令:.\bin\windows\kafka-server-start.bat .\config\server.properties  啟動kafka居然棚亮報錯:輸入行太長,語法錯誤,折磨了我一個小時。

        在網上查到一個解決方法:  kafka目錄不要巧或建太深,直接在放在D盤

 

④ Kafka 客戶端開啟壓縮

需要注意的是,

1. borker / server 默認允許的最大消息大小是 1M,過大的消息會被拒

2. 1M 是包括壓縮之後的大小,因此 procer/client 如果開啟壓縮,扮隱核將大於 1M 的數據廳掘壓縮至小於 1M 發送即可

3. 如果修改 broker 端的 message.max.bytes 大小,需要修改消費者、follower fetch 的大小與之匹配,並且允許較大的消息對性能有較大影響攜塌

1. 允許發的數據 > 1M

2. 開啟壓縮

⑤ kafka too many open files的解決方法

在生產環境中,為了方便將kafka做成了一個服務,使用systemctl start kafka,kafka用戶來對kafka進行啟動,可是在最近的一次升級中啟動應用時,kafka出現too many open files的雹明報錯並且宕機,

在linux系統下每一個進程都會有其相應的文件打開限制,可以使用cat /proc/<pid>/limits來進行查看。使用命令行啟動的應用會共用執行該命令用戶的文件打開數,若是將其做成一個服務,如果不經過相應的配置,那麼該進程默認的文件打開數為4096,所以解決該問題不僅需要提高kafka用戶的文件打開數,而且需要在啟動腳本中配置文件打開數,

1)進入kafka用戶中,執行ulimit -n可以查看kafka用戶的文件打開數

2)若是該文件打開數小,那麼需要在/etc/security/limits文件中添加:

kafka soft nofile <文件打開數>
kafka hard nofile <文件打開數>

3)也可以使用ulimit -n <文件打開數>來進肢歷行臨時配置

1)kafka.service腳本如下

[Unit]
Description=kafka service
Requires=zookeeper.service
After=zookeeper.service

[Service]
ExecStart=/data/apps_data/kafka/bin/kafka-server-start.sh /data/apps_data/kafka/config/server.properties
ExecStop=/home/kafka/bin/kafka-server-stop.sh
Type=simple
User=kafka
Group=kafka
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

[Unit]
Description=kafka service
Requires=zookeeper.service
After=zookeeper.service

[Service]
ExecStart=/data/apps_data/kafka/bin/kafka-server-start.sh /data/apps_data/kafka/config/server.properties
ExecStop=/home/kafka/bin/kafka-server-stop.sh
Type=simple
User=kafka
Group=kafka
LimitNOFILE=<文件打開數>
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

3)更改往後執行systemctl daemon-reload重載腳本源飢告

4)重啟kafka生效

⑥ 啟動kafka報錯,應該怎麼解決

Kafka是由LinkedIn設計的一個高吞吐量、分布式、基於發布訂閱模式的消息系統,使用Scala編寫,它以可水平擴展、可靠性、非同步通信和高吞吐率等特性而被廣泛使用。目前越來越多的開源分布式處理系統都支持與Kafka集成,其中Spark Streaming作為後端流引擎配合Kafka作為前端消息系統正成為當前流處理系統的主流架構之一。 然而,當下越來越多的安全漏洞、數據泄露等問題的爆發,安全正成為系統選型不得不考慮的問題,Kafka由於其安全機制的匱乏,也導致其在數據敏感行業的部署存在嚴重的安全隱患。本文將圍繞Kafka,先介紹其整體架構和關鍵概念察察州,再深入分析其架構之中存在的安全問題,最後分享下Transwarp在Kafka安全性上所做的工作及其使用方法。 Kafka架構與安全 首先,我們來了解下有關Kafka的幾個基本概念: Topic:Kafka把接收的消息按種類劃分,每個種類都稱之為Topic,由唯一的Topic Name標識。 Procer:向Topic發布消息的進程稱為Procer。 Consumer:從Topic訂閱消息的進程稱為Consumer。 Broker:Kafka集群包含一個或多個伺服器,這種伺服器被稱為Broker。 Kafka的整體架構如下圖所示,典型的Kafka集群包含一組發布消息的Procer,一組管理Topic的Broker,和一組訂閱消息的Consumer。Topic可以有多個分區,每個分區只存儲於一個Broker。Procer可以按照一定的策略將消息劃分給指定的分區,如簡單的輪詢各個分區或者按照特定欄位的Hash值指定分區。Broker需要通過ZooKeeper記錄集群的所有Broker、選舉分區的Leader,記錄Consumer的消費消息的偏移量,以及在Consumer Group發生變化時進行relalance. Broker接收和發送消息是被動的:由Procer主動發送消息,Consumer主動拉取消息。 然而,分析Kafka框架,我們會發現以下嚴重的安全問題: 1.網路敗蔽中的任何一台主機,都可以通過啟動Broker進程而加入Kafka集群,能夠接收Procer的消息,能夠篡改消息並發送給Consumer。 2.網路中的任何一台主機,都可以啟動惡意的Procer/Consumer連接到Broker,發送非法消息或拉取隱私消息數據。 3.Broker不支持連接到啟用Kerberos認證的ZooKeeper集群,沒有對存放在ZooKeeper上的數據設置許可權。任意用戶都能夠直接訪問ZooKeeper集群,對這些數據進行修改或刪除。 4.Kafka中的Topic不支持設置訪問控制列表,任意連接到Kafka集群的Consumer(或Procer)都能對任意Topic讀取(或發送)消息。 隨著Kafka應用場景越來越廣泛,特別是一些數據隱私程度較高的領域(如道路交通的視頻監控),上述安全問題的存在猶如一顆定時炸彈,一旦內網被黑客入侵或者內部出現惡意用戶,所有的隱私數據(如車輛出行記錄)都能夠輕易地被竊取,而無需攻破Broker所在的伺服器。 Kafka安全設計 基於上述分析,Transwarp從以下兩個方面增強Kafka的安全性: 身份認證(Authentication):設計並實現了基於Kerberos和基於IP的兩種身份認證機制。前者為強身份認證,相比於後者具有更好的安全性,後者適用於IP地址可信的網路環境,相比於前者部署更為簡便。 許可權控制(Authorization):設計並實現了Topic級別的許可權模型。Topic的許可權分為READ(從Topic拉取數據)、WRITE(向Topic中生產數據)、CREATE(創建Topic)和DELETE(刪除Topic)。 基於Kerberos的身份機制如下圖所示: Broker啟動時,需要使用配置文件中的身份和密鑰文件向KDC(Kerberos伺服器)認證,認證通過則加入Kafka集群,否則報錯退出。 Procer(或Consumer)啟動後需要經過如下步驟與Broker建立安全的Socket連接: 1.Procer向KDC認證身份,通過則得到沒沖TGT(票證請求票證),否則報錯退出 2.Procer使用TGT向KDC請求Kafka服務,KDC驗證TGT並向Procer返回SessionKey(會話密鑰)和ServiceTicket(服務票證) 3.Procer使用SessionKey和ServiceTicket與Broker建立連接,Broker使用自身的密鑰解密ServiceTicket,獲得與Procer通信的SessionKey,然後使用SessionKey驗證Procer的身份,通過則建立連接,否則拒絕連接。 ZooKeeper需要啟用Kerberos認證模式,保證Broker或Consumer與其的連接是安全的。 Topic的訪問控制列表(ACL)存儲於ZooKeeper中,存儲節點的路徑為/acl/<topic>/<user>,節點數據為R(ead)、W(rite)、C(reate)、D(elete)許可權的集合,如/acl/transaction/jack節點的數據為RW,則表示用戶jack能夠對transaction這個topic進行讀和寫。 另外,kafka為特權用戶,只有kafka用戶能夠賦予/取消許可權。因此,ACL相關的ZooKeeper節點許可權為kafka具有所有許可權,其他用戶不具有任何許可權。 構建安全的Kafka服務 首先,我們為Broker啟用Kerberos認證模式,配置文件為/etc/kafka/conf/server.properties,安全相關的參數如下所示: 其中,authentication參數表示認證模式,可選配置項為simple, kerberos和ipaddress,默認為simple。當認證模式為kerberos時,需要額外配置賬戶屬性principal和對應的密鑰文件路徑keytab. 認證模式為ipaddress時,Procer和Consumer創建時不需要做任何改變。而認證模式為kerberos時,需要預先創建好相應的principal和keytab,並使用API進行登錄,樣例代碼如下所示: public class SecureProcer extends Thread { private final kafka.javaapi.procer.Procer<Integer, String> procer; private final String topic; private final Properties props = new Properties(); public SecureProcer(String topic) { AuthenticationManager.setAuthMethod(「kerberos」); AuthenticationManager.login(「procer1″, 「/etc/procer1.keytab」); props.put(「serializer.class」, 「kafka.serializer.StringEncoder」); props.put(「metadata.broker.list」, 「172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092″); // Use random partitioner. Don』t need the key type. Just set it to Integer. // The message is of type String. procer = new kafka.javaapi.procer.Procer<Integer, String>( new ProcerConfig(props)); this.topic = topic;

⑦ kafka 單機/集群壓力測試

由於kafka吞吐量特別大,所以先考慮集群伺服器的自身瓶頸,因為現在測試的是單機所以只會涉及到磁碟IO以及cpu,但是對於kafka來說對於cpu的使用還是可以忽略不計的,

1.1磁碟IO寫入瓶頸
使用以下命令測試磁碟IO的寫入瓶頸
sync;time -p bash -c "(dd if=/dev/zero of=test.dd bs=1M count=20000)"
說明: 在當前目錄下創建一個test.dd的文件,寫入20000個1M的數據
磁碟寫入IO的結果
可以看到平均就是187MB/s

1.2 使用iostat命令監測磁碟io情況
使用命令
# iostat -x 1
說明: 擴展查看io性能,每秒刷新一次
注意: 如果沒有iostat,請執行 yum install sysstat -y 進行安裝 iostat命令

關注wkB/s和%util兩個參數

wkB/s:每秒寫入設備的數據量(單位:KB)

%util:消耗在I/O請求中的CPU時間百分比亂謹(設備帶寬利用率)。如果該值接近100%說明設備出現了瓶頸。
如圖現在這台機器的磁碟IO極限值為187MB/s

1.3 單機版測試kafka性能
因為測試的次數比較多,也沒有去找kafka中數據存儲設置,所以就使用docker部署單機版的kafka (因為測試的數據比較多,也就多次的刪除了容器,重新啟動鏡像)
新建目錄:
mkdir /usr/local/kafka_test
dockerfile

run.sh

sources.list

目錄結構如下:

生成鏡像
docker build -t kafka_test /usr/local/kafka_test
啟動kafka
docker run -d -it kafka_test

測試結果慶賀

從表格中可以看出來五個分區就已經是極限了

結果分析
這中間並沒有設置條數/每秒,所以就是按照kafka 就會按照量級自動的吞入數據,如果我們需要對於消息的即時性做控制,還需要再重新測試一下,按照業務的延遲找到最合適的數量(單機版,然後再部署集群,測試適合的數量)

集群測試:
部署就不再這里說明了
本次測試的是三台嘩差基機器集群

測試結果:

之後還測試了9個分區的topic 因為空間不足所以就沒有繼續測下去,但是看部分數據還超過了500MB/s還是有上升空間的

1.3 磁碟IO 讀取瓶頸
使用一下命令測試磁碟IO的讀取瓶頸
hdparm -tT --direct /dev/vda
說明: hdparm命令是顯示與設定硬碟的參數, -t參數為評估硬碟的讀取效率(不經過磁碟cache), -T參數為評估硬碟的讀取效率(經過磁碟cache).

⑧ 修改Kafka的啟動內存

1.修改 bin 目錄下的 zookeeper-server-start.sh ,將初始堆的大小(-Xms)設置前搏臘小一些

這一慧滑個是Kafka初始化銀圓LogManager時候用到的buffer size

⑨ kafka原理分析

作為一款典型的消息中間件產品,kafka系統仍然由procer、broker、consumer三部分組成。kafka涉及的幾個常用概念和組件簡派薯單介紹如下:

當consumer group的狀態發生變化(如有consumer故障、增減consumer成員等)或consumer group消費的topic狀態發生變化(如增加了partition,消費的topic發生變化),kafka集群會自動調整和重新分配consumer消費的partition,這個過程就叫做rebalance(再平衡)。

__consumer_offsets是kafka集群自己維護的一個特殊的topic,它裡面存儲的是每個consumer group已經消費了每個topic partition的offset。__consumer_offsets中offset消息的key由group id,topic name,partition id組成,格式為 {topic name}-${partition id},value值就是consumer提交的已消費的topic partition offset值。__consumer_offsets的分區數和副本數分別由offsets.topic.num.partitions(默認值為50)和offsets.topic.replication.factor(默認值為1)參數配置。我們通過公式 hash(group id) % offsets.topic.num.partitions 就可以計算出指定consumer group的已提交offset存儲的partition。由於consumer group提交的offset消息只有最後一條消息有意義,所以__consumer_offsets是一個compact topic,kafka集群會周期性的對__consumer_offsets執行compact操作,只保留最新的一次提交offset。

group coordinator運行在kafka某個broker上,負責consumer group內所有的consumer成員管理、所有的消費的topic的partition的消費關系分配、offset管理、觸發rebalance等功能。group coordinator管理partition分配時,會指定consumer group內某個consumer作為group leader執行具體的partition分配任務。存儲某個consumer group已提交offset的__consumer_offsets partition leader副本所在的broker就是該consumer group的協調器運行的broker。

跟大多數分布式系統一樣,集群有一個master角色管理整個集群,協調集群中各個成員的行為。kafka集群中的controller就相當於其它分布式系統的master,用來負責集群topic的分區分配,分區leader選舉以及維護集群的所有partition的ISR等集群協調功能。集群中哪個borker是controller也是通過一致性協議選舉產生的,2.8版本之前通腔銷過zookeeper進行選主,2.8版本後通過kafka raft協議進行選舉。如果controller崩潰,集群會重新選舉一個broker作為新的controller,並增加controller epoch值(相當於zookeeper ZAB協議的epoch,raft協議的term值)

當kafka集群新建了topic或為一個topic新增了partition,controller需要為這些新增加的partition分配到具體的broker上,並把分配結果記錄下來,供procer和consumer查詢獲取。

因為只有partition的leader副本才會處理procer和consumer的讀寫請求,而partition的其他follower副本需要從相應的leader副本同步消息,為了盡量保證集群中所有broker的負載是均衡的,controller在進行集群全局partition副本伍羨游分配時需要使partition的分布情況是如下這樣的:

在默認情況下,kafka採用輪詢(round-robin)的方式分配partition副本。由於partition leader副本承擔的流量比follower副本大,kafka會先分配所有topic的partition leader副本,使所有partition leader副本全局盡量平衡,然後再分配各個partition的follower副本。partition第一個follower副本的位置是相應leader副本的下一個可用broker,後面的副本位置依此類推。

舉例來說,假設我們有兩個topic,每個topic有兩個partition,每個partition有兩個副本,這些副本分別標記為1-1-1,1-1-2,1-2-1,1-2-2,2-1-1,2-1-2,2-2-1,2-2-2(編碼格式為topic-partition-replia,編號均從1開始,第一個replica是leader replica,其他的是follower replica)。共有四個broker,編號是1-4。我們先對broker按broker id進行排序,然後分配leader副本,最後分配foller副本。
1)沒有配置broker.rack的情況
現將副本1-1-1分配到broker 1,然後1-2-1分配到broker 2,依此類推,2-2-1會分配到broker 4。partition 1-1的leader副本分配在broker 1上,那麼下一個可用節點是broker 2,所以將副本1-1-2分配到broker 2上。同理,partition 1-2的leader副本分配在broker 2上,那麼下一個可用節點是broker 3,所以將副本1-1-2分配到broker 3上。依此類推分配其他的副本分片。最後分配的結果如下圖所示:

2)配置了broker.rack的情況
假設配置了兩個rack,broker 1和broker 2屬於Rack 1,broker 3和broker 4屬於Rack 2。我們對rack和rack內的broker分別排序。然後先將副本1-1-1分配到Rack 1的broker 1,然後將副本1-2-1分配到下一個Rack的第一個broker,即Rack 2的broker 3。其他的parttition leader副本依此類推。然後分配follower副本,partition 1-1的leader副本1-1-1分配在Rack 1的broker上,下一個可用的broker是Rack 2的broker 3,所以分配到broker 3上,其他依此類推。最後分配的結果如下圖所示:

kafka除了按照集群情況自動分配副本,也提供了reassign工具人工分配和遷移副本到指定broker,這樣用戶可以根據集群實際的狀態和各partition的流量情況分配副本

kafka集群controller的一項功能是在partition的副本中選擇一個副本作為leader副本。在topic的partition創建時,controller首先分配的副本就是leader副本,這個副本又叫做preference leader副本。

當leader副本所在broker失效時(宕機或網路分區等),controller需要為在該broker上的有leader副本的所有partition重新選擇一個leader,選擇方法就是在該partition的ISR中選擇第一個副本作為新的leader副本。但是,如果ISR成員只有一個,就是失效的leader自身,其餘的副本都落後於leader怎麼辦?kafka提供了一個unclean.leader.election配置參數,它的默認值為true。當unclean.leader.election值為true時,controller還是會在非ISR副本中選擇一個作為leader,但是這時候使用者需要承擔數據丟失和數據不一致的風險。當unclean.leader.election值為false時,則不會選擇新的leader,該partition處於不可用狀態,只能恢復失效的leader使partition重新變為可用。

當preference leader失效後,controller重新選擇一個新的leader,但是preference leader又恢復了,而且同步上了新的leader,是ISR的成員,這時候preference leader仍然會成為實際的leader,原先的新leader變為follower。因為在partition leader初始分配時,使按照集群副本均衡規則進行分配的,這樣做可以讓集群盡量保持平衡。

為了保證topic的高可用,topic的partition往往有多個副本,所有的follower副本像普通的consumer一樣不斷地從相應的leader副本pull消息。每個partition的leader副本會維護一個ISR列表存儲到集群信息庫里,follower副本成為ISR成員或者說與leader是同步的,需要滿足以下條件:

1)follower副本處於活躍狀態,與zookeeper(2.8之前版本)或kafka raft master之間的心跳正常

2)follower副本最近replica.lag.time.max.ms(默認是10秒)時間內從leader同步過最新消息。需要注意的是,一定要拉取到最新消息,如果最近replica.lag.time.max.ms時間內拉取過消息,但不是最新的,比如落後follower在追趕leader過程中,也不會成為ISR。

follower在同步leader過程中,follower和leader都會維護幾個參數,來表示他們之間的同步情況。leader和follower都會為自己的消息隊列維護LEO(Last End Offset)和HW(High Watermark)。leader還會為每一個follower維護一個LEO。LEO表示leader或follower隊列寫入的最後一條消息的offset。HW表示的offset對應的消息寫入了所有的ISR。當leader發現所有follower的LEO的最小值大於HW時,則會增加HW值到這個最小值LEO。follower拉取leader的消息時,同時能獲取到leader維護的HW值,如果follower發現自己維護的HW值小於leader發送過來的HW值,也會增加本地的HW值到leader的HW值。這樣我們可以得到一個不等式: follower HW <= leader HW <= follower LEO <= leader LEO 。HW對應的log又叫做committed log,consumer消費partititon的消息時,只能消費到offset值小於或等於HW值的消息的,由於這個原因,kafka系統又稱為分布式committed log消息系統。

kafka的消息內容存儲在log.dirs參數配置的目錄下。kafka每個partition的數據存放在本地磁碟log.dirs目錄下的一個單獨的目錄下,目錄命名規范為 ${topicName}-${partitionId} ,每個partition由多個LogSegment組成,每個LogSegment由一個數據文件(命名規范為: {baseOffset}.index)和一個時間戳索引文件(命名規范為:${baseOffset}.timeindex)組成,文件名的baseOffset就是相應LogSegment中第一條消息的offset。.index文件存儲的是消息的offset到該消息在相應.log文件中的偏移,便於快速在.log文件中快速找到指定offset的消息。.index是一個稀疏索引,每隔一定間隔大小的offset才會建立相應的索引(比如每間隔10條消息建立一個索引)。.timeindex也是一個稀疏索引文件,這樣可以根據消息的時間找到對應的消息。

可以考慮將消息日誌存放到多個磁碟中,這樣多個磁碟可以並發訪問,增加消息讀寫的吞吐量。這種情況下,log.dirs配置的是一個目錄列表,kafka會根據每個目錄下partition的數量,將新分配的partition放到partition數最少的目錄下。如果我們新增了一個磁碟,你會發現新分配的partition都出現在新增的磁碟上。

kafka提供了兩個參數log.segment.bytes和log.segment.ms來控制LogSegment文件的大小。log.segment.bytes默認值是1GB,當LogSegment大小達到log.segment.bytes規定的閾值時,kafka會關閉當前LogSegment,生成一個新的LogSegment供消息寫入,當前供消息寫入的LogSegment稱為活躍(Active)LogSegment。log.segment.ms表示最大多長時間會生成一個新的LogSegment,log.segment.ms沒有默認值。當這兩個參數都配置了值,kafka看哪個閾值先達到,觸發生成新的LogSegment。

kafka還提供了log.retention.ms和log.retention.bytes兩個參數來控制消息的保留時間。當消息的時間超過了log.retention.ms配置的閾值(默認是168小時,也就是一周),則會被認為是過期的,會被kafka自動刪除。或者是partition的總的消息大小超過了log.retention.bytes配置的閾值時,最老的消息也會被kafka自動刪除,使相應partition保留的總消息大小維持在log.retention.bytes閾值以下。這個地方需要注意的是,kafka並不是以消息為粒度進行刪除的,而是以LogSegment為粒度刪除的。也就是說,只有當一個LogSegment的最後一條消息的時間超過log.retention.ms閾值時,該LogSegment才會被刪除。這兩個參數都配置了值時,也是只要有一個先達到閾值,就會執行相應的刪除策略

當我們使用KafkaProcer向kafka發送消息時非常簡單,只要構造一個包含消息key、value、接收topic信息的ProcerRecord對象就可以通過KafkaProcer的send()向kafka發送消息了,而且是線程安全的。KafkaProcer支持通過三種消息發送方式

KafkaProcer客戶端雖然使用簡單,但是一條消息從客戶端到topic partition的日誌文件,中間需要經歷許多的處理過程。KafkaProcer的內部結構如下所示:

從圖中可以看出,消息的發送涉及兩類線程,一類是調用KafkaProcer.send()方法的應用程序線程,因為KafkaProcer.send()是多線程安全的,所以這樣的線程可以有多個;另一類是與kafka集群通信,實際將消息發送給kafka集群的Sender線程,當我們創建一個KafkaProcer實例時,會創建一個Sender線程,通過該KafkaProcer實例發送的所有消息最終通過該Sender線程發送出去。RecordAccumulator則是一個消息隊列,是應用程序線程與Sender線程之間消息傳遞的橋梁。當我們調用KafkaProcer.send()方法時,消息並沒有直接發送出去,只是寫入了RecordAccumulator中相應的隊列中,最終需要Sender線程在適當的時機將消息從RecordAccumulator隊列取出來發送給kafka集群。

消息的發送過程如下:

在使用KafkaConsumer實例消費kafka消息時,有一個特性我們要特別注意,就是KafkaConsumer不是多線程安全的,KafkaConsumer方法都在調用KafkaConsumer的應用程序線程中運行(除了consumer向kafka集群發送的心跳,心跳在一個專門的單獨線程中發送),所以我們調用KafkaConsumer的所有方法均需要保證在同一個線程中調用,除了KafkaConsumer.wakeup()方法,它設計用來通過其它線程向consumer線程發送信號,從而終止consumer執行。

跟procer一樣,consumer要與kafka集群通信,消費kafka消息,首先需要獲取消費的topic partition leader replica所在的broker地址等信息,這些信息可以通過向kafka集群任意broker發送Metadata請求消息獲取。

我們知道,一個consumer group有多個consumer,一個topic有多個partition,而且topic的partition在同一時刻只能被consumer group內的一個consumer消費,那麼consumer在消費partition消息前需要先確定消費topic的哪個partition。partition的分配通過group coordinator來實現。基本過程如下:

我們可以通過實現介面org.apache.kafka.clients.consumer.internals.PartitionAssignor自定義partition分配策略,但是kafka已經提供了三種分配策略可以直接使用。

partition分配完後,每個consumer知道了自己消費的topic partition,通過metadata請求可以獲取相應partition的leader副本所在的broker信息,然後就可以向broker poll消息了。但是consumer從哪個offset開始poll消息?所以consumer在第一次向broker發送FetchRequest poll消息之前需要向Group Coordinator發送OffsetFetchRequest獲取消費消息的起始位置。Group Coordinator會通過key {topic}-${partition}查詢 __consumer_offsets topic中是否有offset的有效記錄,如果存在,則將consumer所屬consumer group最近已提交的offset返回給consumer。如果沒有(可能是該partition是第一次分配給該consumer group消費,也可能是該partition長時間沒有被該consumer group消費),則根據consumer配置參數auto.offset.reset值確定consumer消費的其實offset。如果auto.offset.reset值為latest,表示從partition的末尾開始消費,如果值為earliest,則從partition的起始位置開始消費。當然,consumer也可以隨時通過KafkaConsumer.seek()方法人工設置消費的起始offset。

kafka broker在收到FetchRequest請求後,會使用請求中topic partition的offset查一個skiplist表(該表的節點key值是該partition每個LogSegment中第一條消息的offset值)確定消息所屬的LogSegment,然後繼續查LogSegment的稀疏索引表(存儲在.index文件中),確定offset對應的消息在LogSegment文件中的位置。為了提升消息消費的效率,consumer通過參數fetch.min.bytes和max.partition.fetch.bytes告訴broker每次拉取的消息總的最小值和每個partition的最大值(consumer一次會拉取多個partition的消息)。當kafka中消息較少時,為了讓broker及時將消息返回給consumer,consumer通過參數fetch.max.wait.ms告訴broker即使消息大小沒有達到fetch.min.bytes值,在收到請求後最多等待fetch.max.wait.ms時間後,也將當前消息返回給consumer。fetch.min.bytes默認值為1MB,待fetch.max.wait.ms默認值為500ms。

為了提升消息的傳輸效率,kafka採用零拷貝技術讓內核通過DMA把磁碟中的消息讀出來直接發送到網路上。因為kafka寫入消息時將消息寫入內存中就返回了,如果consumer跟上了procer的寫入速度,拉取消息時不需要讀磁碟,直接從內存獲取消息發送出去就可以了。

為了避免發生再平衡後,consumer重復拉取消息,consumer需要將已經消費完的消息的offset提交給group coordinator。這樣發生再平衡後,consumer可以從上次已提交offset出繼續拉取消息。

kafka提供了多種offset提交方式

partition offset提交和管理對kafka消息系統效率來說非常關鍵,它直接影響了再平衡後consumer是否會重復拉取消息以及重復拉取消息的數量。如果offset提交的比較頻繁,會增加consumer和kafka broker的消息處理負載,降低消息處理效率;如果offset提交的間隔比較大,再平衡後重復拉取的消息就會比較多。還有比較重要的一點是,kafka只是簡單的記錄每次提交的offset值,把最後一次提交的offset值作為最新的已提交offset值,作為再平衡後消息的起始offset,而什麼時候提交offset,每次提交的offset值具體是多少,kafka幾乎不關心(這個offset對應的消息應該存儲在kafka中,否則是無效的offset),所以應用程序可以先提交3000,然後提交2000,再平衡後從2000處開始消費,決定權完全在consumer這邊。

kafka中的topic partition與consumer group中的consumer的消費關系其實是一種配對關系,當配對雙方發生了變化時,kafka會進行再平衡,也就是重新確定這種配對關系,以提升系統效率、高可用性和伸縮性。當然,再平衡也會帶來一些負面效果,比如在再平衡期間,consumer不能消費kafka消息,相當於這段時間內系統是不可用的。再平衡後,往往會出現消息的重復拉取和消費的現象。

觸發再平衡的條件包括:

需要注意的是,kafka集群broker的增減或者topic partition leader重新選主這類集群狀態的變化並不會觸發在平衡

有兩種情況與日常應用開發比較關系比較密切:

consumer在調用subscribe()方法時,支持傳入一個ConsumerRebalanceListener監聽器,ConsumerRebalanceListener提供了兩個方法,onPartitionRevoked()方法在consumer停止消費之後,再平衡開始之前被執行。可以發現,這個地方是提交offset的好時機。onPartitonAssigned()方法則會在重新進行partition分配好了之後,但是新的consumer還未消費之前被執行。

我們在提到kafka時,首先想到的是它的吞吐量非常大,這也是很多人選擇kafka作為消息傳輸組件的重要原因。

以下是保證kafka吞吐量大的一些設計考慮:

但是kafka是不是總是這么快?我們同時需要看到kafka為了追求快舍棄了一些特性:

所以,kafka在消息獨立、允許少量消息丟失或重復、不關心消息順序的場景下可以保證非常高的吞吐量,但是在需要考慮消息事務、嚴格保證消息順序等場景下procer和consumer端需要進行復雜的考慮和處理,可能會比較大的降低kafka的吞吐量,例如對可靠性和保序要求比較高的控制類消息需要非常謹慎的權衡是否適合使用kafka。

我們通過procer向kafka集群發送消息,總是期望消息能被consumer成功消費到。最不能忍的是procer收到了kafka集群消息寫入的正常響應,但是consumer仍然沒有消費到消息。

kafka提供了一些機制來保證消息的可靠傳遞,但是有一些因素需要仔細權衡考慮,這些因素往往會影響kafka的吞吐量,需要在可靠性與吞吐量之間求得平衡:

kafka只保證partition消息順序,不保證topic級別的順序,而且保證的是partition寫入順序與讀取順序一致,不是業務端到端的保序。

如果對保序要求比較高,topic需要只設置一個partition。這時可以把參數max.in.flight.requests.per.connection設置為1,而retries設置為大於1的數。這樣即使發生了可恢復型錯誤,仍然能保證消息順序,但是如果發生不可恢復錯誤,應用層進行重試的話,就無法保序了。也可以採用同步發送的方式,但是這樣也極大的降低了吞吐量。如果消息攜帶了表示順序的欄位,可以在接收端對消息進行重新排序以保證最終的有序。

⑩ Kafka(四)集群之kafka

在章節二( https://www.jianshu.com/p/d9fefdf2db85 )中,我們部署了單機的kafka,現在我們部署一套集群模式的kafka。

這里我准備了三台虛擬機:
192.168.184.134
192.168.184.135
192.168.184.136
每台機器部署一個zk和kafka。

上一章節中zk集群已經神中部署完畢。

在章節二中,134這台機器已經有kafka存在了,我們在另外兩台機器上安裝kafka:

在上面的文件中有幾個關鍵點,我們一一進行配置,我會對配置中的說明翻譯:

以下這兩個listeners,advertised_listeners 是對外暴露的服務埠,真正建立連接用的是 listeners。
在內網中我們使用listenners就可以了,在docker等容器或雲中使用advertised。游判山

下面這個是日誌路徑的配置

下面這個是個重點的東西,topic在磁碟上會分為多個partitions存儲,相比單一文件存儲,增加了並行性,在後續文章中會詳細去講解:

日誌的保存時間:

以下是zookeeper的配置:

這里我們直接設置後台啟動,三個節點都是如此:

這裡面有個小坑,還記得之前我們搭建的單機環境嗎?那時候默認的日誌文件夾在/tmp/kafka-logs下面,生成了很多內容,導致我們134這個節點無法啟動成功,報錯如下:

解決這個問題只需要把/tmp/kafka-logs文件刪除就好了。

看到日誌出現這一句表明啟動成功了:

下面我們驗證下是否搭建成功了,首先使用kafkatool工機具連接看下:

我們在134節點創建一個topic:

查看topic列表:

在kafkatool中查看:

創建生產者:

創建消費者:

生成者發送沖游消息:

消費者接收消息:

到此為止,kafka的集群搭建已經完成了。在後面的文章我們會去學習如何在springboot中集成kafka。

熱點內容
淘寶為什麼用wifi就很卡 發布:2025-02-07 13:46:30 瀏覽:202
吃涼東西馬上就流鼻涕為什麼 發布:2025-02-07 13:42:23 瀏覽:809
為什麼4g球機晚上紅外燈不亮 發布:2025-02-07 13:37:13 瀏覽:328
為什麼有的貓眼睛是紅的 發布:2025-02-07 13:32:56 瀏覽:195
新手機為什麼容易耗電 發布:2025-02-07 13:28:02 瀏覽:40
為什麼電烤箱的時間不可以減少 發布:2025-02-07 13:24:14 瀏覽:667
為什麼閉上眼睛就不能走直線了 發布:2025-02-07 13:23:32 瀏覽:434
微信圖片為什麼不能放ps 發布:2025-02-07 13:22:51 瀏覽:391
淘寶認證過後為什麼還顯示認證 發布:2025-02-07 13:22:51 瀏覽:856
為什麼眼睛夠不見 發布:2025-02-07 13:18:26 瀏覽:363