日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
如何在Linux下查看Kafka的相關信息?(linux下查看kafka)

Kafka是由Apache軟件基金會開發(fā)的一款開源的分布式消息系統(tǒng),它的高吞吐率、高并發(fā)性能以及良好的水平擴展性,使得它在數(shù)據(jù)處理領域中備受青睞。在Linux下使用Kafka,您可能需要查看一些與Kafka相關的信息,包括已安裝的Kafka版本、Topic、Partition和Broker等信息。那么,在本文中,我們將介紹如何在linux下查看kafka的相關信息。

查看已安裝的Kafka版本

在Linux中查看已安裝的Kafka版本可以通過以下命令實現(xiàn):

“`shell

$ kafka-topics.sh –version

“`

該命令將會返回當前Kafka的版本信息,如下:

“`

kafka-topics.sh v2.4.0 (Commit: c57222ae8cd7866d)

“`

查看Topic信息

在Linux中查看Kafka的Topic信息可以通過以下命令實現(xiàn):

“`shell

$ kafka-topics.sh –describe –topic –zookeeper :

“`

該命令將會返回與指定Topic相關的所有信息,包括Partition數(shù)量、Replication Factor、Partition分布情況等,如下:

“`

Topic: PartitionCount:3 ReplicationFactor:2 Configs:

Topic: Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2

Topic: Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3

Topic: Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1

“`

其中,“Leader”表示管理該Partition的Broker,而“Replicas”表示Partition的副本集。

查看Broker信息

在Linux中查看Kafka的Broker信息可以通過以下命令實現(xiàn):

“`shell

$ kafka-broker-api-versions.sh –bootstrap-server : –command-config

“`

該命令將會返回當前Kafka的Broker信息,包括API版本、Broker ID、版本等,如下:

“`

{ version: 4, leader_epoch: -1, attributes: 0 }

… omitted the rest …

( note that the output may include other revisions, you have to look it up).

“`

查看Partition信息

在Linux中查看Kafka的Partition信息可以通過以下命令實現(xiàn):

“`shell

$ kafka-run-class.sh kafka.tools.GetOffsetShell –broker-list : –topic –time -2

“`

該命令將會返回指定Topic的Partition相關信息,包括Offset信息,如下:

“`

topic_name:0:100

“`

其中,“100”是指該Partition的最新Offset。

查看Consumer Group信息

在Linux中查看Kafka的Consumer Group信息可以通過以下命令實現(xiàn):

“`shell

$ kafka-consumer-groups.sh –bootstrap-server : –group –describe

“`

該命令將會返回指定Group Name的相關信息,包括當前消費者數(shù)量、Offset信息以及Lag信息等,如下:

“`

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

test_topic 0 12023 12100 33 YjSIHz94Lua6a-EU0Ti9Xw-1593316979417 /192.168.0.13 YjSIHz94Lua6a-EU0Ti9Xw-1593316979417

“`

其中,“Consumer-ID”是指該消費者當前的ID,而“HOST”是指該消費者所在的IP地址。

相關問題拓展閱讀:

  • 深入理解kafka(五)日志存儲
  • Linux中Cache內存占用過高清理
  • Kafka數(shù)據(jù)丟失分析

深入理解kafka(五)日志存儲

5.1文件目錄布局

根目錄下有以下5個checkpoint文件: cleaner-offset-checkpoint, log-start-offset-checkpoint, meta.properties, recovery-point-offset-checkpoint, replication-offset-checkpoint

分區(qū)目錄下有以下目錄: 0000xxx.index(偏移量為64位長整形,長度固定為20位), 0000xxx.log, 0000xxx.timeindex.

還有可能包含.deleted .cleaned .swap等臨時文件, 以及可能的.snapshot .txnindex leader-epoch-checkpoint

5.2日志格式演變

5.2.1 v0版本

kafka0.10.0之前

RECORD_OVERHEAD包括offset(8B)和message size(4B)

RECORD包括:

crc32(4B):crc32校驗值

magic(1B):消息版本號0

attributes(1B):消息屬性。低3位表示壓縮類型:0-NONE 1-GZIP 2-SNAPPY 3-LZ4(0.9.x引入)

key length(4B):表示消息的key的長度。-1代表null

key: 可選

value length(4B):實際消息體的長度。-1代表null

value: 消息體。可以為空,如墓族顫碑消息

5.2.2 v1版本

kafka0.10.0-0.11.0

比v0多了timestamp(8B)字段,表示消息的時間戳

attributes的第4位也被利用起來,0表示timestamp的類型為CreateTime,1表示timestamp的類型為LogAppendTime

timestamp類型由broker端參數(shù)log.message.timestamp.type來配置,默認為CreateTime,即采用生產(chǎn)者創(chuàng)建的時間戳

5.2.3 消息壓縮

保證端到端的壓縮,服務端配置compression.type,默認為”producer”,表示保留生產(chǎn)者使用的壓縮方式,還可以配置為”gzip”,”snappy”,”lz4″

多條消息壓縮至value字段,以提高壓縮率

5.2.4 變長字段

變長整缺隱形(Varints):每一個字節(jié)都有一個位于更高位的m位(most significant bit),除了最后一個字節(jié)為1,其余都為0,字節(jié)倒序排列

為了使編碼更加高效,Varints使用ZigZag編碼:sint32對應 (n>31) sint64對應 (n>63)

5.2.5 v2版本

Record Batch

first offset:

length:

partition leader epoch:

magic:固定為2

attributes:兩個字節(jié)。低3位表示壓縮格式,第4位表示時間戳類型,第5位表示事務(0-非事務1-事務),第6位控制消息(0-非控制1控制)

first timestamp:

max timestamp:

producer id:

producer epoch:

first sequence:

records count:

v2版本的消息去掉了crc字段,另外增加了length(消息總長度)、timestamp delta(時間戳增量)、offset delta(位移增伏穗廳量)和headers信息,并且棄用了attributes

Record

length:

attributes:棄用,但仍占據(jù)1B

timestamp delta:

offset delta:

headers:

5.3日志索引

稀疏索引(sparse index):每當寫入一定量(broker端參數(shù)log.index.interval.bytes指定,默認為4096B),偏移量索引文件和時間索引文件分別對應一個索引項

日志段切分策略:

1.大小超過broker端參數(shù)log.segment.bytes配置的值,默認為(1GB)

2.當前日志段消息的更大時間戳與當前系統(tǒng)的時間戳差值大于log.roll.ms或者log.roll.hours,ms優(yōu)先級高,默認log.roll.hours=168(7天)

3.索引文件或者時間戳索引文件的大小大于log.index.size.max.bytes配置的值,默認為(10MB)

4.偏移量差值(offset-baseOffset)>Integer.MAX_VALUE

5.3.1 偏移量索引

每個索引項占用8個字節(jié),分為兩個部分:1.relativeOffset相對偏移量(4B) 2.position物理地址(4B)

使用kafka-dump-log.sh腳本來解析.index文件(包括.timeindex、.snapshot、.txnindex等文件),如下:

bin/kafka-dump-log.sh –files /tmp/kafka-logs/topicId-0/00……00.index

如果broker端參數(shù)log.index.size.max.bytes不是8的倍數(shù),內部會自動轉換為8的倍數(shù)

5.3.2 時間戳索引

每個索引項占用12個字節(jié),分為兩個部分:1.timestamp當前日志分段的更大時間戳(12B) 2.relativeOffset時間戳對應的相對偏移量(4B)

如果broker端參數(shù)log.index.size.max.bytes不是12的倍數(shù),內部會自動轉換為12的倍數(shù)

5.4日志清理

日志清理策略可以控制到主題級別

5.4.1 日志刪除

broker端參數(shù)log.cleanup.policy設置為delete(默認為delete)

檢測周期broker端參數(shù)log.retention.check.interval.ms=300000(默認5分鐘)

1.基于時間

broker端參數(shù)log.retention.hours,log.retention.minutes,log.retention.ms,優(yōu)先級ms>minutes>hours

刪除時先增加.delete后綴,延遲刪除根據(jù)file.delete.delay.ms(默認60000)配置

2.基于日志大小

日志總大小為broker端參數(shù)log.retention.bytes(默認為-1,表示無窮大)

日志段大小為broker端參數(shù)log.segment.bytes(默認為,1GB)

3.基于日志起始偏移量

DeleteRecordRequest請求

1.KafkaAdminClient的deleteRecord()

2.kafka-delete-record.sh腳本

5.4.2 日志壓縮

broker端參數(shù)log.cleanup.policy設置為compact,且log.cleaner.enable設置為true(默認為true)

5.5磁盤存儲

相關測試:一個由6塊7200r/min的RAID-5陣列組成的磁盤簇的線性寫入600MB/s,隨機寫入100KB/s,隨機內存寫入400MB/s,線性內存3.6GB/s

5.5.1 頁緩存

Linux操作系統(tǒng)的vm.dirty_background_ratio參數(shù)用來指定臟頁數(shù)量達到系統(tǒng)的百分比之后就觸發(fā)pdflush/flush/kdmflush,一般小于10,不建議為0

vm.dirty_ratio表示臟頁百分比之后刷盤,但是阻塞新IO請求

kafka同樣提供同步刷盤及間斷性強制刷盤(fsync)功能,可以通過log.flush.interval.messages、log.flush.interval.ms等參數(shù)來控制

kafka不建議使用swap分區(qū),vm.swappiness參數(shù)上限為100,下限為0,建議設置為1

5.5.2 磁盤I/O流程

一般磁盤IO的場景有以下4種:

1.用戶調用標準C庫進行IO操作,數(shù)據(jù)流為:應用程序Buffer->C庫標準IOBuffer->文件系統(tǒng)也緩存->通過具體文件系統(tǒng)到磁盤

2.用戶調用文件IO,數(shù)據(jù)流為:應用程序Buffer->文件系統(tǒng)也緩存->通過具體文件系統(tǒng)到磁盤

3.用戶打開文件時使用O_DIRECT,繞過頁緩存直接讀寫磁盤

4.用戶使用類似dd工具,并使用direct參數(shù),繞過系統(tǒng)cache與文件系統(tǒng)直接讀寫磁盤

Linux系統(tǒng)中IO調度策略有4種:

1.NOOP:no operation

2.CFQ

3.DEADLINE

4.ANTICIPATORY

5.5.3 零拷貝

指數(shù)據(jù)直接從磁盤文件復制到網(wǎng)卡設備中,不需要經(jīng)應用程序

對linux而言依賴于底層的sendfile()

對java而言,F(xiàn)ileChannal.transferTo()的底層實現(xiàn)就是sendfile()

Linux中Cache內存占用過高清理

在Linux中每次用free查看的時候,發(fā)現(xiàn)free的空間都只有500M左右。同樣的環(huán)境32G只剩下這點,64G的也只剩下這么一點。后來發(fā)現(xiàn)都被Cache占用了,因為服務器上運行了Kafka環(huán)境,每周的日志文件都有一二百G的,估計就是他占用了page cache吧。

Free中的buffer和cache:(它們都是占用內存):

  buffer : 作為buffer cache的內存,是塊設備的讀寫緩沖區(qū)

  cache: 作為page cache的內存, 文件系統(tǒng)的cache

  如果亂鏈握 cache 的值很大,說明cache住的文件數(shù)很多。如果頻繁訪問到的文件都能被cache住,那么磁盤的讀IO bi會非常小。

Linux內核會在內存將要喚大耗盡的時候,觸發(fā)內存回收的工作,以便釋放出內存給急需內存的進程使用。也可以用動釋放,釋放的時候需要對cache中的數(shù)據(jù)跟對應文件中的數(shù)據(jù)一致。

釋放的方式有下以幾種

一般情況下釋放pagecache就可以了。這樣可以寫一個sh腳本來在服務器空閑的時候定時執(zhí)行

使用嘩慶crontab來設置定時任務,如每天4點開始清理

本文參考:

Kafka數(shù)據(jù)丟失分析

Kafka存在丟消息的問題,消息丟失會發(fā)生在Broker,Producer和Consumer三種。

Broker丟失消息是由于Kafka本身的原因造成的,kafka為了得到更高的性能和吞吐量,將數(shù)據(jù)異步批量的存儲在磁盤中。消息的刷盤過程,為了提高性能,減少刷盤次數(shù),kafka采用了批量刷盤的做法。即,按照一定的消息量,和時間間隔進行刷盤。這種機制也是由于linux操作系統(tǒng)決定的。將數(shù)據(jù)存儲到linux操作系統(tǒng)種,會先存儲到頁緩存(Page cache)中,按照時間或者其他條件進行刷盤(從page cache到file),或者通過fsync命令強制刷盤。數(shù)據(jù)在page cache中時,如果系統(tǒng)掛掉,數(shù)據(jù)會丟失。

Broker在linux服務器上高速讀寫以及同步到Replica

上圖簡述了broker寫數(shù)據(jù)以及同步的一個過程。broker寫數(shù)據(jù)只寫到PageCache中,而pageCache位于內存。這部分數(shù)據(jù)在斷電后是會丟失的。pageCache的數(shù)據(jù)通過linux的flusher程序進行刷盤。刷盤觸發(fā)條件有三:

Broker配置刷盤機制,是通過調用fsync函數(shù)接管了刷盤動作。從單個Broker來看,pageCache的數(shù)據(jù)會丟失。

Kafka沒有提供同步刷盤的方式。同步刷盤在RocketMQ中有實現(xiàn),實現(xiàn)原理是將異步刷盤的流程進行阻塞,等待響應,類似ajax的callback或者是java的future。下面是一段rocketmq的源碼。

也就是說,理論上,要完全讓kafka保證單個broker不丟失消息是做不到的,只能通過調整刷盤機制的參數(shù)緩解該情況。比如,減少刷盤間隔,減少刷盤數(shù)據(jù)量大小。時間越短,性能越差,可靠性越好(盡可能可靠)。這是一個選擇題。

為了解決該問題,kafka通過producer和broker協(xié)同處理單個broker丟失參數(shù)的情況。一旦producer發(fā)現(xiàn)broker消息丟失,即可自動進行retry。除非retry次數(shù)超過閥值乎橘(可配置),消息才會丟失。此時需要生產(chǎn)者客戶端手動處理該情況。那么producer是如何檢測到數(shù)據(jù)丟失的呢?是通過ack機制,類似于http的三次握手的方式。

以上的引用是kafka官方對于參數(shù) acks 的解釋(在老版本中,該參數(shù)是 request.required.acks )。

上面第三點提到了ISR的列表的follower,需要配合另一個參數(shù)才能更好的保證ack的有效性。ISR是Broker維護的一個“可靠的follower列表”,in-sync Replica列表,broker的配置包含一個參數(shù): min.insync.replicas 。該參數(shù)表示ISR中最少的副本數(shù)。如果不設置該值,ISR中的follower列表可能為空。此時相當于acks=1。

如上圖中:

性能依次遞減,可靠性依次升高。

Producer丟失消息,發(fā)生在生產(chǎn)者客戶端。

為了提升效率,減少IO,producer在發(fā)送數(shù)據(jù)時可以將多個請求進行合并后發(fā)送。被合并的請求咋發(fā)送一線緩存在本地buffer中。緩存的方式和前文提到的刷盤類似,producer可以將請求打包成“塊”或者按照時間間隔,將buffer中的數(shù)據(jù)發(fā)出。通過buffer我們可以將生產(chǎn)者改造為異步的方式,而這可以提升我們的發(fā)送效率。

但是,buffer中的數(shù)據(jù)就是危險的。在正常情況下,客戶端的異步調用可以通過callback來處理消息發(fā)送失敗或者超時的情況,但是,一旦producer被非法的停止了歲談團,那么buffer中的數(shù)據(jù)將丟失,broker將無法收到該部分數(shù)據(jù)。又或者,當Producer客戶端內存不夠時,如果采取的策略是丟棄消息(另一種策略是block阻塞),消息也會被丟失。抑或,消息產(chǎn)生(異步產(chǎn)生)過快,導致掛起線程過多,內存不足,侍戚導致程序崩潰,消息丟失。

producer

根據(jù)上圖,可以想到幾個解決的思路:

Consumer消費消息有下面幾個步驟:

Consumer的消費方式主要分為兩種:

上面的示例是自動提交的例子。如果此時,insertIntoDB(record)發(fā)生異常,消息將會出現(xiàn)丟失。接下來是手動提交的例子:

將提交類型改為手動以后,可以保證消息“至少被消費一次”(at least once)。但此時可能出現(xiàn)重復消費的情況,重復消費不屬于本篇討論范圍。

上面兩個例子,是直接使用Consumer的High level API,客戶端對于offset等控制是透明的。也可以采用Low level API的方式,手動控制offset,也可以保證消息不丟,不過會更加復雜。

關于linux下查看kafka的介紹到此就結束了,不知道你從中找到你需要的信息了嗎 ?如果你還想了解更多這方面的信息,記得收藏關注本站。

香港服務器選創(chuàng)新互聯(lián),2H2G首月10元開通。
創(chuàng)新互聯(lián)(www.cdcxhl.com)互聯(lián)網(wǎng)服務提供商,擁有超過10年的服務器租用、服務器托管、云服務器、虛擬主機、網(wǎng)站系統(tǒng)開發(fā)經(jīng)驗。專業(yè)提供云主機、虛擬主機、域名注冊、VPS主機、云服務器、香港云服務器、免備案服務器等。


網(wǎng)頁題目:如何在Linux下查看Kafka的相關信息?(linux下查看kafka)
本文網(wǎng)址:http://m.5511xx.com/article/cdcgpjd.html