diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index 2fca647a..79bdbaf0 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -625,7 +625,7 @@ func (h *APIHandler) associateESNode(w http.ResponseWriter, req *http.Request, p func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { reqBody := struct { - ClusterID string `json:"cluster_id"` + ClusterIDs []string `json:"cluster_ids"` }{} err := h.DecodeJSON(req, &reqBody) if err != nil { @@ -633,14 +633,6 @@ func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Reques h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - // query cluster basicauth - cfg := elastic.GetConfig(reqBody.ClusterID) - basicAuth, err := common.GetBasicAuth(cfg) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } // query not associated nodes info nodesM, err := getUnAssociateNodes() if err != nil { @@ -662,109 +654,119 @@ func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Reques h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - taskSetting, err := getSettingsByClusterID(cfg.ID) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - for agentID, nodes := range nodesM { - var ( - inst *agent.Instance - ok bool - ) - if inst, ok = agents[agentID]; !ok { - log.Warnf("agent [%v] was not found", agentID) - continue - } - settings, err := common2.GetAgentSettings(agentID, 0) + for _, clusterID := range reqBody.ClusterIDs { + // query cluster basicauth + cfg := elastic.GetConfig(clusterID) + basicAuth, err := common.GetBasicAuth(cfg) if err != nil { log.Error(err) - continue + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return } - for _, node := range nodes { - host := node.PublishAddress - var endpoint string - if strings.HasPrefix( host, "::"){ - instURL, err := url.Parse(inst.Endpoint) - if err != nil { - log.Error(err) - continue - } - host = instURL.Hostname() - endpoint = fmt.Sprintf("%s://%s:%s", node.Schema, host, node.HttpPort) - }else{ - endpoint = fmt.Sprintf("%s://%s", node.Schema, host) - } - escfg := elastic.ElasticsearchConfig{ - Endpoint: endpoint, - BasicAuth: &basicAuth, - } - nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), escfg) - if err != nil { - log.Warn(err) + taskSetting, err := getSettingsByClusterID(cfg.ID) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + for agentID, nodes := range nodesM { + var ( + inst *agent.Instance + ok bool + ) + if inst, ok = agents[agentID]; !ok { + log.Warnf("agent [%v] was not found", agentID) continue } - //matched - if nodeInfo.ClusterUuid == cfg.ClusterUUID { - //update node info - nodeInfo.ID = node.ID - nodeInfo.AgentID = inst.ID - nodeInfo.ClusterID = cfg.ID - err = orm.Save(nil, nodeInfo) + settings, err := common2.GetAgentSettings(agentID, 0) + if err != nil { + log.Error(err) + continue + } + for _, node := range nodes { + host := node.PublishAddress + var endpoint string + if strings.HasPrefix(host, "::") { + instURL, err := url.Parse(inst.Endpoint) + if err != nil { + log.Error(err) + continue + } + host = instURL.Hostname() + endpoint = fmt.Sprintf("%s://%s:%s", node.Schema, host, node.HttpPort) + } else { + endpoint = fmt.Sprintf("%s://%s", node.Schema, host) + } + escfg := elastic.ElasticsearchConfig{ + Endpoint: endpoint, + BasicAuth: &basicAuth, + } + nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), escfg) if err != nil { - log.Error(err) + log.Warn(err) continue } - setting := pickAgentSettings(settings, node) - if setting == nil { - tsetting := model.TaskSetting{ - NodeStats: &model.NodeStatsTask{ - Enabled: true, - }, - Logs: &model.LogsTask{ - Enabled: true, - LogsPath: nodeInfo.Path.Logs, - }, - } - if taskSetting.IndexStats != nil { - tsetting.IndexStats = taskSetting.IndexStats - taskSetting.IndexStats = nil - } - if taskSetting.ClusterHealth != nil { - tsetting.ClusterHealth = taskSetting.ClusterHealth - taskSetting.ClusterHealth = nil - } - if taskSetting.ClusterStats != nil { - tsetting.ClusterStats = taskSetting.ClusterStats - taskSetting.ClusterStats = nil - } - setting = &agent.Setting{ - Metadata: agent.SettingsMetadata{ - Category: "agent", - Name: "task", - Labels: util.MapStr{ - "agent_id": agentID, - "cluster_uuid": nodeInfo.ClusterUuid, - "cluster_id": nodeInfo.ClusterID, - "node_uuid": nodeInfo.NodeUUID, - "endpoint": fmt.Sprintf("%s://%s", nodeInfo.Schema, nodeInfo.PublishAddress), - }, - }, - Payload: util.MapStr{ - "task": tsetting, - }, - } - err = orm.Create(nil, setting) + //matched + if nodeInfo.ClusterUuid == cfg.ClusterUUID { + //update node info + nodeInfo.ID = node.ID + nodeInfo.AgentID = inst.ID + nodeInfo.ClusterID = cfg.ID + err = orm.Save(nil, nodeInfo) if err != nil { - log.Error("save agent task setting error: ", err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return + log.Error(err) + continue + } + setting := pickAgentSettings(settings, node) + if setting == nil { + tsetting := model.TaskSetting{ + NodeStats: &model.NodeStatsTask{ + Enabled: true, + }, + Logs: &model.LogsTask{ + Enabled: true, + LogsPath: nodeInfo.Path.Logs, + }, + } + if taskSetting.IndexStats != nil { + tsetting.IndexStats = taskSetting.IndexStats + taskSetting.IndexStats = nil + } + if taskSetting.ClusterHealth != nil { + tsetting.ClusterHealth = taskSetting.ClusterHealth + taskSetting.ClusterHealth = nil + } + if taskSetting.ClusterStats != nil { + tsetting.ClusterStats = taskSetting.ClusterStats + taskSetting.ClusterStats = nil + } + setting = &agent.Setting{ + Metadata: agent.SettingsMetadata{ + Category: "agent", + Name: "task", + Labels: util.MapStr{ + "agent_id": agentID, + "cluster_uuid": nodeInfo.ClusterUuid, + "cluster_id": nodeInfo.ClusterID, + "node_uuid": nodeInfo.NodeUUID, + "endpoint": fmt.Sprintf("%s://%s", nodeInfo.Schema, nodeInfo.PublishAddress), + }, + }, + Payload: util.MapStr{ + "task": tsetting, + }, + } + err = orm.Create(nil, setting) + if err != nil { + log.Error("save agent task setting error: ", err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } } } } - } + } } h.WriteAckOKJSON(w) } @@ -985,7 +987,7 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error) { func getUnAssociateNodes() (map[string][]agent.ESNodeInfo, error){ query := util.MapStr{ - "size": 1200, + "size": 3000, "query": util.MapStr{ "bool": util.MapStr{ "must_not": []util.MapStr{