From 0a6d0e569644040ecd789ba2829bf75b5c4bd890 Mon Sep 17 00:00:00 2001 From: medcl Date: Sat, 16 Dec 2023 14:48:51 +0800 Subject: [PATCH] fix: enroll should handle recognized nodes (#261) Reviewed-on: https://git.infini.ltd/infini/console/pulls/261 --- modules/agent/api/elasticsearch.go | 149 ++++++++++++++++++----------- modules/agent/common/config.go | 2 +- 2 files changed, 94 insertions(+), 57 deletions(-) diff --git a/modules/agent/api/elasticsearch.go b/modules/agent/api/elasticsearch.go index b0830a71..20e49996 100644 --- a/modules/agent/api/elasticsearch.go +++ b/modules/agent/api/elasticsearch.go @@ -379,7 +379,7 @@ type ClusterInfo struct { ClusterIDs []string `json:"cluster_id"` } -var autoEnrollRunning=atomic.Bool{} +var autoEnrollRunning = atomic.Bool{} func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { //{"cluster_id":["infini_default_system_cluster"]} @@ -398,8 +398,7 @@ func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request, panic(errors.New("please select cluster to enroll")) } - - if autoEnrollRunning.Load(){ + if autoEnrollRunning.Load() { return } @@ -407,18 +406,20 @@ func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request, go func(clusterInfo ClusterInfo) { defer func() { autoEnrollRunning.Swap(false) - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - if v != "" { - log.Error(v) + if !global.Env().IsDebug { + if r := recover(); r != nil { + var v string + switch r.(type) { + case error: + v = r.(error).Error() + case runtime.Error: + v = r.(runtime.Error).Error() + case string: + v = r.(string) + } + if v != "" { + log.Error(v) + } } } log.Debug("finish auto enroll") @@ -451,15 +452,24 @@ func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request, } log.Debugf("instance:%v,%v, has: %v nodes, %v unknown nodes", instanceID, instanceEndpoint, len(nodes.Nodes), len(nodes.UnknownProcess)) if len(nodes.UnknownProcess) > 0 { - pids:=h.bindInstanceToCluster(clusterInfo, nodes, instanceID, instanceEndpoint) - log.Infof("instance:%v,%v, success enroll %v nodes",instanceID, instanceEndpoint,len(pids)) + pids := h.bindInstanceToCluster(clusterInfo, nodes, instanceID, instanceEndpoint) + log.Infof("instance:%v,%v, success enroll %v nodes", instanceID, instanceEndpoint, len(pids)) + } + + if len(nodes.Nodes)>0{ + for k,v:=range nodes.Nodes{ + log.Debug(k,v.Status,v.Enrolled) + if !v.Enrolled{ + pids := h.bindInstanceToCluster(clusterInfo, nodes, instanceID, instanceEndpoint) + log.Infof("instance:%v,%v, success enroll %v nodes", instanceID, instanceEndpoint, len(pids)) + } + } } } } } } - }(clusterInfo) //get all unknown nodes @@ -528,7 +538,6 @@ func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elast for _, clusterID := range clusterInfo.ClusterIDs { meta := elastic.GetMetadata(clusterID) if meta != nil { - states, err := elastic.GetClient(clusterID).GetClusterState() if err != nil || states == nil { log.Error(err) @@ -537,53 +546,43 @@ func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elast clusterUUID := states.ClusterUUID - if meta.Config.AgentCredentialID != "" { + //no auth or agent auth configured + if meta.Config.AgentCredentialID != "" || meta.Config.CredentialID == "" { auth, err := common.GetAgentBasicAuth(meta.Config) if err != nil { panic(err) } - if auth != nil { - //try connect - for _, node := range nodes.UnknownProcess { - for _, v := range node.ListenAddresses { - ip := v.IP - if util.ContainStr(v.IP, "::") { - ip = fmt.Sprintf("[%s]", v.IP) - } - nodeHost := fmt.Sprintf("%s:%d", ip, v.Port) - success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, instanceEndpoint) - if !success && tryAgain { - //try https again - success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, instanceEndpoint) + + for _,v:=range nodes.Nodes{ + if !v.Enrolled{ + if v.NodeInfo!=nil{ + pid:=v.NodeInfo.Process.Id + nodeHost:=v.NodeInfo.GetHttpPublishHost() + nodeInfo:=h.internalProcessBind(clusterID,clusterUUID,instanceID,instanceEndpoint,pid,nodeHost,auth) + if nodeInfo!=nil{ + discoveredPIDs[pid] = nodeInfo } + } + } + } - if success { - log.Debug("connect to es node success:", nodeHost, ", pid: ", node.PID) - discoveredPIDs[node.PID] = nodeInfo + //try connect + for _, node := range nodes.UnknownProcess { - if nodeInfo.ClusterInfo.ClusterUUID != clusterUUID { - log.Error("cluster uuid not match, cluster id: ", clusterID, ", cluster uuid: ", clusterUUID, ", node cluster uuid: ", nodeInfo.ClusterInfo.ClusterUUID) - continue - } + pid:=node.PID - //enroll this node - item := BindingItem{ - ClusterID: clusterID, - ClusterUUID: nodeInfo.ClusterInfo.ClusterUUID, - NodeUUID: nodeInfo.NodeUUID, - } + for _, v := range node.ListenAddresses { - settings := NewNodeAgentSettings(instanceID, &item) - err = orm.Update(&orm.Context{ - Refresh: "wait_for", - }, settings) + ip := v.IP + port:=v.Port - if err == nil { - nodeInfo.ClusterID = clusterID - nodeInfo.Enrolled = true - } - break - } + if util.ContainStr(ip, "::") { + ip = fmt.Sprintf("[%s]", ip) + } + nodeHost := fmt.Sprintf("%s:%d", ip, port) + nodeInfo:=h.internalProcessBind(clusterID,clusterUUID,instanceID,instanceEndpoint,pid,nodeHost,auth) + if nodeInfo!=nil{ + discoveredPIDs[pid] = nodeInfo } } } @@ -594,6 +593,44 @@ func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elast return discoveredPIDs } +func (h *APIHandler) internalProcessBind(clusterID,clusterUUID,instanceID,instanceEndpoint string,pid int,nodeHost string,auth *model.BasicAuth) *elastic.LocalNodeInfo{ + success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, instanceEndpoint) + if !success && tryAgain { + //try https again + success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, instanceEndpoint) + } + + log.Debug(clusterUUID,nodeHost,instanceEndpoint,success, tryAgain, nodeInfo) + + if success { + log.Debug("connect to es node success:", nodeHost, ", pid: ", pid) + if nodeInfo.ClusterInfo.ClusterUUID != clusterUUID { + log.Info("cluster uuid not match, cluster id: ", clusterID, ", cluster uuid: ", clusterUUID, ", node cluster uuid: ", nodeInfo.ClusterInfo.ClusterUUID) + return nil + } + + //enroll this node + item := BindingItem{ + ClusterID: clusterID, + ClusterUUID: nodeInfo.ClusterInfo.ClusterUUID, + NodeUUID: nodeInfo.NodeUUID, + } + + settings := NewNodeAgentSettings(instanceID, &item) + err := orm.Update(&orm.Context{ + Refresh: "wait_for", + }, settings) + + if err == nil { + nodeInfo.ClusterID = clusterID + nodeInfo.Enrolled = true + } + return nodeInfo + } + return nil +} + + func (h *APIHandler) getESNodeInfoViaProxy(esHost string, esSchema string, auth *model.BasicAuth, endpoint string) (success, tryAgain bool, info *elastic.LocalNodeInfo) { esConfig := elastic.ElasticsearchConfig{Host: esHost, Schema: esSchema, BasicAuth: auth} return h.getESNodeInfoViaProxyWithConfig(&esConfig, auth, endpoint) diff --git a/modules/agent/common/config.go b/modules/agent/common/config.go index 6cf5b6fc..3cb7d7f1 100644 --- a/modules/agent/common/config.go +++ b/modules/agent/common/config.go @@ -22,7 +22,7 @@ func GetAgentConfig() *model.AgentConfig { } _, err := env.ParseConfig("agent", agentCfg ) if err != nil { - log.Debug("agent config not found: %v", err) + log.Errorf("agent config not found: %v", err) } if agentCfg.Setup.CACertFile == "" && agentCfg.Setup.CAKeyFile == "" { agentCfg.Setup.CACertFile, agentCfg.Setup.CAKeyFile, err = common.GetOrInitDefaultCaCerts()