新聞中心
Kafka是由Apache軟件基金會開發(fā)的一款開源的分布式消息系統(tǒng),它的高吞吐率、高并發(fā)性能以及良好的水平擴展性,使得它在數(shù)據(jù)處理領域中備受青睞。在Linux下使用Kafka,您可能需要查看一些與Kafka相關的信息,包括已安裝的Kafka版本、Topic、Partition和Broker等信息。那么,在本文中,我們將介紹如何在linux下查看kafka的相關信息。

查看已安裝的Kafka版本
在Linux中查看已安裝的Kafka版本可以通過以下命令實現(xiàn):
“`shell
$ kafka-topics.sh –version
“`
該命令將會返回當前Kafka的版本信息,如下:
“`
kafka-topics.sh v2.4.0 (Commit: c57222ae8cd7866d)
“`
查看Topic信息
在Linux中查看Kafka的Topic信息可以通過以下命令實現(xiàn):
“`shell
$ kafka-topics.sh –describe –topic –zookeeper :
“`
該命令將會返回與指定Topic相關的所有信息,包括Partition數(shù)量、Replication Factor、Partition分布情況等,如下:
“`
Topic: PartitionCount:3 ReplicationFactor:2 Configs:
Topic: Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
“`
其中,“Leader”表示管理該Partition的Broker,而“Replicas”表示Partition的副本集。
查看Broker信息
在Linux中查看Kafka的Broker信息可以通過以下命令實現(xiàn):
“`shell
$ kafka-broker-api-versions.sh –bootstrap-server : –command-config
“`
該命令將會返回當前Kafka的Broker信息,包括API版本、Broker ID、版本等,如下:
“`
{ version: 4, leader_epoch: -1, attributes: 0 }
… omitted the rest …
( note that the output may include other revisions, you have to look it up).
“`
查看Partition信息
在Linux中查看Kafka的Partition信息可以通過以下命令實現(xiàn):
“`shell
$ kafka-run-class.sh kafka.tools.GetOffsetShell –broker-list : –topic –time -2
“`
該命令將會返回指定Topic的Partition相關信息,包括Offset信息,如下:
“`
topic_name:0:100
“`
其中,“100”是指該Partition的最新Offset。
查看Consumer Group信息
在Linux中查看Kafka的Consumer Group信息可以通過以下命令實現(xiàn):
“`shell
$ kafka-consumer-groups.sh –bootstrap-server : –group –describe
“`
該命令將會返回指定Group Name的相關信息,包括當前消費者數(shù)量、Offset信息以及Lag信息等,如下:
“`
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test_topic 0 12023 12100 33 YjSIHz94Lua6a-EU0Ti9Xw-1593316979417 /192.168.0.13 YjSIHz94Lua6a-EU0Ti9Xw-1593316979417
“`
其中,“Consumer-ID”是指該消費者當前的ID,而“HOST”是指該消費者所在的IP地址。
相關問題拓展閱讀:
- 深入理解kafka(五)日志存儲
- Linux中Cache內存占用過高清理
- Kafka數(shù)據(jù)丟失分析
深入理解kafka(五)日志存儲
5.1文件目錄布局
根目錄下有以下5個checkpoint文件: cleaner-offset-checkpoint, log-start-offset-checkpoint, meta.properties, recovery-point-offset-checkpoint, replication-offset-checkpoint
分區(qū)目錄下有以下目錄: 0000xxx.index(偏移量為64位長整形,長度固定為20位), 0000xxx.log, 0000xxx.timeindex.
還有可能包含.deleted .cleaned .swap等臨時文件, 以及可能的.snapshot .txnindex leader-epoch-checkpoint
5.2日志格式演變
5.2.1 v0版本
kafka0.10.0之前
RECORD_OVERHEAD包括offset(8B)和message size(4B)
RECORD包括:
crc32(4B):crc32校驗值
magic(1B):消息版本號0
attributes(1B):消息屬性。低3位表示壓縮類型:0-NONE 1-GZIP 2-SNAPPY 3-LZ4(0.9.x引入)
key length(4B):表示消息的key的長度。-1代表null
key: 可選
value length(4B):實際消息體的長度。-1代表null
value: 消息體。可以為空,如墓族顫碑消息
5.2.2 v1版本
kafka0.10.0-0.11.0
比v0多了timestamp(8B)字段,表示消息的時間戳
attributes的第4位也被利用起來,0表示timestamp的類型為CreateTime,1表示timestamp的類型為LogAppendTime
timestamp類型由broker端參數(shù)log.message.timestamp.type來配置,默認為CreateTime,即采用生產(chǎn)者創(chuàng)建的時間戳
5.2.3 消息壓縮
保證端到端的壓縮,服務端配置compression.type,默認為”producer”,表示保留生產(chǎn)者使用的壓縮方式,還可以配置為”gzip”,”snappy”,”lz4″
多條消息壓縮至value字段,以提高壓縮率
5.2.4 變長字段
變長整缺隱形(Varints):每一個字節(jié)都有一個位于更高位的m位(most significant bit),除了最后一個字節(jié)為1,其余都為0,字節(jié)倒序排列
為了使編碼更加高效,Varints使用ZigZag編碼:sint32對應 (n>31) sint64對應 (n>63)
5.2.5 v2版本
Record Batch
first offset:
length:
partition leader epoch:
magic:固定為2
attributes:兩個字節(jié)。低3位表示壓縮格式,第4位表示時間戳類型,第5位表示事務(0-非事務1-事務),第6位控制消息(0-非控制1控制)
first timestamp:
max timestamp:
producer id:
producer epoch:
first sequence:
records count:
v2版本的消息去掉了crc字段,另外增加了length(消息總長度)、timestamp delta(時間戳增量)、offset delta(位移增伏穗廳量)和headers信息,并且棄用了attributes
Record
length:
attributes:棄用,但仍占據(jù)1B
timestamp delta:
offset delta:
headers:
5.3日志索引
稀疏索引(sparse index):每當寫入一定量(broker端參數(shù)log.index.interval.bytes指定,默認為4096B),偏移量索引文件和時間索引文件分別對應一個索引項
日志段切分策略:
1.大小超過broker端參數(shù)log.segment.bytes配置的值,默認為(1GB)
2.當前日志段消息的更大時間戳與當前系統(tǒng)的時間戳差值大于log.roll.ms或者log.roll.hours,ms優(yōu)先級高,默認log.roll.hours=168(7天)
3.索引文件或者時間戳索引文件的大小大于log.index.size.max.bytes配置的值,默認為(10MB)
4.偏移量差值(offset-baseOffset)>Integer.MAX_VALUE
5.3.1 偏移量索引
每個索引項占用8個字節(jié),分為兩個部分:1.relativeOffset相對偏移量(4B) 2.position物理地址(4B)
使用kafka-dump-log.sh腳本來解析.index文件(包括.timeindex、.snapshot、.txnindex等文件),如下:
bin/kafka-dump-log.sh –files /tmp/kafka-logs/topicId-0/00……00.index
如果broker端參數(shù)log.index.size.max.bytes不是8的倍數(shù),內部會自動轉換為8的倍數(shù)
5.3.2 時間戳索引
每個索引項占用12個字節(jié),分為兩個部分:1.timestamp當前日志分段的更大時間戳(12B) 2.relativeOffset時間戳對應的相對偏移量(4B)
如果broker端參數(shù)log.index.size.max.bytes不是12的倍數(shù),內部會自動轉換為12的倍數(shù)
5.4日志清理
日志清理策略可以控制到主題級別
5.4.1 日志刪除
broker端參數(shù)log.cleanup.policy設置為delete(默認為delete)
檢測周期broker端參數(shù)log.retention.check.interval.ms=300000(默認5分鐘)
1.基于時間
broker端參數(shù)log.retention.hours,log.retention.minutes,log.retention.ms,優(yōu)先級ms>minutes>hours
刪除時先增加.delete后綴,延遲刪除根據(jù)file.delete.delay.ms(默認60000)配置
2.基于日志大小
日志總大小為broker端參數(shù)log.retention.bytes(默認為-1,表示無窮大)
日志段大小為broker端參數(shù)log.segment.bytes(默認為,1GB)
3.基于日志起始偏移量
DeleteRecordRequest請求
1.KafkaAdminClient的deleteRecord()
2.kafka-delete-record.sh腳本
5.4.2 日志壓縮
broker端參數(shù)log.cleanup.policy設置為compact,且log.cleaner.enable設置為true(默認為true)
5.5磁盤存儲
相關測試:一個由6塊7200r/min的RAID-5陣列組成的磁盤簇的線性寫入600MB/s,隨機寫入100KB/s,隨機內存寫入400MB/s,線性內存3.6GB/s
5.5.1 頁緩存
Linux操作系統(tǒng)的vm.dirty_background_ratio參數(shù)用來指定臟頁數(shù)量達到系統(tǒng)的百分比之后就觸發(fā)pdflush/flush/kdmflush,一般小于10,不建議為0
vm.dirty_ratio表示臟頁百分比之后刷盤,但是阻塞新IO請求
kafka同樣提供同步刷盤及間斷性強制刷盤(fsync)功能,可以通過log.flush.interval.messages、log.flush.interval.ms等參數(shù)來控制
kafka不建議使用swap分區(qū),vm.swappiness參數(shù)上限為100,下限為0,建議設置為1
5.5.2 磁盤I/O流程
一般磁盤IO的場景有以下4種:
1.用戶調用標準C庫進行IO操作,數(shù)據(jù)流為:應用程序Buffer->C庫標準IOBuffer->文件系統(tǒng)也緩存->通過具體文件系統(tǒng)到磁盤
2.用戶調用文件IO,數(shù)據(jù)流為:應用程序Buffer->文件系統(tǒng)也緩存->通過具體文件系統(tǒng)到磁盤
3.用戶打開文件時使用O_DIRECT,繞過頁緩存直接讀寫磁盤
4.用戶使用類似dd工具,并使用direct參數(shù),繞過系統(tǒng)cache與文件系統(tǒng)直接讀寫磁盤
Linux系統(tǒng)中IO調度策略有4種:
1.NOOP:no operation
2.CFQ
3.DEADLINE
4.ANTICIPATORY
5.5.3 零拷貝
指數(shù)據(jù)直接從磁盤文件復制到網(wǎng)卡設備中,不需要經(jīng)應用程序
對linux而言依賴于底層的sendfile()
對java而言,F(xiàn)ileChannal.transferTo()的底層實現(xiàn)就是sendfile()
Linux中Cache內存占用過高清理
在Linux中每次用free查看的時候,發(fā)現(xiàn)free的空間都只有500M左右。同樣的環(huán)境32G只剩下這點,64G的也只剩下這么一點。后來發(fā)現(xiàn)都被Cache占用了,因為服務器上運行了Kafka環(huán)境,每周的日志文件都有一二百G的,估計就是他占用了page cache吧。
Free中的buffer和cache:(它們都是占用內存):
buffer : 作為buffer cache的內存,是塊設備的讀寫緩沖區(qū)
cache: 作為page cache的內存, 文件系統(tǒng)的cache
如果亂鏈握 cache 的值很大,說明cache住的文件數(shù)很多。如果頻繁訪問到的文件都能被cache住,那么磁盤的讀IO bi會非常小。
Linux內核會在內存將要喚大耗盡的時候,觸發(fā)內存回收的工作,以便釋放出內存給急需內存的進程使用。也可以用動釋放,釋放的時候需要對cache中的數(shù)據(jù)跟對應文件中的數(shù)據(jù)一致。
釋放的方式有下以幾種
一般情況下釋放pagecache就可以了。這樣可以寫一個sh腳本來在服務器空閑的時候定時執(zhí)行
使用嘩慶crontab來設置定時任務,如每天4點開始清理
本文參考:
Kafka數(shù)據(jù)丟失分析
Kafka存在丟消息的問題,消息丟失會發(fā)生在Broker,Producer和Consumer三種。
Broker丟失消息是由于Kafka本身的原因造成的,kafka為了得到更高的性能和吞吐量,將數(shù)據(jù)異步批量的存儲在磁盤中。消息的刷盤過程,為了提高性能,減少刷盤次數(shù),kafka采用了批量刷盤的做法。即,按照一定的消息量,和時間間隔進行刷盤。這種機制也是由于linux操作系統(tǒng)決定的。將數(shù)據(jù)存儲到linux操作系統(tǒng)種,會先存儲到頁緩存(Page cache)中,按照時間或者其他條件進行刷盤(從page cache到file),或者通過fsync命令強制刷盤。數(shù)據(jù)在page cache中時,如果系統(tǒng)掛掉,數(shù)據(jù)會丟失。
Broker在linux服務器上高速讀寫以及同步到Replica
上圖簡述了broker寫數(shù)據(jù)以及同步的一個過程。broker寫數(shù)據(jù)只寫到PageCache中,而pageCache位于內存。這部分數(shù)據(jù)在斷電后是會丟失的。pageCache的數(shù)據(jù)通過linux的flusher程序進行刷盤。刷盤觸發(fā)條件有三:
Broker配置刷盤機制,是通過調用fsync函數(shù)接管了刷盤動作。從單個Broker來看,pageCache的數(shù)據(jù)會丟失。
Kafka沒有提供同步刷盤的方式。同步刷盤在RocketMQ中有實現(xiàn),實現(xiàn)原理是將異步刷盤的流程進行阻塞,等待響應,類似ajax的callback或者是java的future。下面是一段rocketmq的源碼。
也就是說,理論上,要完全讓kafka保證單個broker不丟失消息是做不到的,只能通過調整刷盤機制的參數(shù)緩解該情況。比如,減少刷盤間隔,減少刷盤數(shù)據(jù)量大小。時間越短,性能越差,可靠性越好(盡可能可靠)。這是一個選擇題。
為了解決該問題,kafka通過producer和broker協(xié)同處理單個broker丟失參數(shù)的情況。一旦producer發(fā)現(xiàn)broker消息丟失,即可自動進行retry。除非retry次數(shù)超過閥值乎橘(可配置),消息才會丟失。此時需要生產(chǎn)者客戶端手動處理該情況。那么producer是如何檢測到數(shù)據(jù)丟失的呢?是通過ack機制,類似于http的三次握手的方式。
以上的引用是kafka官方對于參數(shù) acks 的解釋(在老版本中,該參數(shù)是 request.required.acks )。
上面第三點提到了ISR的列表的follower,需要配合另一個參數(shù)才能更好的保證ack的有效性。ISR是Broker維護的一個“可靠的follower列表”,in-sync Replica列表,broker的配置包含一個參數(shù): min.insync.replicas 。該參數(shù)表示ISR中最少的副本數(shù)。如果不設置該值,ISR中的follower列表可能為空。此時相當于acks=1。
如上圖中:
性能依次遞減,可靠性依次升高。
Producer丟失消息,發(fā)生在生產(chǎn)者客戶端。
為了提升效率,減少IO,producer在發(fā)送數(shù)據(jù)時可以將多個請求進行合并后發(fā)送。被合并的請求咋發(fā)送一線緩存在本地buffer中。緩存的方式和前文提到的刷盤類似,producer可以將請求打包成“塊”或者按照時間間隔,將buffer中的數(shù)據(jù)發(fā)出。通過buffer我們可以將生產(chǎn)者改造為異步的方式,而這可以提升我們的發(fā)送效率。
但是,buffer中的數(shù)據(jù)就是危險的。在正常情況下,客戶端的異步調用可以通過callback來處理消息發(fā)送失敗或者超時的情況,但是,一旦producer被非法的停止了歲談團,那么buffer中的數(shù)據(jù)將丟失,broker將無法收到該部分數(shù)據(jù)。又或者,當Producer客戶端內存不夠時,如果采取的策略是丟棄消息(另一種策略是block阻塞),消息也會被丟失。抑或,消息產(chǎn)生(異步產(chǎn)生)過快,導致掛起線程過多,內存不足,侍戚導致程序崩潰,消息丟失。
producer
根據(jù)上圖,可以想到幾個解決的思路:
Consumer消費消息有下面幾個步驟:
Consumer的消費方式主要分為兩種:
上面的示例是自動提交的例子。如果此時,insertIntoDB(record)發(fā)生異常,消息將會出現(xiàn)丟失。接下來是手動提交的例子:
將提交類型改為手動以后,可以保證消息“至少被消費一次”(at least once)。但此時可能出現(xiàn)重復消費的情況,重復消費不屬于本篇討論范圍。
上面兩個例子,是直接使用Consumer的High level API,客戶端對于offset等控制是透明的。也可以采用Low level API的方式,手動控制offset,也可以保證消息不丟,不過會更加復雜。
關于linux下查看kafka的介紹到此就結束了,不知道你從中找到你需要的信息了嗎 ?如果你還想了解更多這方面的信息,記得收藏關注本站。
香港服務器選創(chuàng)新互聯(lián),2H2G首月10元開通。
創(chuàng)新互聯(lián)(www.cdcxhl.com)互聯(lián)網(wǎng)服務提供商,擁有超過10年的服務器租用、服務器托管、云服務器、虛擬主機、網(wǎng)站系統(tǒng)開發(fā)經(jīng)驗。專業(yè)提供云主機、虛擬主機、域名注冊、VPS主機、云服務器、香港云服務器、免備案服務器等。
網(wǎng)頁題目:如何在Linux下查看Kafka的相關信息?(linux下查看kafka)
本文網(wǎng)址:http://m.5511xx.com/article/cdcgpjd.html


咨詢
建站咨詢
