improve nodes discovery

This commit is contained in:
medcl 2023-10-24 19:47:31 +08:00
parent 4c8ee4a3e2
commit 6672f545b7
2 changed files with 80 additions and 11 deletions

View File

@ -14,7 +14,9 @@ import (
"infini.sh/framework/core/model"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"infini.sh/framework/modules/elastic/adapter"
"infini.sh/framework/modules/elastic/common"
"infini.sh/framework/modules/elastic/metadata"
"infini.sh/framework/plugins/managed/server"
"net/http"
"time"
@ -90,14 +92,70 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) {
return nil, fmt.Errorf("error on get nodes info from agent: %w", err)
}
newNodes := map[string]*elastic.LocalNodeInfo{}
//binding nodes info with agent
for nodeID, node := range nodesInfo.Nodes {
v, ok := enrolledNodesByAgent[nodeID]
node.Status = "online"
if ok {
node.ClusterID = v.ClusterID
node.Enrolled = true
//output
newNodes[nodeID] = node
} else {
newNodes[nodeID] = node
}
}
//TODO, merge requests to one
for k, v := range enrolledNodesByAgent {
if _, ok := newNodes[k]; !ok {
client := elastic.GetClient(v.ClusterID)
status := "online"
nodeInfo, err := client.GetNodeInfo(v.NodeUUID)
var clusterInfo *elastic.ClusterInformation
if err != nil ||nodeInfo == nil {
status= "offline"
//get nodes information
nodeInfos, err := metadata.GetNodeInformation(v.ClusterID,[]string{v.NodeUUID})
if err!=nil||len(nodeInfos)==0{
log.Error("node info not found:",v.ClusterID,",",[]string{v.NodeUUID},",",err,err!=nil,len(nodeInfos)==0)
continue
}
nodeInfo=nodeInfos[0]
//get cluster information
clusterInfo,err=metadata.GetClusterInformation(v.ClusterID)
if err!=nil||clusterInfo==nil{
log.Error("cluster info not found:",v.ClusterID,",",err,clusterInfo==nil)
continue
}
}else{
clusterInfo, err = adapter.ClusterVersion(elastic.GetMetadata(v.ClusterID))
if err != nil || clusterInfo == nil{
log.Error(err)
continue
}
}
newNodes[k] = &elastic.LocalNodeInfo{
Status: status,
ClusterID: v.ClusterID,
NodeUUID: v.NodeUUID,
Enrolled: true,
NodeInfo: nodeInfo,
ClusterInfo: clusterInfo,
}
}
}
nodesInfo.Nodes = newNodes
return nodesInfo, nil
}

View File

@ -102,18 +102,21 @@ func dynamicAgentConfigProvider(instance model.Instance) []*common.ConfigFile {
cfg := common.ConfigFile{}
cfg.Name = "generated_metrics_tasks.yml"
cfg.Location = "generated_metrics_tasks.yml"
cfg.Content = getAgentIngestConfigs(ids)
cfg.Content, cfg.Hash = getAgentIngestConfigs(instance.ID, ids)
hash := util.MD5digest(cfg.Content)
hash := cfg.Hash
if cfg.Hash == "" {
hash = util.MD5digest(cfg.Content)
}
//if local's hash is different from remote's hash, then update local's hash, update version to current timestamp
v, err := kv.GetValue(LastAgentHash, []byte(global.Env().SystemConfig.NodeConfig.ID))
v, err := kv.GetValue(LastAgentHash, []byte(global.Env().SystemConfig.NodeConfig.ID+":"+instance.ID))
if err != nil || v == nil || string(v) != hash {
err := kv.AddValue(LastAgentHash, []byte(global.Env().SystemConfig.NodeConfig.ID), []byte(hash))
err := kv.AddValue(LastAgentHash, []byte(global.Env().SystemConfig.NodeConfig.ID+":"+instance.ID), []byte(hash))
if err != nil {
panic(err)
}
latestTimestamp = time.Now().Unix()
log.Info("local hash is different from remote's hash, update local's hash, update version to current timestamp")
log.Infof("hash: %v vs %v, update version to current timestamp: %v", string(v), hash, latestTimestamp)
}
cfg.Size = int64(len(cfg.Content))
@ -126,7 +129,11 @@ func dynamicAgentConfigProvider(instance model.Instance) []*common.ConfigFile {
return result
}
func getAgentIngestConfigs(items map[string]BindingItem) string {
func getAgentIngestConfigs(instance string, items map[string]BindingItem) (string, string) {
if instance == "" {
panic("instance id is empty")
}
buffer := bytes.NewBuffer([]byte("configs.template: "))
@ -134,16 +141,18 @@ func getAgentIngestConfigs(items map[string]BindingItem) string {
newItems := []util.KeyValue{}
for k, v := range items {
newItems = append(newItems, util.KeyValue{Key: k, Value: v.Updated,Payload: v})
newItems = append(newItems, util.KeyValue{Key: k, Value: v.Updated, Payload: v})
}
newItems=util.SortKeyValueArray(newItems,false)
newItems = util.SortKeyValueArray(newItems, false)
var latestVersion int64
for _, x := range newItems {
v,ok:=x.Payload.(BindingItem)
if !ok{continue}
v, ok := x.Payload.(BindingItem)
if !ok {
continue
}
if v.ClusterID == "" {
panic("cluster id is empty")
@ -188,11 +197,13 @@ func getAgentIngestConfigs(items map[string]BindingItem) string {
"NODE_LOGS_PATH: \"%v\"\n\n\n", v.NodeUUID, v.ClusterID, clusterEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs)))
}
hash := util.MD5digest(buffer.String())
//password: $[[keystore.$[[CLUSTER_ID]]_password]]
buffer.WriteString("\n")
buffer.WriteString(fmt.Sprintf("#MANAGED_CONFIG_VERSION: %v\n#MANAGED: true\n", latestVersion))
return buffer.String()
return buffer.String(), hash
}
const LastAgentHash = "last_agent_hash"