日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網營銷解決方案
GO實現(xiàn)高并發(fā)高可用分布式系統(tǒng):Log微服務的實現(xiàn)

GO 實現(xiàn)高并發(fā)高可用分布式系統(tǒng):Log微服務的實現(xiàn)

作者:陳屹 2022-01-10 19:45:40

開發(fā)

架構

分布式 在大數據時代,具備高并發(fā),高可用,理解微服務系統(tǒng)設計的人員需求很大,如果你想從事后臺開發(fā),在JD的描述中最常見的要求就是有所謂的“高并發(fā)”系統(tǒng)開發(fā)經驗。

創(chuàng)新互聯(lián)建站10多年企業(yè)網站制作服務;為您提供網站建設,網站制作,網頁設計及高端網站定制服務,企業(yè)網站制作及推廣,對成都酒樓設計等多個方面擁有多年的網站營銷經驗的網站建設公司。

本文轉載自微信公眾號「Coding迪斯尼」,作者陳屹。轉載本文請聯(lián)系Coding迪斯尼公眾號。

在大數據時代,具備高并發(fā),高可用,理解微服務系統(tǒng)設計的人員需求很大,如果你想從事后臺開發(fā),在JD的描述中最常見的要求就是有所謂的“高并發(fā)”系統(tǒng)開發(fā)經驗。但我發(fā)現(xiàn)在市面上并沒有直接針對“高并發(fā)”,“高可用”的教程,你搜到的資料往往都是只言片語,要不就是闡述那些令人摸不著頭腦的理論。但是技術的掌握必須從實踐中來,我找了很久發(fā)現(xiàn)很少有指導人動手實踐基于微服務的高并發(fā)系統(tǒng)開發(fā),因此我希望結合自己的學習和實踐經驗跟大家分享一下這方面的技術,特別是要強調具體的動手實踐來理解和掌握分布式系統(tǒng)設計的理論和技術。

所謂“微服務”其實沒什么神奇的地方,它只不過是把我們原來聚合在一起的模塊分解成多個獨立的,基于服務器程序存在的形式,假設我們開發(fā)的后臺系統(tǒng)分為日志,存儲,業(yè)務邏輯,算法邏輯等模塊,以前這些模塊會聚合成一個整體形成一個復雜龐大的應用程序:

這種方式存在很多問題,第一是過多模塊糅合在一起會使得系統(tǒng)設計過于復雜,因為模塊直接存在各種邏輯耦合,這使得隨著時間的推移,系統(tǒng)的開發(fā)和維護變得越來越困難。第二是系統(tǒng)越來越脆弱,只要其中一個模塊發(fā)送錯誤或奔潰,整個系統(tǒng)可能就會垮塌。第三是可擴展性不強,系統(tǒng)很難通過硬件性能的增強而實現(xiàn)相應擴展。

要實現(xiàn)高并發(fā),高可用,其基本思路就是將模塊拆解,然后讓他們成為獨立運行的服務器程序,各個模塊之間通過消息發(fā)送的方式完成配合:

這種模式的好處在于:1,模塊之間解耦合,一個模塊出問題對整個系統(tǒng)影響很小。2,可擴展,高可用,我們可以將模塊部署到不同服務器上,當流量增加,我們只要簡單的增加服務器數量就能使得系統(tǒng)的響應能力實現(xiàn)同等擴展。3,魯棒性增強,由于模塊能備份多個,其中一個模塊出問題,請求可以重定向到其他同樣模塊,于是系統(tǒng)的可靠性能大大增強。

當然任何收益都有對應代價,分布式系統(tǒng)的設計開發(fā)相比于原來的聚合性系統(tǒng)會多出很多難點。例如負載均衡,服務發(fā)現(xiàn),模塊協(xié)商,共識達成等,分布式算法強調的就是這些問題的解決,但是理論總是抽象難以理解,倘若不能動手實現(xiàn)一個高可用高并發(fā)系統(tǒng),你看多少理論都是霧里看花,越看越糊涂,所以我們必須通過動手實踐來理解和掌握理論,首先我們從最簡單的服務入手,那就是日志服務,我們將使用GO來實現(xiàn)。

首先創(chuàng)建根目錄,可以命名為go_distributed_system,后面所有服務模塊都實現(xiàn)在該目錄下,然后創(chuàng)建子目錄proglog,進去后我們再創(chuàng)建子目錄internel/server/在這里我們實現(xiàn)日志服務的邏輯模塊,首先在internel/server下面執(zhí)行初始化命令:

  
 
 
 
  1. go mod init internal/server

這里開發(fā)的模塊會被其他模塊引用,所以我們需要創(chuàng)建mod文件。首先我們需要完成日志系統(tǒng)所需的底層數據結構,創(chuàng)建log.go文件,相應代碼如下:

  
 
 
 
  1. package server
  2. import (
  3.     "fmt"
  4.     "sync"
  5. )
  6. type Log struct {
  7.     mu sync.Mutex
  8.     records [] Record 
  9. }
  10. func NewLog() *Log {
  11.     return &Log{ch : make(chan Record),} 
  12. }
  13. func(c *Log) Append(record Record) (uint64, error) {
  14.      c.mu.Lock()
  15.     defer c.mu.Unlock()
  16.     record.Offset = uint64(len(c.records))
  17.     c.records = append(c.records, record)
  18.     return record.Offset, nil 
  19. }
  20. func (c *Log) Read(offset uint64)(Record, error) {
  21.     c.mu.Lock()
  22.     defer c.mu.Unlock()
  23.     if offset >= uint64(len(c.records)) {
  24.         return Record{}, ErrOffsetNotFound 
  25.     }
  26.     return c.records[offset], nil 
  27. }
  28. type Record struct {
  29.     Value []byte `json:"value"`
  30.     Offset uint64 `json:"offset"`
  31. }
  32. var ErrOffsetNotFound = fmt.Errorf("offset not found")

由于我們的日志服務將以http服務器程序的方式接收日志讀寫請求,因此多個讀或寫請求會同時執(zhí)行,所以我們需要對records數組進行互斥操作,因此使用了互斥鎖,在每次讀取records數組前先獲得鎖,這樣能防止服務在同時接收多個讀寫請求時破壞掉數據的一致性。

所有的日志讀寫請求會以http POST 和 GET的方式發(fā)起,數據通過json來封裝,所以我們下面將創(chuàng)建一個http服務器對象,新建文件http.go,完成如下代碼:

  
 
 
 
  1. package server 
  2. import (
  3.     "encoding/json"
  4.     "net/http"
  5.     "github.com/gorilla/mux"
  6. )
  7. func NewHttpServer(addr string) *http.Server {
  8.     httpsrv := newHttpServer()
  9.     r := mux.NewRouter()
  10.     r.HandleFunc("/", httpsrv.handleLogWrite).Methods("POST")
  11.     r.HandleFunc("/", httpsrv.hadnleLogRead).Methods("GET")
  12.     return &http.Server{
  13.         Addr : addr,
  14.         Handler: r,
  15.     }
  16. }
  17. type httpServer struct{
  18.     Log *Log 
  19. }
  20. func newHttpServer() *httpServer {
  21.     return &httpServer {
  22.         Log: NewLog(),
  23.     }
  24. }
  25. type WriteRequest struct {
  26.     Record Record `json:"record"`
  27. }
  28. type WriteResponse struct {
  29.     Offset uint64 `json:"offset"`
  30. }
  31. type ReadRequest struct {
  32.     Offset uint64 `json:"offset"`
  33. }
  34. type ReadResponse struct {
  35.     Record Record `json:"record"`
  36. }
  37. func (s *httpServer) handleLogWrite(w http.ResponseWriter, r * http.Request) {
  38.     var req WriteRequest 
  39.     //服務以json格式接收請求
  40.     err := json.NewDecoder(r.Body).Decode(&req)
  41.     if err != nil {
  42.         http.Error(w, err.Error(), http.StatusBadRequest)
  43.         return 
  44.     }
  45.     off, err := s.Log.Append(req.Record)
  46.     if err != nil {
  47.         http.Error(w, err.Error(), http.StatusInternalServerError)
  48.         return 
  49.     }
  50.     res := WriteResponse{Offset: off}
  51.     //服務以json格式返回結果
  52.     err = json.NewEncoder(w).Encode(res)
  53.     if err != nil {
  54.         http.Error(w, err.Error(), http.StatusInternalServerError)
  55.         return 
  56.     }
  57. }
  58. func (s *httpServer) hadnleLogRead(w http.ResponseWriter, r *http.Request) {
  59.     var req ReadRequest 
  60.     err := json.NewDecoder(r.Body).Decode(&req)
  61.     if err != nil {
  62.         http.Error(w, err.Error(), http.StatusBadRequest)
  63.         return 
  64.     }
  65.     record, err := s.Log.Read(req.Offset)
  66.     if err == ErrOffsetNotFound {
  67.         http.Error(w, err.Error(), http.StatusNotFound)
  68.         return
  69.     }
  70.     if err != nil {
  71.         http.Error(w, err.Error(), http.StatusInternalServerError)
  72.         return 
  73.     }
  74.     res := ReadResponse{Record: record}
  75.     err = json.NewEncoder(w).Encode(res)
  76.     if err != nil {
  77.         http.Error(w, err.Error(), http.StatusInternalServerError)
  78.         return
  79.     }
  80. }

上面代碼顯示出“分布式”,“微服務”的特點。相應的功能代碼以單獨服務器的形式運行,通過網絡來接收服務請求,這對應“分布式”,每個獨立模塊只完成一個特定任務,這就對應“微服務”,由于這種方式可以同時在不同的機器上運行,于是展示了“可擴展性”。

同時服務既然以http 服務器的形式存在,因此服務的請求和返回也要走Http形式,同時數據以Json方式進行封裝。同時實現(xiàn)的邏輯很簡單,但有日志寫請求時,我們把請求解析成Record結構體后加入到隊列末尾,當有讀取日志的請求時,我們獲得客戶端發(fā)來的讀取偏移,然后取出對應的記錄,封裝成json格式后返回給客戶。

完成了服務器的代碼后,我們需要將服務器運行起來,為了達到模塊化的目的,我們把服務器的啟動放置在另一個地方,在proglog根目錄下創(chuàng)建cmd/server,在里面添加main.go:

  
 
 
 
  1. package main 
  2. import (
  3.     "log"
  4.     "internal/server"
  5. )
  6. func main() {
  7.     srv := server.NewHttpServer(":8080")
  8.     log.Fatal(srv.ListenAndServe())
  9. }

同時為了能夠引用internal/server下面的模塊,我們需要在cmd/server下先通過go mod init cmd/server進行初始化,然后在go.mod文件中添加如下一行:

  
 
 
 
  1. replace internal/server => ../../internal/server

然后執(zhí)行命令 go mod tidy,這樣本地模塊就知道根據給定的目錄轉換去引用模塊,最后使用go run main.go啟動日志服務,現(xiàn)在我們要做的是測試服務器的可用性,我們同樣在目錄下創(chuàng)建server_test.go,然后編寫測試代碼,基本邏輯就是想服務器發(fā)送日志寫請求,然后再發(fā)送讀請求,并比較讀到的數據是否和我們寫入的數據一致,代碼如下:

  
 
 
 
  1. package main
  2. import(
  3.     "encoding/json"
  4.     "net/http"
  5.     "internal/server"
  6.     "bytes"
  7.     "testing"
  8.     "io/ioutil"
  9. )
  10. func TestServerLogWrite(t *testing.T) {
  11.     var tests = []struct {
  12.         request server.WriteRequest
  13.         want_response server.WriteResponse 
  14.     } {
  15.         {request: server.WriteRequest{server.Record{[]byte(`this is log request 1`), 0}}, 
  16.          want_response:  server.WriteResponse{Offset: 0, },},
  17.          {request: server.WriteRequest{server.Record{[]byte(`this is log request 2`), 0}}, 
  18.          want_response:  server.WriteResponse{Offset: 1, },},
  19.          {request: server.WriteRequest{server.Record{[]byte(`this is log request 3`), 0}}, 
  20.          want_response:  server.WriteResponse{Offset: 2, },},
  21.     }
  22.     for _, test := range tests {
  23.         //將請求轉換成json格式并post給日志服務
  24.         request := &test.request 
  25.         request_json, err := json.Marshal(request)
  26.         if err != nil {
  27.             t.Errorf("convert request to json fail")
  28.             return 
  29.         }
  30.         resp, err := http.Post("http://localhost:8080", "application/json",bytes.NewBuffer(request_json))
  31.         defer resp.Body.Close()
  32.         if err != nil {
  33.             t.Errorf("http post request fail: %v", err)
  34.             return
  35.         }
  36.         //解析日志服務返回結果
  37.         body, err := ioutil.ReadAll(resp.Body)
  38.         var response server.WriteResponse 
  39.         err = json.Unmarshal([]byte(body), &response)
  40.         if err != nil {
  41.             t.Errorf("Unmarshal write response fail: %v", err)
  42.         }
  43.         //檢測結果是否與預期一致
  44.         if response.Offset != test.want_response.Offset {
  45.             t.Errorf("got offset: %d, but want offset: %d", response.Offset, test.want_response.Offset)
  46.         }
  47.     }
  48.     var read_tests = []struct {
  49.         request server.ReadRequest 
  50.         want server.ReadResponse 
  51.     } {
  52.         {request: server.ReadRequest{Offset : 0,}, 
  53.         want: server.ReadResponse{server.Record{[]byte(`this is log request 1`), 0}} },
  54.         {request: server.ReadRequest{Offset : 1,}, 
  55.         want: server.ReadResponse{server.Record{[]byte(`this is log request 2`), 0}} },
  56.         {request: server.ReadRequest{Offset : 2,}, 
  57.         want: server.ReadResponse{server.Record{[]byte(`this is log request 3`), 0}} },
  58.     }
  59.     for _, test := range read_tests {
  60.         request := test.request 
  61.         request_json , err := json.Marshal(request)
  62.         if err != nil {
  63.             t.Errorf("convert read request to json fail")
  64.             return 
  65.         }
  66.         //將請求轉換為json并放入GET請求體
  67.         client := &http.Client{}
  68.         req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", bytes.NewBuffer(request_json))
  69.         req.Header.Set("Content-Type", "application/json")
  70.         resp, err := client.Do(req)
  71.         if err != nil {
  72.             t.Errorf("read request fail: %v", err)
  73.             return 
  74.         }
  75.         //解析讀請求返回的結果
  76.         defer resp.Body.Close()
  77.         body, err := ioutil.ReadAll(resp.Body)
  78.         var response server.ReadResponse
  79.         err = json.Unmarshal([]byte(body), &response)
  80.         if err != nil {
  81.             t.Errorf("Unmarshal read response fail: %v", err)
  82.             return 
  83.         }
  84.         res := bytes.Compare(response.Record.Value, test.want.Record.Value)
  85.         if res != 0 {
  86.             t.Errorf("got value: %q, but want value : %q", response.Record.Value, test.want.Record.Value)
  87.         }
  88.     }
  89. }

完成上面代碼后,使用go test運行,結果如下圖所示:

從結果看到,我們的測試能通過,也就是無論是向日志服務提交寫入請求還是讀取請求,所得的結果跟我們預想的一致。總結一下,本節(jié)我們設計了一個簡單的JSON/HTTP 日志服務,它能夠接收基于JSON的http寫請求和讀請求,后面我們還會研究基于gPRC技術的微服務開發(fā)技術.

代碼獲取

https://github.com/wycl16514/golang_distribute_system_log_service.git


當前標題:GO實現(xiàn)高并發(fā)高可用分布式系統(tǒng):Log微服務的實現(xiàn)
網頁URL:http://m.5511xx.com/article/dpdgjsi.html