From e3c10817efe0386be27a7cb50717db53a4e9a10a Mon Sep 17 00:00:00 2001 From: medcl Date: Tue, 17 Oct 2023 11:26:53 +0800 Subject: [PATCH] refactoring enroll/revoke nodes --- modules/agent/api/elasticsearch.go | 216 ++++++++++++++++------------- modules/agent/api/host.go | 41 +++--- modules/agent/api/init.go | 5 +- modules/agent/api/tod.go | 62 ++++++--- 4 files changed, 186 insertions(+), 138 deletions(-) diff --git a/modules/agent/api/elasticsearch.go b/modules/agent/api/elasticsearch.go index dc5dfb4c..b3439e13 100644 --- a/modules/agent/api/elasticsearch.go +++ b/modules/agent/api/elasticsearch.go @@ -17,35 +17,12 @@ import ( "time" ) -//func (h *APIHandler) refreshESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { -// id := ps.MustGetParameter("instance_id") -// obj := model.Instance{} -// obj.ID = id -// exists, err := orm.Get(&obj) -// if !exists || err != nil { -// h.WriteJSON(w, util.MapStr{ -// "_id": id, -// "found": false, -// }, http.StatusNotFound) -// return -// } -// _, err = refreshNodesInfo(&obj) -// if err != nil { -// log.Error(err) -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// return -// } -// h.WriteAckOKJSON(w) -//} - -func refreshNodesInfo(inst *model.Instance) ([]*model.ESNodeInfo, error) { - oldNodesInfo, err := getNodesBindingToAgent(inst) +func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { + oldNodesInfo, err := getEnrolledNodesByAgent(inst) if err != nil { return nil, fmt.Errorf("error on get binding nodes info: %w", err) } - log.Error("oldNodesInfo:",util.MustToJSON(oldNodesInfo)) - ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() nodesInfo, err := GetElasticsearchNodesViaAgent(ctxTimeout, inst) @@ -54,32 +31,65 @@ func refreshNodesInfo(inst *model.Instance) ([]*model.ESNodeInfo, error) { return nil, fmt.Errorf("error on get nodes info from agent: %w", err) } - log.Error("nodesInfo:",util.MustToJSON(nodesInfo)) - - for _, node := range nodesInfo { - v,ok:=oldNodesInfo[node.NodeUUID] - if ok{ - node.ClusterID=v.ClusterID - } + for nodeID, node := range nodesInfo.Nodes { + v, ok := oldNodesInfo[nodeID] + if ok { + node.ClusterID = v.ClusterID + node.Enrolled = true + } } + ////not recognized by agent, need auth? + //for _, node := range nodesInfo.UnknownProcess{ + // for _, v := range node.ListenAddresses { + // //ask user to manual enroll this node + // //check local credentials, if it works, get node info + // } + //} + + // { + // //node was not recognized by agent, need auth? + // if node.HttpPort != "" { + // for _, v := range oldNodesInfo { + // if v.PublishAddress != "" { + // if util.UnifyLocalAddress(v.PublishAddress) == util.UnifyLocalAddress(node.PublishAddress) { + // node.ClusterID = v.ClusterID + // node.ClusterName = v.ClusterName + // node.NodeUUID = v.NodeUUID + // node.ClusterUuid = v.ClusterUUID + // node.NodeName = v.NodeName + // node.Path.Home = v.PathHome + // node.Path.Logs = v.PathLogs + // node.AgentID = inst.ID + // //TODO verify node info if the node id really match, need to fetch the credentials for agent + // //or let manager sync configs to this agent, verify the node info after receiving the configs + // //report any error along with this agent and node info + // break + // } + // } + // } + // } + // + //} + return nodesInfo, nil } //get nodes info via agent -func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance) ([]*model.ESNodeInfo, error) { +func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance) (*elastic.DiscoveryResult, error) { req := &util.Request{ Method: http.MethodGet, Path: "/elasticsearch/nodes/_discovery", Context: ctx, } - resBody := []*model.ESNodeInfo{} - err := doRequest(instance, req, &resBody) + + obj := elastic.DiscoveryResult{} + err := doRequest(instance, req, &obj) if err != nil { return nil, err } - return resBody, nil + return &obj, nil } func AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*model.ESNodeInfo, error) { @@ -114,13 +124,20 @@ func getNodeByPidOrUUID(nodes map[int]*model.ESNodeInfo, pid int, uuid string, p } type BindingItem struct { - ClusterID string `json:"cluster_id"` - ClusterUUID string `json:"cluster_uuid"` - NodeUUID string `json:"node_uuid"` + ClusterName string `json:"cluster_name"` + ClusterUUID string `json:"cluster_uuid"` + NodeUUID string `json:"node_uuid"` + PublishAddress string `json:"publish_address"` + NodeName string `json:"node_name"` + PathLogs string `json:"path_logs"` + PathHome string `json:"path_home"` + + //infini system assigned id + ClusterID string `json:"cluster_id"` } //node -> binding item -func getNodesBindingToAgent(instance *model.Instance) (map[string]BindingItem, error) { +func getEnrolledNodesByAgent(instance *model.Instance) (map[string]BindingItem, error) { //get nodes settings where agent id = instance id q := orm.Query{ @@ -141,23 +158,22 @@ func getNodesBindingToAgent(instance *model.Instance) (map[string]BindingItem, e for _, row := range result.Result { v, ok := row.(map[string]interface{}) if ok { - x, ok := v["metadata"] + x, ok := v["payload"] if ok { - y, ok := x.(map[string]interface{}) + f, ok := x.(map[string]interface{}) if ok { - e, ok := y["labels"] + nodeID, ok := f["node_uuid"].(string) if ok { - f, ok := e.(map[string]interface{}) - if ok { - nodeID, ok := f["node_uuid"].(string) - if ok { - item := BindingItem{} - item.ClusterID = f["cluster_id"].(string) - item.ClusterUUID = f["cluster_uuid"].(string) - item.NodeUUID = nodeID - ids[item.NodeUUID] = item - } - } + item := BindingItem{} + item.ClusterID = util.ToString(f["cluster_id"]) + item.ClusterName = util.ToString(f["cluster_name"]) + item.ClusterUUID = util.ToString(f["cluster_uuid"]) + item.PublishAddress = util.ToString(f["publish_address"]) + item.NodeName = util.ToString(f["node_name"]) + item.PathHome = util.ToString(f["path_home"]) + item.PathLogs = util.ToString(f["path_logs"]) + item.NodeUUID = nodeID + ids[item.NodeUUID] = item } } } @@ -248,7 +264,7 @@ func GetElasticLogFileContent(ctx context.Context, instance *model.Instance, bod func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { nodeID := ps.MustGetParameter("node_id") - inst, node, err := getAgentByNodeID(nodeID) + inst, pathLogs, err := getAgentByNodeID(nodeID) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -262,7 +278,7 @@ func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, }, http.StatusOK) return } - logFiles, err := GetElasticLogFiles(nil, inst, node.Path.Logs) + logFiles, err := GetElasticLogFiles(nil, inst, pathLogs) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -276,7 +292,7 @@ func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { nodeID := ps.MustGetParameter("node_id") - inst, node, err := getAgentByNodeID(nodeID) + inst, pathLogs, err := getAgentByNodeID(nodeID) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -299,7 +315,7 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, log.Error(err) return } - reqBody.LogsPath = node.Path.Logs + reqBody.LogsPath = pathLogs res, err := GetElasticLogFileContent(nil, inst, reqBody) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -309,45 +325,57 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, h.WriteJSON(w, res, http.StatusOK) } -func getAgentByNodeID(nodeID string) (*model.Instance, *model.ESNodeInfo, error) { - queryDsl := util.MapStr{ - "size": 1, - "query": util.MapStr{ - "term": util.MapStr{ - "node_uuid": util.MapStr{ - "value": nodeID, - }, - }, - }, - "sort": []util.MapStr{ - { - "timestamp": util.MapStr{ - "order": "desc", - }, - }, - }, +//instance, pathLogs +func getAgentByNodeID(nodeID string) (*model.Instance, string, error) { + + q := orm.Query{ + Size: 1000, + Conds: orm.And(orm.Eq("metadata.category", "node_settings"), + orm.Eq("metadata.name", "agent"), + orm.Eq("payload.node_uuid", nodeID), + ), } - q := &orm.Query{ - RawQuery: util.MustToJSONBytes(queryDsl), - } - err, result := orm.Search(model.ESNodeInfo{}, q) + + err, result := orm.Search(model.Setting{}, &q) if err != nil { - return nil, nil, err + return nil, "", err } - if len(result.Result) > 0 { - buf := util.MustToJSONBytes(result.Result[0]) - v := &model.ESNodeInfo{} - err = util.FromJSONBytes(buf, v) - inst := &model.Instance{} - inst.ID = v.AgentID - _, err = orm.Get(inst) - if err != nil { - return nil, v, err + + for _, row := range result.Result { + v, ok := row.(map[string]interface{}) + if ok { + pathLogs := "" + payload, ok := v["payload"] + if ok { + payloadMap, ok := payload.(map[string]interface{}) + if ok { + pathLogs = util.ToString(payloadMap["path_logs"]) + } + } + + x, ok := v["metadata"] + if ok { + f, ok := x.(map[string]interface{}) + if ok { + labels, ok := f["labels"].(map[string]interface{}) + if ok { + id, ok := labels["agent_id"] + if ok { + inst := &model.Instance{} + inst.ID = util.ToString(id) + _, err = orm.Get(inst) + if err != nil { + return nil, pathLogs, err + } + if inst.Name == "" { + return nil, pathLogs, nil + } + return inst, pathLogs, nil + } + } + } + } } - if inst.Name == "" { - return nil, v, nil - } - return inst, v, nil } - return nil, nil, nil + return nil, "", nil } diff --git a/modules/agent/api/host.go b/modules/agent/api/host.go index 16419022..80f8b197 100644 --- a/modules/agent/api/host.go +++ b/modules/agent/api/host.go @@ -5,7 +5,6 @@ package api import ( - "context" "fmt" log "github.com/cihub/seelog" httprouter "infini.sh/framework/core/api/router" @@ -197,26 +196,26 @@ func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Requ return } - esNodesInfo, err := GetElasticsearchNodesViaAgent(context.Background(), &obj) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - var processes []util.MapStr - for _, node := range esNodesInfo { - processes = append(processes, util.MapStr{ - "pid": node.ProcessInfo.PID, - "pid_status": node.ProcessInfo.Status, - "cluster_name": node.ClusterName, - "cluster_uuid": node.ClusterUuid, - "cluster_id": node.ClusterID, - "node_id": node.NodeUUID, - "node_name": node.NodeName, - "uptime_in_ms": time.Now().UnixMilli() - node.ProcessInfo.CreateTime, - }) - } + //esNodesInfo, err := GetElasticsearchNodesViaAgent(context.Background(), &obj) + //if err != nil { + // log.Error(err) + // h.WriteError(w, err.Error(), http.StatusInternalServerError) + // return + //} + //var processes []util.MapStr + //for _, node := range esNodesInfo { + // processes = append(processes, util.MapStr{ + // "pid": node.ProcessInfo.PID, + // "pid_status": node.ProcessInfo.Status, + // "cluster_name": node.ClusterName, + // "cluster_uuid": node.ClusterUuid, + // "cluster_id": node.ClusterID, + // "node_id": node.NodeUUID, + // "node_name": node.NodeName, + // "uptime_in_ms": time.Now().UnixMilli() - node.ProcessInfo.CreateTime, + // }) + //} h.WriteJSON(w, util.MapStr{ - "elastic_processes": processes, + //"elastic_processes": processes, }, http.StatusOK) } \ No newline at end of file diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go index 209c514a..166da6e6 100644 --- a/modules/agent/api/init.go +++ b/modules/agent/api/init.go @@ -16,7 +16,10 @@ func Init() { api.HandleAPIMethod(api.GET, "/host/:host_id/processes", handler.GetHostElasticProcess) api.HandleAPIMethod(api.DELETE, "/host/:host_id", handler.deleteHost) - api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) + //bind agent with nodes + api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_enroll", handler.RequirePermission(handler.enrollESNode, enum.PermissionAgentInstanceWrite)) + api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_revoke", handler.RequirePermission(handler.revokeESNode, enum.PermissionAgentInstanceWrite)) + api.HandleAPIMethod(api.POST, "/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite)) //api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) diff --git a/modules/agent/api/tod.go b/modules/agent/api/tod.go index 72827ef4..38811fef 100644 --- a/modules/agent/api/tod.go +++ b/modules/agent/api/tod.go @@ -183,7 +183,7 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt h.WriteJSON(w, fmt.Sprintf("node [%s] of agent [%s] was not found", oldNodeInfo.ID, inst.Name), http.StatusInternalServerError) return } - }else{ + } else { //find out the node id with credentials cfg := reqBody.ESConfig if cfg.Endpoint == "" { @@ -222,10 +222,9 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt // return //} - reqBody.NodeID=nodeInfo.NodeUUID + reqBody.NodeID = nodeInfo.NodeUUID } - //nodeInfo:=elastic.NodeConfig{} //nodeInfo.ID = reqBody.NodeID //nodeInfo.AgentID = inst.ID @@ -253,7 +252,7 @@ func NewClusterSettings(clusterID string) *model.Setting { return &settings } -func NewNodeAgentSettings(clusterID, clusterUUID, nodeUUID, agentID, agentCredential string) *model.Setting { +func NewNodeAgentSettings(instanceID string, item *BindingItem) *model.Setting { settings := model.Setting{ Metadata: model.SettingsMetadata{ @@ -261,14 +260,21 @@ func NewNodeAgentSettings(clusterID, clusterUUID, nodeUUID, agentID, agentCreden Name: "agent", }, } - settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, nodeUUID) + settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, item.NodeUUID) settings.Metadata.Labels = util.MapStr{ - "cluster_id": clusterID, - "cluster_uuid": clusterUUID, - "node_uuid": nodeUUID, - "agent_id": agentID, - "agent_credential": agentCredential, + "agent_id": instanceID, + } + + settings.Payload = util.MapStr{ + "cluster_id": item.ClusterID, + "cluster_name": item.ClusterName, + "cluster_uuid": item.ClusterUUID, + "node_uuid": item.NodeUUID, + "publish_address": item.PublishAddress, + "node_name": item.NodeName, + "path_home": item.PathHome, + "path_logs": item.PathLogs, } return &settings @@ -298,33 +304,45 @@ const Cluster = "cluster_settings" const Node = "node_settings" const Index = "index_settings" -func (h *APIHandler) associateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +func (h *APIHandler) revokeESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + //agent id + instID := ps.MustGetParameter("instance_id") + item := BindingItem{} + err := h.DecodeJSON(req, &item) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + settings := NewNodeAgentSettings(instID, &item) + err = orm.Delete(&orm.Context{ + Refresh: "wait_for", + }, settings) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (h *APIHandler) enrollESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { //agent id instID := ps.MustGetParameter("instance_id") //node id and cluster id - reqBody := struct { - ClusterUUID string `json:"cluster_uuid"` - NodeUUID string `json:"node_uuid"` - - //infini system assigned id - ClusterID string `json:"cluster_id"` - }{} - err := h.DecodeJSON(req, &reqBody) + item := BindingItem{} + err := h.DecodeJSON(req, &item) if err != nil { - log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) return } //update node's setting - settings := NewNodeAgentSettings(reqBody.ClusterID, reqBody.ClusterUUID, reqBody.NodeUUID, instID, "node.AgentCredential") + settings := NewNodeAgentSettings(instID, &item) err = orm.Update(&orm.Context{ Refresh: "wait_for", }, settings) if err != nil { - log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) return }