日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
請求量太大下游扛不住怎么辦?進來學一招

背景

這個問題簡單說一下背景,如果不明白可以看上篇文章 ,不想看也沒關(guān)系,這是個通用的解法,后面我會總結(jié)抽象下。

創(chuàng)新互聯(lián)專注于金牛網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供金牛營銷型網(wǎng)站建設(shè),金牛網(wǎng)站制作、金牛網(wǎng)頁設(shè)計、金牛網(wǎng)站官網(wǎng)定制、微信小程序開發(fā)服務(wù),打造金牛網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供金牛網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。

在上篇文章的最后提到對每個摘除的地址做決策時,需要順序執(zhí)行,且每一個要摘除的地址都要實時獲取該集群的地址信息,以便做出是否需要兜底的決策。

當被摘除的機器非常多時,獲取地址信息的請求量就會非常大,對注冊中心造成了不小的壓力。

請求數(shù)據(jù)源的接口如下所示(其中 cuuid 是集群的 id)

type Read interface {
ListClusterEndpoints(ctx context.Context, cuuid string) ([]ptypes.Endpoint, error)
}

相信大家也能理解這個非常簡單的背景并且能想到一些解法。每次決策需要按 cuuid 獲取集群,也就是單個單個地獲取實時集群地址信息,由于是實時信息,緩存首先排除,其次自然而然地能想到如果能將請求合并一下,是不是就能解決請求量大的問題?

難點

如果只是改邏輯合并一下請求,吭哧吭哧改代碼就完了,也不值得寫這篇文章了,如何改最少的代碼來實現(xiàn)合并請求才是最難的。

解法

那天遇到這個問題,晚上輾轉(zhuǎn)反側(cè)想到了這個解法,其實主要也是參考 Go http client 的實現(xiàn),都說看源碼沒用,這不就是用處么?

Read? 數(shù)據(jù)源接口定義保持不變,也就是上層的業(yè)務(wù)代碼完全不用改,只需要把 ListClusterEndpoints 的實現(xiàn)換掉。

我們可以用一個隊列把每個請求入隊,入隊列以后,調(diào)用方阻塞,然后起一些協(xié)程去隊列里取一批請求參數(shù),發(fā)起批量請求,響應(yīng)之后喚醒阻塞的調(diào)用方。

為此,我們實現(xiàn)一個可以阻塞并被其他協(xié)程喚醒的工具:

type token struct {
value interface{}
err error
}

type Token chan token

func NewToken() Token {
return make(Token, 1)
}

func (t Token) Done(value interface{}, err error) {
t <- token{value: value, err: err}
}

func (t Token) Wait(timeout time.Duration) (value interface{}, err error) {
if timeout <= 0 {
tk := <-t
return tk.value, tk.err
}

select {
case tk := <-t:
return tk.value, tk.err
case <-time.After(timeout):
return nil, ErrTokenTimeout
}
}

其次,定義隊列和其他參數(shù):

type DataSource struct {
paramCh chan param
readTimeout time.Duration
concurrency int
step int
}

type param struct {
cuuid string
token Token
}

替換掉原來 ListClusterEndpoints 的實現(xiàn):

func (p *DataSource) ListClusterEndpoints(ctx context.Context, cuuid string) ([]ptypes.Endpoint, error) {
req := param{
cuuid: cuuid,
token: NewToken(),
}

select {
case p.paramCh <- req:
default:
return nil, fmt.Errorf("list cluster endpoints write channel failed")
}

value, err := req.token.Wait(p.readTimeout)
if err != nil {
return nil, err
}
eps, ok := value.([]ptypes.Endpoint)
if !ok {
return nil, fmt.Errorf("value is not endpoints")
}
return endpoints, nil
}

再起幾個協(xié)程來處理任務(wù):

func (p *DataSource) startListClusterEndpointsLoop() {
for i := 0; i < p.concurrency; i++ {
go func() {
for {
reqs := p.getListClusterEndpointsReqFromChan()
p.doBatchListClusterEndpoints(reqs)
}
}()
}
}

最關(guān)鍵的是 getListClusterEndpointsReqFromChan 的實現(xiàn),既不能讓協(xié)程空跑,這樣太消耗cpu,又要能及時地取到一批參數(shù),我們采取的方法是先阻塞地獲取一個參數(shù),如果沒數(shù)據(jù)則阻塞,如果有數(shù)據(jù),繼續(xù)取,直到數(shù)量達到上限或者取不到數(shù)據(jù)為止,此時這一批數(shù)據(jù)就可以批量地進行調(diào)用了。

func (p *DataSource) getListClusterEndpointsReqFromChan() []param {
reqs := make([]param, 0)
select {
case req := <-p.paramCh:
reqs = append(reqs, req)
for i := 1; i < p.step; i++ {
select {
case reqNext := <-p.paramCh:
reqs = append(reqs, reqNext)
default:
break
}
}
}
return reqs
}

最后

這個方法很簡單,但是有一些要注意的地方,得做好監(jiān)控,比如調(diào)用方單個請求的QPS、RT,實際批量請求的QPS、RT,這樣才好計算出處理協(xié)程開多少個合適,還有隊列寫入失敗、隊列長度等等監(jiān)控,當容量不足時及時做出調(diào)整。


文章題目:請求量太大下游扛不住怎么辦?進來學一招
新聞來源:http://m.5511xx.com/article/cojohio.html