日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Nacos源碼系列—關(guān)于服務(wù)端那些事兒

前言

服務(wù)注冊

上面是我們從官網(wǎng)中找到的Nacos架構(gòu)圖,從這個(gè)圖中我們大體可以得出我們要找的接口應(yīng)該是在NamingService這個(gè)服務(wù)中,同時(shí)我們在項(xiàng)目結(jié)構(gòu)中也可以看到naming這個(gè)模塊,naming就是實(shí)現(xiàn)服務(wù)注冊的,我們都知道請求路徑都是通過controller來進(jìn)行處理的,而在其中我們可以看到一個(gè)InstanceController的這么一個(gè)類,那么注冊實(shí)例肯定會和它有關(guān)??梢钥吹絀nstanceController類的請求路由即是我們POST請求的路由的部分,如下:

創(chuàng)新互聯(lián)-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價(jià)比開州網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式開州網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋開州地區(qū)。費(fèi)用合理售后完善,10年實(shí)體公司更值得信賴。

所以,我們就從開始研究接收請求處理服務(wù)注冊的源碼,我們找到通過RestFul API接口,請求類型為Post,的方法,符合條件的只有InstanceController.register方法,這個(gè)方法用來接收用戶的請求,并且把收到的信息進(jìn)行解析,裝換成實(shí)例信息,然后通過getInstanceOperator().registerInstance進(jìn)行調(diào)用,這個(gè)方法也是服務(wù)注冊的核心。

   @CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
//從 request信息中獲取namespaceId,如果沒有默認(rèn)為public
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//獲取服務(wù)名稱 格式:“group@@serviceName”
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
//將request參數(shù)還原成instance實(shí)例
final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
//【核心】注冊服務(wù)實(shí)例
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
}

我們先來看一下下面這個(gè)核心方法;

  private InstanceOperator getInstanceOperator() {
return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}

getInstanceOperator() 這個(gè)判斷是否走Grpc協(xié)議,默認(rèn)走Grpc,所以我們使用的是instanceServiceV2這個(gè)實(shí)例對象;

  private InstanceOperator getInstanceOperator() {
return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}

instanceServiceV2就是InstanceOperatorClientImpl,方法所以我們需要進(jìn)入的是下面這個(gè)實(shí)例的處理類。

具體方法如下所示:

  @Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
//判斷是否為臨時(shí)客戶端
boolean ephemeral = instance.isEphemeral();
//獲取客戶端ID
String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
//通過客戶端ID創(chuàng)建客戶端連接
createIpPortClientIfAbsent(clientId);
//獲取服務(wù)信息
Service service = getService(namespaceId, serviceName, ephemeral);
//注冊服務(wù)
clientOperationService.registerInstance(service, instance, clientId);
}

從Nacos2.0以后,新增了Client模型,管理與該客戶機(jī)有關(guān)的數(shù)據(jù)內(nèi)容,如果一個(gè)客戶機(jī)發(fā)布了一個(gè)服務(wù),那么這個(gè)客戶機(jī)發(fā)布的所有服務(wù)和訂閱者信息都會被更新到一個(gè)Client對象中,這個(gè)Client對象對應(yīng)于這個(gè)客戶機(jī)的鏈接,然后通過事件機(jī)制觸發(fā)索引信息的更新。Client負(fù)責(zé)管理一個(gè)客戶機(jī)的服務(wù)實(shí)例注冊Publish和服務(wù)訂閱Subscribe,可以方便地對需要推送的服務(wù)范圍進(jìn)行快速聚合,同時(shí)一個(gè)客戶端gRPC長連接對應(yīng)一個(gè)Client,每個(gè)Client有自己唯一的 clientId。

package com.alibaba.nacos.naming.core.v2.client;
public interface Client {

// 客戶端id
String getClientId();
// 是否臨時(shí)客戶端
boolean isEphemeral();
//設(shè)置客戶端更新時(shí)間
void setLastUpdatedTime();
//獲取客戶端更新時(shí)間
long getLastUpdatedTime();
// 服務(wù)實(shí)例注冊
boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
//服務(wù)實(shí)例移除
InstancePublishInfo removeServiceInstance(Service service);
//服務(wù)實(shí)例查詢
InstancePublishInfo getInstancePublishInfo(Service service);
Collection getAllPublishedService();
// 服務(wù)訂閱
boolean addServiceSubscriber(Service service, Subscriber subscriber);
///取消訂閱
boolean removeServiceSubscriber(Service service);
//查詢訂閱
Subscriber getSubscriber(Service service);
Collection getAllSubscribeService();
// 生成同步給其他節(jié)點(diǎn)的client數(shù)據(jù)
ClientSyncData generateSyncData();
// 是否過期
boolean isExpire(long currentTime);
// 釋放資源
void release();

}

知道了Client模型后,我們來接著從clientOperationService.registerInstance(service, instance, clientId);找到對應(yīng)的具體實(shí)現(xiàn)。

EphemeralClientOperationServiceImpl.registerInstance()

下面這個(gè)方法是具體來負(fù)責(zé)處理服務(wù)注冊,我們來詳細(xì)了解一下:

    @Override
public void registerInstance(Service service, Instance instance, String clientId) {
//確保Service單例存在,注意Service的equals和hasCode方法
Service singleton = ServiceManager.getInstance().getSingleton(service);
//如果不是臨時(shí)客戶端
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
//根據(jù)客戶端ID找到客戶端信息,這個(gè)關(guān)系在連接建立的時(shí)候存儲
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
//將客戶端實(shí)例模型,裝換成服務(wù)端實(shí)例模型
InstancePublishInfo instanceInfo = getPublishInfo(instance);
//將實(shí)例存儲到client中
client.addServiceInstance(singleton, instanceInfo);
//設(shè)置最后更新時(shí)間
client.setLastUpdatedTime();
//建立服務(wù)和客戶端的關(guān)聯(lián)關(guān)系
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}

ServiceManager.getInstance().getSingleton() 當(dāng)調(diào)用getSingleton的時(shí)候會負(fù)責(zé)管理service的單例,在這里service會重寫equlas和hasCode方法作為key。

public class ServiceManager {

//單例service 看service中equals和hasCode方法
private final ConcurrentHashMap singletonRepository;
//namespace下所有的service
private final ConcurrentHashMap> namespaceSingletonMaps;

//通過Map儲存單例的Service
public Service getSingleton(Service service) {
singletonRepository.putIfAbsent(service, service);
Service result = singletonRepository.get(service);
namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
namespaceSingletonMaps.get(result.getNamespace()).add(result);
return result;
}
}

service中 equal和hasCode方法,namespace+group+name在服務(wù)端是一個(gè)單例Service。

    @Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Service)) {
return false;
}
Service service = (Service) o;
return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);
}

@Override
public int hashCode() {
return Objects.hash(namespace, group, name);
}

clientManager.getClient() 這里對應(yīng)的實(shí)現(xiàn)類為ConnectionBasedClientManager這個(gè)實(shí)現(xiàn)類負(fù)責(zé)管理長連接clientId和client模型的映射關(guān)系。

@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
//通過map存儲ID和client之間的關(guān)聯(lián)關(guān)系
private final ConcurrentMap clients = new ConcurrentHashMap<>();

//根據(jù)clientId查詢client
@Override
public Client getClient(String clientId) {
return clients.get(clientId);
}
}

client.addServiceInstance(); 抽象類為AbstractClient:負(fù)責(zé)存儲當(dāng)前客戶端服務(wù)注冊表,也就是 service和instance的關(guān)系。

  protected final ConcurrentHashMap publishers = new ConcurrentHashMap<>(16, 0.75f, 1);、    

//將service和實(shí)例進(jìn)行關(guān)聯(lián)
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
if (null == publishers.put(service, instancePublishInfo)) {
//監(jiān)控指標(biāo)自增實(shí)例數(shù)
MetricsMonitor.incrementInstanceCount();
}
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}

ClientOperationEvent.ClientRegisterServiceEvent() :這里目的是為了過濾目標(biāo)服務(wù)得到最終instance列表建立service和client的關(guān)系,能夠方便我們快速查詢,同時(shí)會觸發(fā)ClientServiceIndexesManager的監(jiān)聽事件。


//服務(wù)與發(fā)布client的關(guān)系
private final ConcurrentMap> publisherIndexes = new ConcurrentHashMap<>();
//服務(wù)與訂閱clientId的關(guān)系
private final ConcurrentMap> subscriberIndexes = new ConcurrentHashMap<>();

private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
removeSubscriberIndexes(service, clientId);
}
}


private void addPublisherIndexes(Service service, String clientId) {
publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}

private void removePublisherIndexes(Service service, String clientId) {
if (!publisherIndexes.containsKey(service)) {
return;
}
publisherIndexes.get(service).remove(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}

private void addSubscriberIndexes(Service service, String clientId) {
subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
// Fix #5404, Only first time add need notify event.
if (subscriberIndexes.get(service).add(clientId)) {
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
}
}

private void removeSubscriberIndexes(Service service, String clientId) {
if (!subscriberIndexes.containsKey(service)) {
return;
}
subscriberIndexes.get(service).remove(clientId);
if (subscriberIndexes.get(service).isEmpty()) {
subscriberIndexes.remove(service);
}
}

請求流程圖:

服務(wù)端監(jiān)控檢查

Nacos作為注冊中心不止提供了服務(wù)注冊和服務(wù)發(fā)現(xiàn)的功能,還提供了服務(wù)可用性檢測的功能,在1.0的版本中,臨時(shí)實(shí)例走的是distro協(xié)議,客戶端向注冊中心發(fā)送心跳來維持自身的健康(healthy)狀態(tài),持久實(shí)例則走的是Raft協(xié)議存儲。

1.兩種檢測機(jī)制:

  • 客戶端主動(dòng)上報(bào)機(jī)制
  • 服務(wù)器端主動(dòng)下探機(jī)制

客戶端主動(dòng)上報(bào)機(jī)制:你主動(dòng)找上級,說你沒有打卡(不健康狀態(tài))。

服務(wù)器端主動(dòng)下探機(jī)制:上級檢測到你有不打卡的記錄,主動(dòng)來找你。

對于Nacos健康檢測機(jī)制,我們不能主動(dòng)去設(shè)置,但是健康檢查機(jī)制是和Nacos的服務(wù)實(shí)例類型強(qiáng)相關(guān),主要是有兩種服務(wù)實(shí)例:

  • 臨時(shí)實(shí)例:客戶端主動(dòng)上報(bào)
  • 持久實(shí)例:服務(wù)端主動(dòng)下探

客戶端主動(dòng)上報(bào)

臨時(shí)實(shí)例每隔5秒會主動(dòng)上報(bào)自己的健康狀態(tài),發(fā)送心跳,如果發(fā)送心跳的間隔時(shí)間超過15秒,Nacos服務(wù)器端會將服務(wù)標(biāo)記為亞健康狀態(tài),如果超過30S沒有發(fā)送心跳,那么服務(wù)實(shí)例會被從服務(wù)列表中剔除。

在2.0版本以后,持久實(shí)例不變,臨時(shí)實(shí)例而是通過長連接來判斷實(shí)例是否健康。

2.長連接:

一個(gè)連接上可以連續(xù)發(fā)送多數(shù)據(jù)包,在連接保持期間,如果沒有數(shù)據(jù)包發(fā)送,需要雙方發(fā)鏈路檢測包,在Nacos2.0之后,使用Grpc協(xié)議代替了http協(xié)議。長連接會保持客戶端和服務(wù)端發(fā)送的狀態(tài),在源碼中ConnectionManager 管理所有客戶端的長連接。

ConnectionManager: 每3秒檢測所有超過20S內(nèi)沒有發(fā)生過通訊的客戶端,向客戶端發(fā)起ClientDetectionRequest探測請求,如果客戶端在1s內(nèi)成功響應(yīng),則檢測通過,否則執(zhí)行unregister方法移除Connection。

如果客戶端持續(xù)和服務(wù)端進(jìn)行通訊,服務(wù)端是不需要主動(dòng)下探的,只有當(dāng)客戶端沒有一直和服務(wù)端通信的時(shí)候,服務(wù)端才會主動(dòng)下探操作。

@Service
public class ConnectionManager extends Subscriber {

Map connections = new ConcurrentHashMap();

//只要spring容器啟動(dòng),會觸發(fā)這個(gè)方法
@PostConstruct
public void start() {
// 啟動(dòng)不健康連接排除功能.
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// 1. 統(tǒng)計(jì)過時(shí)(20s)連接
Set> entries = connections.entrySet();
//2.獲得需要剔除的IP和端口
//3.根據(jù)限制獲取剔除的IP和端口
//4.如果還是有需要剔除的客戶端,則繼續(xù)執(zhí)行
//5.沒有活動(dòng)的客戶端執(zhí)行探測
//6.如果沒有馬上響應(yīng),則馬上剔除
//7.剔除后發(fā)布ClientDisconnectEvent事件
}
});

}
}

//注銷(移出)連接方法
public synchronized void unregister(String connectionId) {
Connection remove = this.connections.remove(connectionId);
if (remove != null) {
String clientIp = remove.getMetaInfo().clientIp;
AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
if (atomicInteger != null) {
int count = atomicInteger.decrementAndGet();
if (count <= 0) {
connectionForClientIp.remove(clientIp);
}
}
remove.close();
Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
}

當(dāng)服務(wù)端操作移除事件以后,會操作notifyClientDisConnected()方法,主要調(diào)用的是 clientConnectionEventListener.clientDisConnected(connection)方法,將連接信息傳入進(jìn)去。

  public void notifyClientDisConnected(final Connection connection) {

for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
try {
clientConnectionEventListener.clientDisConnected(connection);
} catch (Throwable throwable) {
Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
clientConnectionEventListener.getName(), throwable);
}
}

clientConnectionEventListenerd的實(shí)現(xiàn)類是ConnectionBasedClientManager,在這里面會出發(fā)清除索引緩存等操作。

@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
@Override
public boolean clientDisconnected(String clientId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
//同步移除client數(shù)據(jù)
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
//服務(wù)訂閱,將變更通知到客戶端
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
return true;
}
}

總結(jié)

到這里Nacos服務(wù)端的基礎(chǔ)的源碼就講完了,有些地方我們沒有展開來講,在后續(xù)的源碼講解中,會給大家詳細(xì)的進(jìn)行講解,今天主要講解了,服務(wù)端注冊以及監(jiān)控檢查的基礎(chǔ)代碼,后面會有最新的內(nèi)容呈現(xiàn)給大家,如果覺得文中對您有幫助的。


新聞標(biāo)題:Nacos源碼系列—關(guān)于服務(wù)端那些事兒
網(wǎng)站鏈接:http://m.5511xx.com/article/dpodhde.html