日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網營銷解決方案
Go實現(xiàn)分布式高可用后臺:使用gRPC實現(xiàn)日志微服務

第一步還是要定義proto文件,修改proglog/api/v1下面的log.proto文件:

為朔州等地區(qū)用戶提供了全套網頁設計制作服務,及朔州網站建設行業(yè)解決方案。主營業(yè)務為成都網站制作、成都網站設計、朔州網站設計,以傳統(tǒng)方式定制建設網站,并提供域名空間備案等一條龍服務,秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務。我們深信只要達到每一位用戶的要求,就會得到認可,從而選擇與我們長期合作。這樣,我們也可以走得更遠!

syntax = "proto3";
package log.v1;

option go_package = "api/log_v1";

service Log {
rpc Produce(ProduceRequest) returns (ProduceResponse) {}
rpc Consume(ConsumeRequest) returns (ConsumeResponse) {}
rpc ConsumeStream(ConsumeRequest) returns (stream ConsumeResponse){}
rpc ProduceStream(stream ProduceRequest) returns (stream ProduceResponse) {}
}

message Record {
bytes value = 1;
uint64 offset = 2;
}

message ProduceRequest {
Record record = 1;
}

message ProduceResponse {
uint64 offset = 1;
}

message ConsumeRequest {
uint64 offset = 1;
}

message ConsumeResponse {
Record record = 2;
}

代碼的邏輯跟前面幾節(jié)我們嘗試使用gRPC時的proto文件定義邏輯沒什么不同,Produce接口是客戶端向服務端提交一條日志信息,Consume是客戶端向服務端提交日志編號,然后服務端返回日志信息,ConsumeStream是客戶端向服務端提交一連串的日志編號,然后服務端返回一連串的日志信息,ProduceStream是客戶端向服務端提交一連串的日志信息,然后服務端返回日志添加后對應的編號。完成上面proto文件定義后,將它編譯出對應的pb.go文件,這些文件會放置在api/v1/api_log_v1目錄下,然后我們看看服務端的邏輯設計。

在internal/server下新建server.go文件,首先我們添加依賴模塊,同時生成gRPC服務器對象,并注冊我們要實現(xiàn)的接口:

package server 

import (
"context"
api "api/v1/api/log_v1"
"google.golang.org/grpc"
)

type commitLog interface {
Append(*api.Record) (uint64, error)
Read(uint64) (*api.Record, error)
}

type Config struct { //實現(xiàn)依賴注入
CommitLog commitLog
}

var _ api.LogServer = (*grpcServer)(nil) //gRPC服務器對象

func NewGRPCServer(config *Config) (*grpc.Server, error) {
gsrv := grpc.NewServer()
srv, err := newgrpcServer(config)
if err != nil {
return nil, err
}

api.RegisterLogServer(gsrv, srv)
return gsrv, nil
}

type grpcServer struct {
api.UnimplementedLogServer
*Config
}

func newgrpcServer(config *Config)(srv *grpcServer, err error) {
srv = &grpcServer { //grpcServer會實現(xiàn)proto文件里面定義的接口
Config: config,
}

return srv, nil
}

在上面代碼中有一點需要注意,那就是它使用了常用的設計模式叫依賴注入,我們的服務需要使用到日志模塊提供的功能,但是我們這里只需要知道日志模塊提供的接口,也就是Append和Read,我們不需要關心它的具體實現(xiàn),這樣我們就能實現(xiàn)邏輯上的解耦合,在啟動我們的服務程序時,只需要調用者將實現(xiàn)了commitLog接口的實例傳給我們即可,至于接口的實現(xiàn)細節(jié)我們不需要關心,通過依賴注入這種設計模式能夠使得系統(tǒng)設計的復雜度降低,靈活性提升。

接下來就是對四個服務接口的實現(xiàn),其邏輯跟我們前兩節(jié)做的沒有什么區(qū)別:

func (s *grpcServer) Produce(ctx context.Context, 
req *api.ProduceRequest) (*api.ProduceResponse, error){
//收到客戶端發(fā)來的日志添加請求,然后調用日志模塊Append接口進行添加
offset, err := s.CommitLog.Append(req.Record)
if err != nil {
return nil, err
}

//添加完成后返回日志編號
return &api.ProduceResponse{Offset: offset}, nil
}

func (s *grpcServer) Consume(ctx context.Context,
req *api.ConsumeRequest)(*api.ConsumeResponse, error) {
//收到客戶端發(fā)來的日志編號,返回日志內容
record, err := s.CommitLog.Read(req.Offset)
if err != nil {
return nil, err
}

return &api.ConsumeResponse{Record: record}, nil
}

func (s *grpcServer) ProduceStream (stream api.Log_ProduceStreamServer) error {
for {
//客戶端發(fā)來一系列日志數(shù)據(jù),服務端通過Recv()依次收取,然后將日志進行添加
req, err := stream.Recv()
if err != nil {
return err
}
res, err := s.Produce(stream.Context(), req)
if err != nil {
return err
}
if err = stream.Send(res); err != nil {
return err
}
}
}

func (s *grpcServer) ConsumeStream(req *api.ConsumeRequest, stream api.Log_ConsumeStreamServer) error {
for {
//客戶端發(fā)來一系列日志編號,服務端返回一系列與編號對應的日志內容
select {
case <-stream.Context().Done():
//進入這里表明客戶端終端了連接
return nil
default:
res, err := s.Consume(stream.Context(), req)
switch err.(type){
case nil:
case api.ErrorOffsetOutOfRange:
continue
default:
return err
}
//將獲得的日志信息發(fā)送給客戶端
if err = stream.Send(res); err != nil {
return err
}
req.Offset++
}
}
}

上面代碼的實現(xiàn)邏輯與我們前面描述的一模一樣,因此沒有多少可以探究的地方,最后我們測試一下上面代碼的實現(xiàn),新建server_test.go,添加內容如下:

package server 

import (
"context"
"io/ioutil"
"net"
"testing"
"github.com/stretchr/testify/require"
api "api/v1/api/log_v1"
"internal/log"
"google.golang.org/grpc"
)

func TestServer(t *testing.T) {
for scenario, fn := range map[string]func(
t *testing.T,
client api.LogClient,
config *Config,
) {
"produce/consume a meesage to/from the log success": testProduceConsume ,
"produce/consume stream success": testProduceConsumeStream,
"consume past log boundary fails: ": testConsumePastBoundary,
} {
t.Run(scenario, func(t *testing.T) {
//在運行測試用例前先創(chuàng)建服務端對象
client, config, teardown := setupTest(t, nil)
defer teardown() //關閉服務端
fn(t, client, config)
})
}
}

func setupTest(t *testing.T, fn func(*Config)) (client api.LogClient, cfg*Config, teardown func()) {
t.Helper()
//生成tcp連接,使用0意味著使用隨機端口
l, err := net.Listen("tcp", ":0")
require.NoError(t, err)

clientOptions := []grpc.DialOption{grpc.WithInsecure()}
cc, err := grpc.Dial(l.Addr().String() , clientOptions...)
require.NoError(t, err)

dir, err := ioutil.TempDir("", "server-test")
require.NoError(t, err)

clog, err := log.NewLog(dir, log.Config{})
require.NoError(t, err)

cfg = &Config{
CommitLog: clog,
}
if fn != nil {
fn(cfg)
}

//創(chuàng)建服務端對象
server, err := NewGRPCServer(cfg)
require.NoError(t ,err)
go func() {
//啟動服務端
server.Serve(l)
}()
//創(chuàng)建客戶端對象
client = api.NewLogClient(cc)
return client, cfg, func() {
server.Stop()
cc.Close()
l.Close()
clog.Remove()
}
}

func testProduceConsume(t *testing.T, client api.LogClient, config*Config) {
ctx := context.Background()
want := &api.Record{
Value: []byte("hello world"),
}
//客戶端提交一條日志,然后拿到日志編號后再用于請求日志內容,檢驗服務端返回的日志內容與提交的是否一致
produce, err := client.Produce(ctx, &api.ProduceRequest{
Record: want,
})

require.NoError(t, err)
consume, err := client.Consume(ctx, &api.ConsumeRequest{
Offset: produce.Offset,
})

require.NoError(t, err)
require.Equal(t, want.Value, consume.Record.Value)
require.Equal(t, want.Offset, consume.Record.Offset)
}

func testConsumePastBoundary(t *testing.T, client api.LogClient, config *Config) {
ctx := context.Background()
produce, err := client.Produce(ctx, &api.ProduceRequest{
Record: &api.Record {
Value: []byte("hello world"),
},
})

//使用不存在的日志編號進行請求,服務端應該返回相應錯誤
require.NoError(t, err)
consume, err := client.Consume(ctx, &api.ConsumeRequest{
Offset: produce.Offset + 1,
})

if consume != nil {
t.Fatal("consume not nil")
}

got := grpc.Code(err)
want := grpc.Code(api.ErrorOffsetOutOfRange{}.GRPCStatus().Err())
if got != want {
t.Fatalf("got err: %v, want %v", got, want)
}
}

func testProduceConsumeStream(t *testing.T, client api.LogClient, config *Config) {
ctx := context.Background()
records := []*api.Record{{
Value: []byte("first message"),
Offset: 0,
},
{
Value: []byte("second message"),
Offset: 0,
},
}
//客戶端向服務端提交多個日志,獲得多個日志編號,然后再提交獲得的編號,從而讓服務端返回一系列日志數(shù)據(jù)
//接著比對服務端返回的日志內容和服務端是否一致
{
stream, err := client.ProduceStream(ctx)
require.NoError(t, err)

for offset, record := range records {
err = stream.Send(&api.ProduceRequest{
Record: record,
})
require.NoError(t, err)
res, err := stream.Recv()
require.NoError(t, err)
if res.Offset != uint64(offset) {
t.Fatalf("got offset: %d, want: %d", res.Offset, offset,)
}
}
}

{
stream, err := client.ConsumeStream(ctx, &api.ConsumeRequest{Offset: 0},)
require.NoError(t, err)
for i, record := range records{
res, err := stream.Recv()
require.NoError(t, err)
require.Equal(t, res.Record, &api.Record{
Value: record.Value,
Offset: uint64(i),
})
}
}
}

測試代碼的邏輯通過注釋就能理解,在測試用例中,客戶端的創(chuàng)建,數(shù)據(jù)的發(fā)送和接收跟我們前面描述的沒什么區(qū)別,由此我們依靠gRPC框架就完成了日志服務,下一節(jié)我們看看gRPC框架提供的數(shù)據(jù)安全功能。


當前標題:Go實現(xiàn)分布式高可用后臺:使用gRPC實現(xiàn)日志微服務
網站鏈接:http://m.5511xx.com/article/cccjghi.html