From 6672f545b7542022f35cace1646597bee2459902 Mon Sep 17 00:00:00 2001 From: medcl Date: Tue, 24 Oct 2023 19:47:31 +0800 Subject: [PATCH] improve nodes discovery --- modules/agent/api/elasticsearch.go | 58 ++++++++++++++++++++++++++++++ modules/agent/api/remote_config.go | 33 +++++++++++------ 2 files changed, 80 insertions(+), 11 deletions(-) diff --git a/modules/agent/api/elasticsearch.go b/modules/agent/api/elasticsearch.go index 13bf827d..65c74aac 100644 --- a/modules/agent/api/elasticsearch.go +++ b/modules/agent/api/elasticsearch.go @@ -14,7 +14,9 @@ import ( "infini.sh/framework/core/model" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" + "infini.sh/framework/modules/elastic/adapter" "infini.sh/framework/modules/elastic/common" + "infini.sh/framework/modules/elastic/metadata" "infini.sh/framework/plugins/managed/server" "net/http" "time" @@ -90,14 +92,70 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { return nil, fmt.Errorf("error on get nodes info from agent: %w", err) } + newNodes := map[string]*elastic.LocalNodeInfo{} + //binding nodes info with agent for nodeID, node := range nodesInfo.Nodes { v, ok := enrolledNodesByAgent[nodeID] + node.Status = "online" if ok { node.ClusterID = v.ClusterID node.Enrolled = true + + //output + newNodes[nodeID] = node + } else { + newNodes[nodeID] = node } } + //TODO, merge requests to one + for k, v := range enrolledNodesByAgent { + if _, ok := newNodes[k]; !ok { + client := elastic.GetClient(v.ClusterID) + status := "online" + nodeInfo, err := client.GetNodeInfo(v.NodeUUID) + var clusterInfo *elastic.ClusterInformation + if err != nil ||nodeInfo == nil { + status= "offline" + + //get nodes information + nodeInfos, err := metadata.GetNodeInformation(v.ClusterID,[]string{v.NodeUUID}) + if err!=nil||len(nodeInfos)==0{ + log.Error("node info not found:",v.ClusterID,",",[]string{v.NodeUUID},",",err,err!=nil,len(nodeInfos)==0) + continue + } + + nodeInfo=nodeInfos[0] + + //get cluster information + clusterInfo,err=metadata.GetClusterInformation(v.ClusterID) + if err!=nil||clusterInfo==nil{ + log.Error("cluster info not found:",v.ClusterID,",",err,clusterInfo==nil) + continue + } + + + }else{ + clusterInfo, err = adapter.ClusterVersion(elastic.GetMetadata(v.ClusterID)) + if err != nil || clusterInfo == nil{ + log.Error(err) + continue + } + } + + newNodes[k] = &elastic.LocalNodeInfo{ + Status: status, + ClusterID: v.ClusterID, + NodeUUID: v.NodeUUID, + Enrolled: true, + NodeInfo: nodeInfo, + ClusterInfo: clusterInfo, + } + } + } + + nodesInfo.Nodes = newNodes + return nodesInfo, nil } diff --git a/modules/agent/api/remote_config.go b/modules/agent/api/remote_config.go index bd23ac63..5e3467a0 100644 --- a/modules/agent/api/remote_config.go +++ b/modules/agent/api/remote_config.go @@ -102,18 +102,21 @@ func dynamicAgentConfigProvider(instance model.Instance) []*common.ConfigFile { cfg := common.ConfigFile{} cfg.Name = "generated_metrics_tasks.yml" cfg.Location = "generated_metrics_tasks.yml" - cfg.Content = getAgentIngestConfigs(ids) + cfg.Content, cfg.Hash = getAgentIngestConfigs(instance.ID, ids) - hash := util.MD5digest(cfg.Content) + hash := cfg.Hash + if cfg.Hash == "" { + hash = util.MD5digest(cfg.Content) + } //if local's hash is different from remote's hash, then update local's hash, update version to current timestamp - v, err := kv.GetValue(LastAgentHash, []byte(global.Env().SystemConfig.NodeConfig.ID)) + v, err := kv.GetValue(LastAgentHash, []byte(global.Env().SystemConfig.NodeConfig.ID+":"+instance.ID)) if err != nil || v == nil || string(v) != hash { - err := kv.AddValue(LastAgentHash, []byte(global.Env().SystemConfig.NodeConfig.ID), []byte(hash)) + err := kv.AddValue(LastAgentHash, []byte(global.Env().SystemConfig.NodeConfig.ID+":"+instance.ID), []byte(hash)) if err != nil { panic(err) } latestTimestamp = time.Now().Unix() - log.Info("local hash is different from remote's hash, update local's hash, update version to current timestamp") + log.Infof("hash: %v vs %v, update version to current timestamp: %v", string(v), hash, latestTimestamp) } cfg.Size = int64(len(cfg.Content)) @@ -126,7 +129,11 @@ func dynamicAgentConfigProvider(instance model.Instance) []*common.ConfigFile { return result } -func getAgentIngestConfigs(items map[string]BindingItem) string { +func getAgentIngestConfigs(instance string, items map[string]BindingItem) (string, string) { + + if instance == "" { + panic("instance id is empty") + } buffer := bytes.NewBuffer([]byte("configs.template: ")) @@ -134,16 +141,18 @@ func getAgentIngestConfigs(items map[string]BindingItem) string { newItems := []util.KeyValue{} for k, v := range items { - newItems = append(newItems, util.KeyValue{Key: k, Value: v.Updated,Payload: v}) + newItems = append(newItems, util.KeyValue{Key: k, Value: v.Updated, Payload: v}) } - newItems=util.SortKeyValueArray(newItems,false) + newItems = util.SortKeyValueArray(newItems, false) var latestVersion int64 for _, x := range newItems { - v,ok:=x.Payload.(BindingItem) - if !ok{continue} + v, ok := x.Payload.(BindingItem) + if !ok { + continue + } if v.ClusterID == "" { panic("cluster id is empty") @@ -188,11 +197,13 @@ func getAgentIngestConfigs(items map[string]BindingItem) string { "NODE_LOGS_PATH: \"%v\"\n\n\n", v.NodeUUID, v.ClusterID, clusterEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs))) } + hash := util.MD5digest(buffer.String()) + //password: $[[keystore.$[[CLUSTER_ID]]_password]] buffer.WriteString("\n") buffer.WriteString(fmt.Sprintf("#MANAGED_CONFIG_VERSION: %v\n#MANAGED: true\n", latestVersion)) - return buffer.String() + return buffer.String(), hash } const LastAgentHash = "last_agent_hash"