diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go index 2937f97a..bcefa402 100644 --- a/modules/agent/api/init.go +++ b/modules/agent/api/init.go @@ -25,6 +25,7 @@ func Init() { api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/agent/instance/try_connect", handler.RequireLogin(handler.tryConnect)) + api.HandleAPIMethod(api.POST, "/agent/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo) diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index bdc7ac7c..c8fc32eb 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -9,7 +9,9 @@ import ( "fmt" "net" "net/http" + "net/url" "strconv" + "strings" "time" log "github.com/cihub/seelog" @@ -559,6 +561,178 @@ func (h *APIHandler) associateESNode(w http.ResponseWriter, req *http.Request, p h.WriteAckOKJSON(w) } +func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + reqBody := struct { + ClusterID string `json:"cluster_id"` + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + // query cluster basicauth + cfg := elastic.GetConfig(reqBody.ClusterID) + basicAuth, err := common.GetBasicAuth(cfg) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + // query not associated nodes info + nodesM, err := getUnAssociateNodes() + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(nodesM) == 0 { + h.WriteAckOKJSON(w) + return + } + agentIds := make([]string, 0, len(nodesM)) + for agentID := range nodesM { + agentIds = append(agentIds, agentID) + } + agents, err := getAgentByIds(agentIds) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + taskSetting, err := getSettingsByClusterID(cfg.ID) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + for agentID, nodes := range nodesM { + var ( + inst *agent.Instance + ok bool + ) + if inst, ok = agents[agentID]; !ok { + log.Warnf("agent [%v] was not found", agentID) + continue + } + settings, err := common2.GetAgentSettings(agentID, 0) + if err != nil { + log.Error(err) + continue + } + for _, node := range nodes { + host := node.PublishAddress + var endpoint string + if strings.HasPrefix( host, "::"){ + instURL, err := url.Parse(inst.Endpoint) + if err != nil { + log.Error(err) + continue + } + host = instURL.Hostname() + endpoint = fmt.Sprintf("%s://%s:%s", node.Schema, host, node.HttpPort) + }else{ + endpoint = fmt.Sprintf("%s://%s", node.Schema, host) + } + escfg := elastic.ElasticsearchConfig{ + Endpoint: endpoint, + BasicAuth: &basicAuth, + } + nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), escfg) + if err != nil { + log.Warn(err) + continue + } + //matched + if nodeInfo.ClusterUuid == cfg.ClusterUUID { + //update node info + nodeInfo.ID = node.ID + nodeInfo.AgentID = inst.ID + nodeInfo.ClusterID = cfg.ID + err = orm.Save(nil, nodeInfo) + if err != nil { + log.Error(err) + continue + } + setting := pickAgentSettings(settings, node) + if setting == nil { + tsetting := model.TaskSetting{ + NodeStats: &model.NodeStatsTask{ + Enabled: true, + }, + Logs: &model.LogsTask{ + Enabled: true, + LogsPath: nodeInfo.Path.Logs, + }, + } + if taskSetting.IndexStats != nil { + tsetting.IndexStats = taskSetting.IndexStats + taskSetting.IndexStats = nil + } + if taskSetting.ClusterHealth != nil { + tsetting.ClusterHealth = taskSetting.ClusterHealth + taskSetting.ClusterHealth = nil + } + if taskSetting.ClusterStats != nil { + tsetting.ClusterStats = taskSetting.ClusterStats + taskSetting.ClusterStats = nil + } + setting = &agent.Setting{ + Metadata: agent.SettingsMetadata{ + Category: "agent", + Name: "task", + Labels: util.MapStr{ + "agent_id": agentID, + "cluster_uuid": nodeInfo.ClusterUuid, + "cluster_id": nodeInfo.ClusterID, + "node_uuid": nodeInfo.NodeUUID, + "endpoint": fmt.Sprintf("%s://%s", nodeInfo.Schema, nodeInfo.PublishAddress), + }, + }, + Payload: util.MapStr{ + "task": tsetting, + }, + } + err = orm.Create(nil, setting) + if err != nil { + log.Error("save agent task setting error: ", err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + } + } + + } + h.WriteAckOKJSON(w) +} + +func getAgentByIds(agentIDs []string)(map[string]*agent.Instance, error){ + query := util.MapStr{ + "size": len(agentIDs), + "query": util.MapStr{ + "terms": util.MapStr{ + "id": agentIDs, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(agent.Instance{}, &q) + if err != nil { + return nil, err + } + agents := map[string]*agent.Instance{} + for _, row := range result.Result { + inst := agent.Instance{} + buf := util.MustToJSONBytes(row) + util.MustFromJSONBytes(buf, &inst) + agents[inst.ID] = &inst + } + return agents, nil +} + func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("instance_id") nodeIDs := []string{} @@ -720,7 +894,7 @@ func getNodeByPidOrUUID(nodes map[int]*agent.ESNodeInfo, pid int, uuid string) * func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error) { query := util.MapStr{ - "size": 100, + "size": 1000, "query": util.MapStr{ "term": util.MapStr{ "agent_id": util.MapStr{ @@ -747,6 +921,39 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error) { return nodesInfo, nil } +func getUnAssociateNodes() (map[string][]agent.ESNodeInfo, error){ + query := util.MapStr{ + "size": 1200, + "query": util.MapStr{ + "bool": util.MapStr{ + "must_not": []util.MapStr{ + { + "exists": util.MapStr{ + "field": "cluster_id", + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + + err, result := orm.Search(agent.ESNodeInfo{}, &q) + if err != nil { + return nil, err + } + nodesInfo := map[string][]agent.ESNodeInfo{} + for _, row := range result.Result { + node := agent.ESNodeInfo{} + buf := util.MustToJSONBytes(row) + util.MustFromJSONBytes(buf, &node) + nodesInfo[node.AgentID] = append(nodesInfo[node.AgentID], node) + } + return nodesInfo, nil +} + func pickAgentSettings(settings []agent.Setting, nodeInfo agent.ESNodeInfo) *agent.Setting { for _, setting := range settings { if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID { @@ -785,50 +992,7 @@ func getAgentTaskSetting(agentID string, node agent.ESNodeInfo) (*agent.Setting, // getSettingsByClusterID query agent task settings with cluster id func getSettingsByClusterID(clusterID string) (*model.TaskSetting, error) { - queryDsl := util.MapStr{ - "size": 200, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.cluster_id": util.MapStr{ - "value": clusterID, - }, - }, - }, - }, - "minimum_should_match": 1, - "should": []util.MapStr{ - { - "term": util.MapStr{ - "payload.task.cluster_health.enabled": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "payload.task.cluster_stats.enabled": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "payload.task.index_stats.enabled": util.MapStr{ - "value": true, - }, - }, - }, - }, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(queryDsl), - } - err, result := orm.Search(agent.Setting{}, &q) + err, result := querySettingsByClusterID(clusterID) if err != nil { return nil, err } @@ -879,3 +1043,50 @@ func getSettingsByClusterID(clusterID string) (*model.TaskSetting, error) { } return setting, nil } + +func querySettingsByClusterID(clusterID string)(error, orm.Result){ + queryDsl := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.cluster_id": util.MapStr{ + "value": clusterID, + }, + }, + }, + }, + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "payload.task.cluster_health.enabled": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "payload.task.cluster_stats.enabled": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "payload.task.index_stats.enabled": util.MapStr{ + "value": true, + }, + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(queryDsl), + } + return orm.Search(agent.Setting{}, &q) +} \ No newline at end of file diff --git a/modules/agent/common/helper.go b/modules/agent/common/helper.go index a6d26216..7e2775be 100644 --- a/modules/agent/common/helper.go +++ b/modules/agent/common/helper.go @@ -32,21 +32,23 @@ func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResul clusterID string ok bool ) + nodeUUID := util.ToString(setting.Metadata.Labels["node_uuid"]) if clusterID, ok = setting.Metadata.Labels["cluster_id"].(string); ok && clusterID != ""{ cfg := elastic.GetConfig(clusterID) + newID := getClusterConfigReferenceName(clusterID, nodeUUID) newCfg := elastic.ElasticsearchConfig{ Enabled: true, - Name: cfg.Name, + Name: newID, BasicAuth: cfg.BasicAuth, //todo get endpoint from agent node info Endpoint: setting.Metadata.Labels["endpoint"].(string), + ClusterUUID: cfg.ClusterUUID, } - newCfg.ID = clusterID + newCfg.ID = newID clusterCfgs = append(clusterCfgs, newCfg) }else{ return nil, fmt.Errorf("got wrong cluster id [%v] from metadata labels", setting.Metadata.Labels["cluster_id"]) } - nodeUUID := util.ToString(setting.Metadata.Labels["node_uuid"]) taskCfg, err := util.MapStr(setting.Payload).GetValue("task") if err != nil { @@ -152,6 +154,10 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) return settings, nil } +func getClusterConfigReferenceName(clusterID, nodeUUID string) string { + return fmt.Sprintf("%s_%s", clusterID, nodeUUID) +} + func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { if setting == nil { return nil, nil, fmt.Errorf("empty setting") @@ -163,7 +169,7 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s if setting.ClusterStats != nil { var processorName = "es_cluster_stats" if setting.ClusterStats.Enabled { - pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID) + pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID, nodeUUID) if err != nil { return nil, nil, err } @@ -175,7 +181,7 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s if setting.IndexStats != nil { var processorName = "es_index_stats" if setting.IndexStats.Enabled { - pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID) + pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID, nodeUUID) if err != nil { return nil, nil, err } @@ -187,7 +193,7 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s if setting.ClusterHealth != nil { var processorName = "es_cluster_health" if setting.ClusterHealth.Enabled { - pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID) + pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID, nodeUUID) if err != nil { return nil, nil, err } @@ -200,7 +206,10 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s var processorName = "es_node_stats" if setting.NodeStats.Enabled { params := util.MapStr{ - "elasticsearch": clusterID, + "elasticsearch": getClusterConfigReferenceName(clusterID, nodeUUID), + "labels": util.MapStr{ + "cluster_id": clusterID, + }, } if len(setting.NodeStats.NodeIDs) > 0{ params["node_uuids"] = setting.NodeStats.NodeIDs @@ -226,8 +235,11 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s var processorName = "es_logs_processor" if setting.Logs.Enabled { params := util.MapStr{ - "elasticsearch": clusterID, + "elasticsearch": getClusterConfigReferenceName(clusterID, nodeUUID), "queue_name": "logs", + "labels": util.MapStr{ + "cluster_id": clusterID, + }, } if setting.Logs.LogsPath != "" { params["logs_path"] = setting.Logs.LogsPath @@ -251,10 +263,14 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s } -func newClusterMetricPipeline(processorName string, clusterID string)(util.MapStr, error){ +func newClusterMetricPipeline(processorName string, clusterID string, nodeUUID string)(util.MapStr, error){ + referName := getClusterConfigReferenceName(clusterID, nodeUUID) cfg := util.MapStr{ processorName: util.MapStr{ - "elasticsearch": clusterID, + "elasticsearch": referName, + "labels": util.MapStr{ + "cluster_id": clusterID, + }, }, } enabled := true diff --git a/modules/agent/state/state.go b/modules/agent/state/state.go index 7b65effd..f6d982f8 100644 --- a/modules/agent/state/state.go +++ b/modules/agent/state/state.go @@ -190,14 +190,18 @@ func (sm *StateManager) syncSettings(agentID string) { "endpoint": cfg.Endpoint, } if cfg.BasicAuth != nil && cfg.BasicAuth.Password != ""{ - err = agClient.SetKeystoreValue(context.Background(), ag.GetEndpoint(), fmt.Sprintf("%s_password", cfg.ID), cfg.BasicAuth.Password) + cid := cfg.ID + if cfg.ClusterUUID != "" { + cid = cfg.ClusterUUID + } + err = agClient.SetKeystoreValue(context.Background(), ag.GetEndpoint(), fmt.Sprintf("%s_password", cid), cfg.BasicAuth.Password) if err != nil { log.Errorf("set keystore value error: %v", err) continue } clusterCfg["basic_auth"] = util.MapStr{ "username": cfg.BasicAuth.Username, - "password": fmt.Sprintf("$[[keystore.%s_password]]", cfg.ID), + "password": fmt.Sprintf("$[[keystore.%s_password]]", cid), } } clusterCfgs = append(clusterCfgs, clusterCfg)