From 2b77b42512cf12128a4a4904062db7e6de42f05c Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 24 May 2023 14:23:50 +0800 Subject: [PATCH] split refresh es process list and associate it with registered cluster in console --- modules/agent/api/init.go | 2 + modules/agent/api/instance.go | 155 ++++++++++++++++++++++++--------- modules/agent/common/client.go | 3 + modules/agent/common/state.go | 9 +- 4 files changed, 127 insertions(+), 42 deletions(-) diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go index 84fe0c9a..24b02e94 100644 --- a/modules/agent/api/init.go +++ b/modules/agent/api/init.go @@ -21,6 +21,8 @@ func Init() { api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.getESNodesInfo, enum.PermissionAgentInstanceRead)) api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/_nodes/_refresh", handler.RequirePermission(handler.refreshESNodesInfo, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) + api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite)) + api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo) diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index 05b6fb03..63e935f9 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -418,27 +418,6 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt } nodeInfo.ID = oldNodeInfo.ID nodeInfo.AgentID = inst.ID - clusterCfgs := getClusterConfigs() - if nodeInfo.ClusterUuid != "" && clusterCfgs[nodeInfo.ClusterUuid] != nil { - nodeInfo.ClusterID = clusterCfgs[nodeInfo.ClusterUuid].ID - settings, err := common2.GetAgentSettings(inst.ID, 0) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - setting := pickAgentSettings(settings, *nodeInfo) - if setting == nil { - setting, err = getAgentTaskSetting(inst.ID, *nodeInfo) - if err != nil { - log.Error("get agent task setting error: ", err) - } - err = orm.Create(nil, setting) - if err != nil { - log.Error("save agent task setting error: ", err) - } - } - } err = orm.Save(nil, nodeInfo) if err != nil { log.Error(err) @@ -448,6 +427,102 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt h.WriteJSON(w, nodeInfo, http.StatusOK) } +func (h *APIHandler) associateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + instID := ps.MustGetParameter("instance_id") + reqBody := struct { + ID string `json:"id"` + ClusterID string `json:"cluster_id"` + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + node := agent.ESNodeInfo{ + ID: reqBody.ID, + } + _, err = orm.Get(&node) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if node.AgentID != instID { + errStr := fmt.Sprintf("agent id not match: %s, %s", node.AgentID, instID) + log.Error(errStr) + h.WriteError(w, errStr, http.StatusInternalServerError) + return + } + node.ClusterID = reqBody.ClusterID + err = orm.Save(&orm.Context{ + Refresh: "wait_for", + }, node) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + settings, err := common2.GetAgentSettings(instID, 0) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + setting := pickAgentSettings(settings, node) + if setting == nil { + setting, err = getAgentTaskSetting(instID, node) + if err != nil { + log.Error("get agent task setting error: ", err) + } + err = orm.Create(nil, setting) + if err != nil { + log.Error("save agent task setting error: ", err) + } + } + h.WriteAckOKJSON(w) +} + +func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("instance_id") + nodeIDs := []string{} + err := h.DecodeJSON(req, &nodeIDs) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(nodeIDs) > 0 { + q := util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "id": nodeIDs, + }, + }, + { + "term": util.MapStr{ + "agent_id": util.MapStr{ + "value": id, + }, + }, + }, + }, + }, + }, + } + err = orm.DeleteBy(agent.ESNodeInfo{}, util.MustToJSONBytes(q)) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + h.WriteAckOKJSON(w) +} + func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { nodesInfo, err := common2.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) if err != nil { @@ -457,10 +532,9 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { if err != nil { return nil, fmt.Errorf("get elasticsearch nodes info from es error: %w", err) } - clusterCfgs := getClusterConfigs() oldPids := map[int]struct{}{} var resultNodes []agent.ESNodeInfo - settings, err := common2.GetAgentSettings(inst.ID, 0) + //settings, err := common2.GetAgentSettings(inst.ID, 0) if err != nil { return nil, err } @@ -480,24 +554,25 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { if node.ClusterUuid != "" { if oldNode != nil && oldNode.ClusterID != "" { node.ClusterID = oldNode.ClusterID - }else{ - if cfg := clusterCfgs[node.ClusterUuid]; cfg != nil { - node.ClusterID = cfg.ID - setting := pickAgentSettings(settings, node) - if setting == nil { - setting, err = getAgentTaskSetting(inst.ID, node) - if err != nil { - log.Error() - } - err = orm.Create(nil, setting) - if err != nil { - log.Error("save agent task setting error: ", err) - } - } - }else{ - //cluster not registered in console - } } + //else{ + // if cfg := clusterCfgs[node.ClusterUuid]; cfg != nil { + // node.ClusterID = cfg.ID + // setting := pickAgentSettings(settings, node) + // if setting == nil { + // setting, err = getAgentTaskSetting(inst.ID, node) + // if err != nil { + // log.Error() + // } + // err = orm.Create(nil, setting) + // if err != nil { + // log.Error("save agent task setting error: ", err) + // } + // } + // }else{ + // //cluster not registered in console + // } + //} } node.Status = "online" diff --git a/modules/agent/common/client.go b/modules/agent/common/client.go index 249c71d9..704fdda2 100644 --- a/modules/agent/common/client.go +++ b/modules/agent/common/client.go @@ -196,5 +196,8 @@ func (client *Client) doRequest(req *util.Request, respObj interface{}) error { if result.StatusCode != 200 { return fmt.Errorf(string(result.Body)) } + if respObj == nil { + return nil + } return util.FromJSONBytes(result.Body, respObj) } diff --git a/modules/agent/common/state.go b/modules/agent/common/state.go index 4954c7ae..de26a0ee 100644 --- a/modules/agent/common/state.go +++ b/modules/agent/common/state.go @@ -18,6 +18,7 @@ import ( "infini.sh/framework/core/util" "runtime" "runtime/debug" + "strings" "sync" "time" ) @@ -87,6 +88,7 @@ func (sm *StateManager) checkAgentStatus() { } } // status change to offline + // todo validate whether agent is offline sm.agentIds[agentID] = StatusOffline sm.workerChan <- struct{}{} go func(agentID string) { @@ -148,9 +150,12 @@ func (sm *StateManager) syncSettings(agentID string) { for _, pipelineID := range parseResult.ToDeletePipelineNames { err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID) if err != nil { - log.Errorf("delete pipeline error: %v", err) - continue + if !strings.Contains(err.Error(), "not found") { + log.Errorf("delete pipeline error: %v", err) + continue + } } + //todo update delete pipeline state } for _, pipeline := range parseResult.Pipelines { err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline))