diff --git a/modules/agent/api/elasticsearch.go b/modules/agent/api/elasticsearch.go index 77463934..0def2e83 100644 --- a/modules/agent/api/elasticsearch.go +++ b/modules/agent/api/elasticsearch.go @@ -52,12 +52,12 @@ func GetEnrolledNodesByAgent(instance *model.Instance) (map[string]BindingItem, if ok { item := BindingItem{} item.ClusterID = util.ToString(f["cluster_id"]) - item.ClusterName = util.ToString(f["cluster_name"]) + //item.ClusterName = util.ToString(f["cluster_name"]) item.ClusterUUID = util.ToString(f["cluster_uuid"]) - item.PublishAddress = util.ToString(f["publish_address"]) - item.NodeName = util.ToString(f["node_name"]) - item.PathHome = util.ToString(f["path_home"]) - item.PathLogs = util.ToString(f["path_logs"]) + //item.PublishAddress = util.ToString(f["publish_address"]) + //item.NodeName = util.ToString(f["node_name"]) + //item.PathHome = util.ToString(f["path_home"]) + //item.PathLogs = util.ToString(f["path_logs"]) item.NodeUUID = nodeID t, ok := v["updated"] @@ -108,7 +108,7 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { } } - var findPIDS=map[int]*elastic.NodesInfo{} + var findPIDS = map[int]*elastic.NodesInfo{} //TODO, merge requests to one for k, v := range enrolledNodesByAgent { @@ -122,33 +122,38 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { status := "online" nodeInfo, err := client.GetNodeInfo(v.NodeUUID) var clusterInfo *elastic.ClusterInformation - if err != nil ||nodeInfo == nil { - status= "offline" + if err != nil || nodeInfo == nil { + status = "offline" //get nodes information - nodeInfos, err := metadata.GetNodeInformation(v.ClusterID,[]string{v.NodeUUID}) - if err!=nil||len(nodeInfos)==0{ - log.Error("node info not found:",v.ClusterID,",",[]string{v.NodeUUID},",",err,err!=nil,len(nodeInfos)==0) + nodeInfos, err := metadata.GetNodeInformation(v.ClusterID, []string{v.NodeUUID}) + if err != nil || len(nodeInfos) == 0 { + log.Error("node info not found:", v.ClusterID, ",", []string{v.NodeUUID}, ",", err, err != nil, len(nodeInfos) == 0) continue } - nodeInfo=nodeInfos[0] + //get node information + nodeInfo, ok = nodeInfos[v.NodeUUID] + if !ok { + log.Error("node info not found:", v.ClusterID, ",", v.NodeUUID, ",", err) + continue + } //get cluster information - clusterInfo,err=metadata.GetClusterInformation(v.ClusterID) - if err!=nil||clusterInfo==nil{ - log.Error("cluster info not found:",v.ClusterID,",",err,clusterInfo==nil) + clusterInfo, err = metadata.GetClusterInformation(v.ClusterID) + if err != nil || clusterInfo == nil { + log.Error("cluster info not found:", v.ClusterID, ",", err, clusterInfo == nil) continue } - }else{ + } else { clusterInfo, err = adapter.ClusterVersion(elastic.GetMetadata(v.ClusterID)) - if err != nil || clusterInfo == nil{ + if err != nil || clusterInfo == nil { log.Error(err) continue } } - findPIDS[nodeInfo.Process.Id]=nodeInfo + findPIDS[nodeInfo.Process.Id] = nodeInfo newNodes[k] = &elastic.LocalNodeInfo{ Status: status, @@ -162,14 +167,14 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { } nodesInfo.Nodes = newNodes - newUnknows:=[]model.ProcessInfo{} - for _,v:=range nodesInfo.UnknownProcess{ + newUnknows := []model.ProcessInfo{} + for _, v := range nodesInfo.UnknownProcess { - if _,ok:=findPIDS[v.PID];!ok{ - newUnknows=append(newUnknows,v) + if _, ok := findPIDS[v.PID]; !ok { + newUnknows = append(newUnknows, v) } } - nodesInfo.UnknownProcess=newUnknows + nodesInfo.UnknownProcess = newUnknows return nodesInfo, nil } @@ -191,17 +196,18 @@ func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance } type BindingItem struct { - ClusterName string `json:"cluster_name"` - ClusterUUID string `json:"cluster_uuid"` - NodeUUID string `json:"node_uuid"` - PublishAddress string `json:"publish_address"` - NodeName string `json:"node_name"` - PathLogs string `json:"path_logs"` - PathHome string `json:"path_home"` + ClusterID string `json:"cluster_id"` + ClusterUUID string `json:"cluster_uuid"` + NodeUUID string `json:"node_uuid"` + + //PublishAddress string `json:"publish_address"` + //NodeName string `json:"node_name"` + //PathLogs string `json:"path_logs"` + //PathHome string `json:"path_home"` + //ClusterName string `json:"cluster_name"` //infini system assigned id - ClusterID string `json:"cluster_id"` - Updated int64 `json:"updated"` + Updated int64 `json:"updated"` } func GetElasticLogFiles(ctx context.Context, instance *model.Instance, logsPath string) (interface{}, error) { @@ -255,8 +261,9 @@ func GetElasticLogFileContent(ctx context.Context, instance *model.Instance, bod } func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + clusterID := ps.MustGetParameter("id") nodeID := ps.MustGetParameter("node_id") - inst, pathLogs, err := getAgentByNodeID(nodeID) + inst, pathLogs, err := getAgentByNodeID(clusterID, nodeID) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -283,8 +290,9 @@ func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, } func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + clusterID := ps.MustGetParameter("id") nodeID := ps.MustGetParameter("node_id") - inst, pathLogs, err := getAgentByNodeID(nodeID) + inst, pathLogs, err := getAgentByNodeID(clusterID, nodeID) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -318,12 +326,13 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, } //instance, pathLogs -func getAgentByNodeID(nodeID string) (*model.Instance, string, error) { +func getAgentByNodeID(clusterID, nodeID string) (*model.Instance, string, error) { q := orm.Query{ Size: 1000, Conds: orm.And(orm.Eq("metadata.category", "node_settings"), orm.Eq("metadata.name", "agent"), + orm.Eq("payload.cluster_id", clusterID), orm.Eq("payload.node_uuid", nodeID), ), } @@ -333,17 +342,25 @@ func getAgentByNodeID(nodeID string) (*model.Instance, string, error) { return nil, "", err } + nodeInfo,err:=metadata.GetNodeConfig(clusterID, nodeID) + if err!=nil||nodeInfo==nil{ + log.Error("node info is nil") + return nil, "", err + } + + pathLogs := nodeInfo.Payload.NodeInfo.GetPathLogs() + for _, row := range result.Result { v, ok := row.(map[string]interface{}) if ok { - pathLogs := "" - payload, ok := v["payload"] - if ok { - payloadMap, ok := payload.(map[string]interface{}) - if ok { - pathLogs = util.ToString(payloadMap["path_logs"]) - } - } + //payload, ok := v["payload"] + //if ok { + // payloadMap, ok := payload.(map[string]interface{}) + // if ok { + // pathLogs = util.ToString(payloadMap["path_logs"]) + // } + //} + x, ok := v["metadata"] if ok { @@ -417,6 +434,15 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque for _, clusterID := range clusterInfo.ClusterIDs { meta := elastic.GetMetadata(clusterID) if meta != nil { + + states,err:=elastic.GetClient(clusterID).GetClusterState() + if err!=nil||states==nil{ + log.Error(err) + continue + } + + clusterUUID:=states.ClusterUUID + if meta.Config.AgentCredentialID != "" { auth, err := common.GetAgentBasicAuth(meta.Config) if err != nil { @@ -438,8 +464,34 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque } if success { - log.Debug("connect to es node success:", nodeHost,", pid: ",node.PID) + log.Debug("connect to es node success:", nodeHost, ", pid: ", node.PID) discoveredPIDs[node.PID] = nodeInfo + + + if nodeInfo.ClusterInfo.ClusterUUID!=clusterUUID{ + log.Error("cluster uuid not match, cluster id: ", clusterID, ", cluster uuid: ", clusterUUID, ", node cluster uuid: ", nodeInfo.ClusterInfo.ClusterUUID) + continue + } + + //enroll this node + item := BindingItem{ + ClusterID: clusterID, + ClusterUUID: nodeInfo.ClusterInfo.ClusterUUID, + //ClusterName: nodeInfo.ClusterInfo.ClusterName, + //NodeName: nodeInfo.NodeInfo.Name, + NodeUUID: nodeInfo.NodeUUID, + //PathHome: nodeInfo.NodeInfo., + } + + settings := NewNodeAgentSettings(instance.ID, &item) + err = orm.Update(&orm.Context{ + Refresh: "wait_for", + }, settings) + + if err == nil { + nodeInfo.ClusterID = clusterID + nodeInfo.Enrolled = true + } break } } @@ -542,14 +594,15 @@ func NewNodeAgentSettings(instanceID string, item *BindingItem) *model.Setting { } settings.Payload = util.MapStr{ - "cluster_id": item.ClusterID, - "cluster_name": item.ClusterName, - "cluster_uuid": item.ClusterUUID, - "node_uuid": item.NodeUUID, - "publish_address": item.PublishAddress, - "node_name": item.NodeName, - "path_home": item.PathHome, - "path_logs": item.PathLogs, + "cluster_id": item.ClusterID, + "cluster_uuid": item.ClusterUUID, + "node_uuid": item.NodeUUID, + + //"cluster_name": item.ClusterName, + //"publish_address": item.PublishAddress, + //"node_name": item.NodeName, + //"path_home": item.PathHome, + //"path_logs": item.PathLogs, } return &settings diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go index 5cb4328a..48cb6b8a 100644 --- a/modules/agent/api/init.go +++ b/modules/agent/api/init.go @@ -28,8 +28,6 @@ func Init() { api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_enroll", handler.RequirePermission(handler.enrollESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_revoke", handler.RequirePermission(handler.revokeESNode, enum.PermissionAgentInstanceWrite)) - //api.HandleAPIMethod(api.POST, "/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite)) - //get elasticsearch node logs, direct fetch or via stored logs(TODO) api.HandleAPIMethod(api.GET, "/elasticsearch/:id/node/:node_id/logs/_list", handler.RequirePermission(handler.getLogFilesByNode, enum.PermissionAgentInstanceRead)) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/node/:node_id/logs/_read", handler.RequirePermission(handler.getLogFileContent, enum.PermissionAgentInstanceRead)) diff --git a/modules/agent/api/remote_config.go b/modules/agent/api/remote_config.go index e8f903f6..3f8c663f 100644 --- a/modules/agent/api/remote_config.go +++ b/modules/agent/api/remote_config.go @@ -15,6 +15,7 @@ import ( "infini.sh/framework/core/orm" "infini.sh/framework/core/util" common2 "infini.sh/framework/modules/elastic/common" + metadata2 "infini.sh/framework/modules/elastic/metadata" "infini.sh/framework/plugins/managed/common" "time" ) @@ -192,12 +193,22 @@ func getAgentIngestConfigs(instance string, items map[string]BindingItem) (strin } } - if v.PublishAddress==""{ + nodeInfo,err:=metadata2.GetNodeConfig(v.ClusterID,v.NodeUUID) + if err!=nil{ + log.Error(err) + continue + } + + publishAddress:=nodeInfo.Payload.NodeInfo.GetHttpPublishHost() + + if publishAddress==""{ log.Errorf("publish address is empty: %v",v.NodeUUID) continue } - nodeEndPoint := metadata.PrepareEndpoint(v.PublishAddress) + nodeEndPoint := metadata.PrepareEndpoint(publishAddress) + + pathLogs:=nodeInfo.Payload.NodeInfo.GetPathLogs() if v.Updated > latestVersion { latestVersion = v.Updated @@ -217,7 +228,7 @@ func getAgentIngestConfigs(instance string, items map[string]BindingItem) (strin "CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+ "NODE_LEVEL_TASKS_ENABLED: %v\n "+ "NODE_LOGS_PATH: \"%v\"\n\n\n", taskID, taskID, - v.ClusterID,v.ClusterUUID,v.NodeUUID, nodeEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs))) + v.ClusterID,v.ClusterUUID,v.NodeUUID, nodeEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, pathLogs))) } hash := util.MD5digest(buffer.String())