新聞中心
本篇文章帶大家了解一下Golang緩存,深入淺出的介紹一下Golang中的緩存庫freecache,希望對大家有所幫助!

我們提供的服務(wù)有:網(wǎng)站制作、成都網(wǎng)站建設(shè)、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、杜集ssl等。為上千企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的杜集網(wǎng)站制作公司
go開發(fā)緩存場景一般使用map或者緩存框架,為了線程安全會使用sync.Map或線程安全的緩存框架。
緩存場景中如果數(shù)據(jù)量大于百萬級別,需要特別考慮數(shù)據(jù)類型對于gc的影響(注意string類型底層是指針+Len+Cap,因此也算是指針類型),如果緩存key和value都是非指針類型的話就無需多慮了?!鞠嚓P(guān)推薦:Go視頻教程】
但實(shí)際應(yīng)用場景中,key和value是(包含)指針類型數(shù)據(jù)是很常見的,因此使用緩存框架需要特別注意其對gc影響,從是否對GC影響角度來看緩存框架大致分為2類:
- 零GC開銷:比如freecache或bigcache這種,底層基于ringbuf,減小指針個(gè)數(shù);
- 有GC開銷:直接基于Map來實(shí)現(xiàn)的緩存框架。
下面以freecache為例分析下其實(shí)現(xiàn)原理,代碼示例如下:
func main() {
cacheSize := 100 * 1024 * 1024
cache := freecache.NewCache(cacheSize)
for i := 0; i < N; i++ {
str := strconv.Itoa(i)
_ = cache.Set([]byte(str), []byte(str), 1)
}
now := time.Now()
runtime.GC()
fmt.Printf("freecache, GC took: %s\n", time.Since(now))
_, _ = cache.Get([]byte("aa"))
now = time.Now()
for i := 0; i < N; i++ {
str := strconv.Itoa(i)
_, _ = cache.Get([]byte(str))
}
fmt.Printf("freecache, Get took: %s\n\n", time.Since(now))
}
freecache.NewCache會初始化本地緩存,size表示存儲空間大小,freecache會初始化256個(gè)segment,每個(gè)segment是獨(dú)立的存儲單元,freecache加鎖維度也是基于segment的,每個(gè)segment有一個(gè)ringbuf,初始大小為size/256。freecache號稱零GC的來源就是其指針是固定的,只有512個(gè),每個(gè)segment有2個(gè),分別是rb和slotData(注意切片為指針類型)。
type segment struct {
rb RingBuf // ring buffer that stores data
segId int
_ uint32 // 占位
missCount int64
hitCount int64
entryCount int64
totalCount int64 // number of entries in ring buffer, including deleted entries.
totalTime int64 // used to calculate least recent used entry.
timer Timer // Timer giving current time
totalEvacuate int64 // used for debug
totalExpired int64 // used for debug
overwrites int64 // used for debug
touched int64 // used for debug
vacuumLen int64 // up to vacuumLen, new data can be written without overwriting old data.
slotLens [256]int32 // The actual length for every slot.
slotCap int32 // max number of entry pointers a slot can hold.
slotsData []entryPtr // 索引指針
}
func NewCacheCustomTimer(size int, timer Timer) (cache *Cache) {
cache = new(Cache)
for i := 0; i < segmentCount; i++ {
cache.segments[i] = newSegment(size/segmentCount, i, timer)
}
}
func newSegment(bufSize int, segId int, timer Timer) (seg segment) {
seg.rb = NewRingBuf(bufSize, 0)
seg.segId = segId
seg.timer = timer
seg.vacuumLen = int64(bufSize)
seg.slotCap = 1
seg.slotsData = make([]entryPtr, 256*seg.slotCap) // 每個(gè)slotData初始化256個(gè)單位大小
}
2 讀寫流程
freecache的key和value都是[]byte數(shù)組,使用時(shí)需要自行序列化和反序列化,如果緩存復(fù)雜對象不可忽略其序列化和反序列化帶來的影響,首先看下Set流程:
_ = cache.Set([]byte(str), []byte(str), 1)
Set流程首先對key進(jìn)行hash,hashVal類型uint64,其低8位segID對應(yīng)segment數(shù)組,低8-15位表示slotId對應(yīng)slotsData下標(biāo),高16位表示slotsData下標(biāo)對應(yīng)的[]entryPtr某個(gè)數(shù)據(jù),這里需要查找操作。注意[]entryPtr數(shù)組大小為slotCap(初始為1),當(dāng)擴(kuò)容時(shí)會slotCap倍增。
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal // 低8位
cache.locks[segID].Lock() // 加鎖
err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
cache.locks[segID].Unlock()
}
func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) {
slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)
slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)
var hdrBuf [ENTRY_HDR_SIZE]byte
hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
if match { // 有數(shù)據(jù)更新操作
matchedPtr := &slot[idx]
seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
hdr.slotId = slotId
hdr.hash16 = hash16
hdr.keyLen = uint16(len(key))
originAccessTime := hdr.accessTime
hdr.accessTime = now
hdr.expireAt = expireAt
hdr.valLen = uint32(len(value))
if hdr.valCap >= hdr.valLen {
// 已存在數(shù)據(jù)value空間能存下此次value大小
atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime))
seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset)
seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
atomic.AddInt64(&seg.overwrites, 1)
return
}
// 刪除對應(yīng)entryPtr,涉及到slotsData內(nèi)存copy,ringbug中只是標(biāo)記刪除
seg.delEntryPtr(slotId, slot, idx)
match = false
// increase capacity and limit entry len.
for hdr.valCap < hdr.valLen {
hdr.valCap *= 2
}
if hdr.valCap > uint32(maxKeyValLen-len(key)) {
hdr.valCap = uint32(maxKeyValLen - len(key))
}
} else { // 無數(shù)據(jù)
hdr.slotId = slotId
hdr.hash16 = hash16
hdr.keyLen = uint16(len(key))
hdr.accessTime = now
hdr.expireAt = expireAt
hdr.valLen = uint32(len(value))
hdr.valCap = uint32(len(value))
if hdr.valCap == 0 { // avoid infinite loop when increasing capacity.
hdr.valCap = 1
}
}
// 數(shù)據(jù)實(shí)際長度為 ENTRY_HDR_SIZE=24 + key和value的長度
entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap)
slotModified := seg.evacuate(entryLen, slotId, now)
if slotModified {
// the slot has been modified during evacuation, we need to looked up for the 'idx' again.
// otherwise there would be index out of bound error.
slot = seg.getSlot(slotId)
idx, match = seg.lookup(slot, hash16, key)
// assert(match == false)
}
newOff := seg.rb.End()
seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen)
seg.rb.Write(hdrBuf[:])
seg.rb.Write(key)
seg.rb.Write(value)
seg.rb.Skip(int64(hdr.valCap - hdr.valLen))
atomic.AddInt64(&seg.totalTime, int64(now))
atomic.AddInt64(&seg.totalCount, 1)
seg.vacuumLen -= entryLen
return
}
seg.evacuate會評估ringbuf是否有足夠空間存儲key/value,如果空間不夠,其會從空閑空間尾部后一位(也就是待淘汰數(shù)據(jù)的開始位置)開始掃描(oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()),如果對應(yīng)數(shù)據(jù)已被邏輯deleted或者已過期,那么該塊內(nèi)存可以直接回收,如果不滿足回收條件,則將entry從環(huán)頭調(diào)換到環(huán)尾,再更新entry的索引,如果這樣循環(huán)5次還是不行,那么需要將當(dāng)前oldHdrBuf回收以滿足內(nèi)存需要。
執(zhí)行完seg.evacuate所需空間肯定是能滿足的,然后就是寫入索引和數(shù)據(jù)了,insertEntryPtr就是寫入索引操作,當(dāng)[]entryPtr中元素個(gè)數(shù)大于seg.slotCap(初始1)時(shí),需要擴(kuò)容操作,對應(yīng)方法見seg.expand,這里不再贅述。
寫入ringbuf就是執(zhí)行rb.Write即可。
func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModified bool) {
var oldHdrBuf [ENTRY_HDR_SIZE]byte
consecutiveEvacuate := 0
for seg.vacuumLen < entryLen {
oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()
seg.rb.ReadAt(oldHdrBuf[:], oldOff)
oldHdr := (*entryHdr)(unsafe.Pointer(&oldHdrBuf[0]))
oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap)
if oldHdr.deleted { // 已刪除
consecutiveEvacuate = 0
atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
atomic.AddInt64(&seg.totalCount, -1)
seg.vacuumLen += oldEntryLen
continue
}
expired := oldHdr.expireAt != 0 && oldHdr.expireAt < now
leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime)
if expired || leastRecentUsed || consecutiveEvacuate > 5 {
// 可以回收
seg.delEntryPtrByOffset(oldHdr.slotId, oldHdr.hash16, oldOff)
if oldHdr.slotId == slotId {
slotModified = true
}
consecutiveEvacuate = 0
atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
atomic.AddInt64(&seg.totalCount, -1)
seg.vacuumLen += oldEntryLen
if expired {
atomic.AddInt64(&seg.totalExpired, 1)
} else {
atomic.AddInt64(&seg.totalEvacuate, 1)
}
} else {
// evacuate an old entry that has been accessed recently for better cache hit rate.
newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen))
seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff, newOff)
consecutiveEvacuate++
atomic.AddInt64(&seg.totalEvacuate, 1)
}
}
}
freecache的Get流程相對來說簡單點(diǎn),通過hash找到對應(yīng)segment,通過slotId找到對應(yīng)索引slot,然后通過二分+遍歷尋找數(shù)據(jù),如果找不到直接返回ErrNotFound,否則更新一些time指標(biāo)。Get流程還會更新緩存命中率相關(guān)指標(biāo)。
func (cache *Cache) Get(key []byte) (value []byte, err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
value, _, err = cache.segments[segID].get(key, nil, hashVal, false)
cache.locks[segID].Unlock()
return
}
func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) {
hdr, ptr, err := seg.locate(key, hashVal, peek) // hash+定位查找
if err != nil {
return
}
expireAt = hdr.expireAt
if cap(buf) >= int(hdr.valLen) {
value = buf[:hdr.valLen]
} else {
value = make([]byte, hdr.valLen)
}
seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
}
定位到數(shù)據(jù)之后,讀取ringbuf即可,注意一般來說讀取到的value是新創(chuàng)建的內(nèi)存空間,因此涉及到[]byte數(shù)據(jù)的復(fù)制操作。
3 總結(jié)
從常見的幾個(gè)緩存框架壓測性能來看,Set性能差異較大但還不是數(shù)量級別的差距,Get性能差異不大,因此對于絕大多數(shù)場景來說不用太關(guān)注Set/Get性能,重點(diǎn)應(yīng)該看功能是否滿足業(yè)務(wù)需求和gc影響,性能壓測比較見:https://golang2.eddycjy.com/posts/ch5/04-performance/
緩存有一個(gè)特殊的場景,那就是將數(shù)據(jù)全部緩存在內(nèi)存,涉及到更新時(shí)都是全量更新(替換),該場景下使用freecache,如果size未設(shè)置好可能導(dǎo)致部分?jǐn)?shù)據(jù)被淘汰,是不符合預(yù)期的,這個(gè)一定要注意。為了使用freecache避免該問題,需要將size設(shè)置"足夠大",但也要注意其內(nèi)存空間占用。
當(dāng)前文章:一文了解Golang中的緩存庫freecache
網(wǎng)站URL:http://m.5511xx.com/article/cojhies.html


咨詢
建站咨詢
