Merge pull request 'move agent module from framework to console' (#100) from agent_v3 into master
This commit is contained in:
		
						commit
						350ef263de
					
				
							
								
								
									
										2
									
								
								main.go
								
								
								
								
							
							
						
						
									
										2
									
								
								main.go
								
								
								
								
							|  | @ -11,6 +11,7 @@ import ( | |||
| 	"infini.sh/console/model" | ||||
| 	"infini.sh/console/model/alerting" | ||||
| 	"infini.sh/console/model/insight" | ||||
| 	"infini.sh/console/modules/agent" | ||||
| 	_ "infini.sh/console/plugin" | ||||
| 	setup1 "infini.sh/console/plugin/setup" | ||||
| 	alerting2 "infini.sh/console/service/alerting" | ||||
|  | @ -22,7 +23,6 @@ import ( | |||
| 	"infini.sh/framework/core/module" | ||||
| 	"infini.sh/framework/core/orm" | ||||
| 	task1 "infini.sh/framework/core/task" | ||||
| 	"infini.sh/framework/modules/agent" | ||||
| 	_ "infini.sh/framework/modules/api" | ||||
| 	elastic2 "infini.sh/framework/modules/elastic" | ||||
| 	"infini.sh/framework/modules/metrics" | ||||
|  |  | |||
|  | @ -0,0 +1,80 @@ | |||
| /* Copyright © INFINI Ltd. All rights reserved. | ||||
|  * Web: https://infinilabs.com
 | ||||
|  * Email: hello#infini.ltd */ | ||||
| 
 | ||||
| package agent | ||||
| 
 | ||||
| import ( | ||||
| 	log "github.com/cihub/seelog" | ||||
| 	"infini.sh/console/modules/agent/api" | ||||
| 	"infini.sh/console/modules/agent/common" | ||||
| 	"infini.sh/framework/core/agent" | ||||
| 	"infini.sh/framework/core/env" | ||||
| 	"infini.sh/framework/core/host" | ||||
| 	"infini.sh/framework/core/orm" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| func (module *AgentModule) Name() string { | ||||
| 	return "agent" | ||||
| } | ||||
| 
 | ||||
| func (module *AgentModule) Setup() { | ||||
| 	module.AgentConfig.Enabled = true | ||||
| 	module.AgentConfig.StateManager.Enabled = true | ||||
| 	exists, err := env.ParseConfig("agent", &module.AgentConfig) | ||||
| 	if exists && err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
| 	if module.AgentConfig.Enabled { | ||||
| 		api.Init() | ||||
| 	} | ||||
| } | ||||
| func (module *AgentModule) Start() error { | ||||
| 	if !module.AgentConfig.Enabled { | ||||
| 		return nil | ||||
| 	} | ||||
| 	orm.RegisterSchemaWithIndexName(agent.Instance{}, "agent") | ||||
| 	orm.RegisterSchemaWithIndexName(agent.ESNodeInfo{}, "agent-node") | ||||
| 	orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host") | ||||
| 	orm.RegisterSchemaWithIndexName(agent.Setting{}, "setting") | ||||
| 	common.RegisterClient(&common.Client{}) | ||||
| 
 | ||||
| 	if module.AgentConfig.StateManager.Enabled { | ||||
| 		onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, 60) | ||||
| 		if err != nil { | ||||
| 			log.Error(err) | ||||
| 		} | ||||
| 		agents, err := common.LoadAgentsFromES("") | ||||
| 		if err != nil { | ||||
| 			log.Error(err) | ||||
| 		} | ||||
| 		agentIds := map[string]string{} | ||||
| 		for _, ag := range agents { | ||||
| 			if _, ok := onlineAgentIDs[ag.ID]; ok { | ||||
| 				agentIds[ag.ID] = "online" | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		sm := common.NewStateManager(time.Second*30, "agent_state", agentIds) | ||||
| 		common.RegisterStateManager(sm) | ||||
| 		go sm.LoopState() | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (module *AgentModule) Stop() error { | ||||
| 	if !module.AgentConfig.Enabled { | ||||
| 		return nil | ||||
| 	} | ||||
| 	log.Info("start to stop agent module") | ||||
| 	if module.AgentConfig.StateManager.Enabled { | ||||
| 		common.GetStateManager().Stop() | ||||
| 	} | ||||
| 	log.Info("agent module was stopped") | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| type AgentModule struct { | ||||
| 	common.AgentConfig | ||||
| } | ||||
|  | @ -0,0 +1,222 @@ | |||
| /* Copyright © INFINI Ltd. All rights reserved. | ||||
|  * Web: https://infinilabs.com
 | ||||
|  * Email: hello#infini.ltd */ | ||||
| 
 | ||||
| package api | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	common2 "infini.sh/console/modules/agent/common" | ||||
| 	httprouter "infini.sh/framework/core/api/router" | ||||
| 	"infini.sh/framework/core/host" | ||||
| 	"infini.sh/framework/core/orm" | ||||
| 	"infini.sh/framework/core/util" | ||||
| 	"net/http" | ||||
| 	log "github.com/cihub/seelog" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| func (h *APIHandler) enrollHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { | ||||
| 	var reqBody []struct { | ||||
| 		AgentID  string `json:"agent_id"` | ||||
| 		HostName string `json:"host_name"` | ||||
| 		IP       string `json:"ip"` | ||||
| 		Source   string `json:"source"` | ||||
| 		OSName string `json:"os_name"` | ||||
| 		OSArch string `json:"os_arch"` | ||||
| 		NodeID string `json:"node_uuid"` | ||||
| 	} | ||||
| 	err := h.DecodeJSON(req, &reqBody) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	errors := util.MapStr{} | ||||
| 	for _, hi := range reqBody { | ||||
| 		var ( | ||||
| 			hostInfo *host.HostInfo | ||||
| 		) | ||||
| 		switch hi.Source { | ||||
| 		case "agent": | ||||
| 			hostInfo, err = enrollHostFromAgent(hi.AgentID) | ||||
| 			if err != nil { | ||||
| 				errors[hi.IP] = util.MapStr{ | ||||
| 					"error": err.Error(), | ||||
| 				} | ||||
| 				log.Error(err) | ||||
| 				continue | ||||
| 			} | ||||
| 			hostInfo.IP = hi.IP | ||||
| 			hostInfo.AgentID = hi.AgentID | ||||
| 			err = orm.Create(nil, hostInfo) | ||||
| 			if err != nil { | ||||
| 				errors[hi.IP] = util.MapStr{ | ||||
| 					"error": err.Error(), | ||||
| 				} | ||||
| 				log.Error(err) | ||||
| 				continue | ||||
| 			} | ||||
| 		case "es_node": | ||||
| 			hostInfo = &host.HostInfo{ | ||||
| 				IP: hi.IP, | ||||
| 				OSInfo: host.OS{ | ||||
| 					Platform: hi.OSName, | ||||
| 					KernelArch: hi.OSArch, | ||||
| 				}, | ||||
| 				NodeID: hi.NodeID, | ||||
| 			} | ||||
| 		default: | ||||
| 			errors[hi.IP] = util.MapStr{ | ||||
| 				"error": fmt.Errorf("unkonow source type"), | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
| 		hostInfo.Timestamp = time.Now() | ||||
| 		err = orm.Create(nil, hostInfo) | ||||
| 		if err != nil { | ||||
| 			errors[hi.IP] = util.MapStr{ | ||||
| 				"error": err.Error(), | ||||
| 			} | ||||
| 			log.Error(err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if hi.Source == "agent" { | ||||
| 			sm := common2.GetStateManager() | ||||
| 			ag, _  := sm.GetAgent(hostInfo.AgentID) | ||||
| 			err = sm.GetAgentClient().DiscoveredHost(nil, ag.GetEndpoint(), util.MapStr{ | ||||
| 				"host_id": hostInfo.ID, | ||||
| 			}) | ||||
| 			if err != nil { | ||||
| 				log.Error(err) | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 	} | ||||
| 	resBody :=  util.MapStr{ | ||||
| 		"success": true, | ||||
| 	} | ||||
| 	if len(errors) > 0 { | ||||
| 		resBody["errors"] = errors | ||||
| 		resBody["success"] = false | ||||
| 	} | ||||
| 
 | ||||
| 	h.WriteJSON(w, resBody, http.StatusOK) | ||||
| } | ||||
| 
 | ||||
| func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { | ||||
| 	hostID := ps.MustGetParameter("host_id") | ||||
| 	hostInfo, err := getHost(hostID) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	if hostInfo.AgentID == "" { | ||||
| 		h.WriteJSON(w, util.MapStr{}, http.StatusOK) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	sm := common2.GetStateManager() | ||||
| 	ag, err := sm.GetAgent(hostInfo.AgentID) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteJSON(w, util.MapStr{}, http.StatusOK) | ||||
| 		return | ||||
| 	} | ||||
| 	aversion, err := ag.GetVersion() | ||||
| 	if err == nil { | ||||
| 		ag.Version = aversion | ||||
| 		orm.Save(nil, ag) | ||||
| 	} | ||||
| 	h.WriteJSON(w, util.MapStr{ | ||||
| 		"host_id": hostID, | ||||
| 		"agent_id": ag.ID, | ||||
| 		"version": ag.Version, | ||||
| 		"status": ag.Status, | ||||
| 		"endpoint": ag.GetEndpoint(), | ||||
| 	}, http.StatusOK) | ||||
| } | ||||
| 
 | ||||
| func getHost(hostID string) (*host.HostInfo, error){ | ||||
| 	hostInfo := &host.HostInfo{} | ||||
| 	hostInfo.ID = hostID | ||||
| 	exists, err := orm.Get(hostInfo) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("get host info error: %w", err) | ||||
| 	} | ||||
| 	if !exists { | ||||
| 		return nil, fmt.Errorf("host [%s] not found", hostID) | ||||
| 	} | ||||
| 	return hostInfo, nil | ||||
| } | ||||
| 
 | ||||
| func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { | ||||
| 	hostID := ps.MustGetParameter("host_id") | ||||
| 	hostInfo := &host.HostInfo{} | ||||
| 	hostInfo.ID = hostID | ||||
| 	exists, err := orm.Get(hostInfo) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	if !exists { | ||||
| 		h.WriteError(w, fmt.Sprintf("host [%s] not found", hostID), http.StatusNotFound) | ||||
| 		return | ||||
| 	} | ||||
| 	if hostInfo.AgentID == "" { | ||||
| 		h.WriteJSON(w, util.MapStr{}, http.StatusOK) | ||||
| 		return | ||||
| 	} | ||||
| 	sm := common2.GetStateManager() | ||||
| 	ag, err := sm.GetAgent(hostInfo.AgentID) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteJSON(w, util.MapStr{}, http.StatusOK) | ||||
| 		return | ||||
| 	} | ||||
| 	ctx,cancel := context.WithTimeout(context.Background(), time.Second * 10) | ||||
| 	defer cancel() | ||||
| 	esNodesInfo, err := sm.GetAgentClient().GetElasticsearchNodes(ctx, ag.GetEndpoint()) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	var processes []util.MapStr | ||||
| 	for _, node := range esNodesInfo { | ||||
| 		processes = append(processes, util.MapStr{ | ||||
| 			"pid":          node.ProcessInfo.PID, | ||||
| 			"pid_status":   node.ProcessInfo.Status, | ||||
| 			"cluster_name": node.ClusterName, | ||||
| 			"cluster_uuid": node.ClusterUuid, | ||||
| 			"cluster_id":  node.ClusterID, | ||||
| 			"node_id":      node.NodeUUID, | ||||
| 			"node_name":    node.NodeName, | ||||
| 			"uptime_in_ms": time.Now().UnixMilli() - node.ProcessInfo.CreateTime, | ||||
| 		}) | ||||
| 	} | ||||
| 	h.WriteJSON(w, util.MapStr{ | ||||
| 		"elastic_processes": processes, | ||||
| 	}, http.StatusOK) | ||||
| } | ||||
| 
 | ||||
| func enrollHostFromAgent(agentID string) (*host.HostInfo, error){ | ||||
| 	sm := common2.GetStateManager() | ||||
| 	ag, err := sm.GetAgent(agentID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if ag == nil { | ||||
| 		return nil, fmt.Errorf("can not found agent [%s]", agentID) | ||||
| 	} | ||||
| 	agentClient := sm.GetAgentClient() | ||||
| 	hostInfo, err :=  agentClient.GetHostInfo(nil, ag.GetEndpoint()) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	hostInfo.AgentStatus = ag.Status | ||||
| 	return hostInfo, nil | ||||
| } | ||||
|  | @ -0,0 +1,28 @@ | |||
| /* Copyright © INFINI Ltd. All rights reserved. | ||||
|  * Web: https://infinilabs.com
 | ||||
|  * Email: hello#infini.ltd */ | ||||
| 
 | ||||
| package api | ||||
| 
 | ||||
| import ( | ||||
| 	"infini.sh/framework/core/api" | ||||
| 	"infini.sh/framework/core/api/rbac/enum" | ||||
| ) | ||||
| 
 | ||||
| func Init() { | ||||
| 	handler := APIHandler{} | ||||
| 	api.HandleAPIMethod(api.POST, "/agent/instance", handler.createInstance) | ||||
| 	api.HandleAPIMethod(api.GET, "/agent/instance/_search", handler.RequirePermission(handler.searchInstance, enum.PermissionAgentInstanceRead)) | ||||
| 	api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id", handler.getInstance) | ||||
| 	api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id", handler.RequirePermission(handler.deleteInstance, enum.PermissionAgentInstanceWrite)) | ||||
| 	api.HandleAPIMethod(api.POST, "/agent/instance/_stats", handler.RequirePermission(handler.getInstanceStats, enum.PermissionAgentInstanceRead)) | ||||
| 	api.HandleAPIMethod(api.GET, "/agent/log/node/:node_id/files", handler.RequirePermission(handler.getLogFilesByNode, enum.PermissionAgentInstanceRead)) | ||||
| 	api.HandleAPIMethod(api.POST, "/agent/log/node/:node_id/_scroll", handler.RequirePermission(handler.getLogFileContent, enum.PermissionAgentInstanceRead)) | ||||
| 	api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.getESNodesInfo, enum.PermissionAgentInstanceRead)) | ||||
| 	api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/_nodes/_refresh", handler.RequirePermission(handler.refreshESNodesInfo, enum.PermissionAgentInstanceWrite)) | ||||
| 	api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) | ||||
| 
 | ||||
| 	api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) | ||||
| 	api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo) | ||||
| 	api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess) | ||||
| } | ||||
|  | @ -0,0 +1,716 @@ | |||
| /* Copyright © INFINI Ltd. All rights reserved. | ||||
|  * Web: https://infinilabs.com
 | ||||
|  * Email: hello#infini.ltd */ | ||||
| 
 | ||||
| package api | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	log "github.com/cihub/seelog" | ||||
| 	"infini.sh/framework/core/agent" | ||||
| 	"infini.sh/framework/core/api" | ||||
| 	httprouter "infini.sh/framework/core/api/router" | ||||
| 	"infini.sh/framework/core/elastic" | ||||
| 	"infini.sh/framework/core/event" | ||||
| 	"infini.sh/framework/core/host" | ||||
| 	"infini.sh/framework/core/orm" | ||||
| 	"infini.sh/framework/core/util" | ||||
| 	common2 "infini.sh/console/modules/agent/common" | ||||
| 	elastic2 "infini.sh/framework/modules/elastic" | ||||
| 	"infini.sh/framework/modules/elastic/common" | ||||
| 	"net/http" | ||||
| 	"strconv" | ||||
| ) | ||||
| 
 | ||||
| type APIHandler struct { | ||||
| 	api.Handler | ||||
| } | ||||
| 
 | ||||
| func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { | ||||
| 	var obj = &agent.Instance{} | ||||
| 	err := h.DecodeJSON(req, obj) | ||||
| 	if err != nil { | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		log.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	oldInst := &agent.Instance{} | ||||
| 	oldInst.ID = obj.ID | ||||
| 	exists, err := orm.Get(oldInst) | ||||
| 
 | ||||
| 	if err != nil && err != elastic2.ErrNotFound { | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		log.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 	if exists { | ||||
| 		errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID) | ||||
| 		h.WriteError(w, errMsg, http.StatusInternalServerError) | ||||
| 		log.Error(errMsg) | ||||
| 		return | ||||
| 	} | ||||
| 	//fetch more information of agent instance
 | ||||
| 	res, err := common2.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint()) | ||||
| 	if err != nil { | ||||
| 		errStr := fmt.Sprintf("get agent instance basic info error: %s", err.Error()) | ||||
| 		h.WriteError(w,errStr , http.StatusInternalServerError) | ||||
| 		log.Error(errStr) | ||||
| 		return | ||||
| 	} | ||||
| 	if res.ID == "" { | ||||
| 		errStr :=fmt.Sprintf("got unexpected response of agent instance basic info: %s", util.MustToJSON(res)) | ||||
| 		h.WriteError(w, errStr , http.StatusInternalServerError) | ||||
| 		log.Error(errStr) | ||||
| 		return | ||||
| 	}else{ | ||||
| 		obj.ID = res.ID | ||||
| 		obj.Version = res.Version | ||||
| 		obj.MajorIP = res.MajorIP | ||||
| 		obj.Host = res.Host | ||||
| 		obj.IPS = res.IPS | ||||
| 	} | ||||
| 
 | ||||
| 	err = orm.Create(nil, obj) | ||||
| 	if err != nil { | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		log.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 	_, err = refreshNodesInfo(obj) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 	} | ||||
| 
 | ||||
| 	h.WriteCreatedOKJSON(w, obj.ID) | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| func bindAgentToHostByIP(ag *agent.Instance) error{ | ||||
| 	err, result := orm.GetBy("ip", ag.MajorIP, host.HostInfo{}) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if len(result.Result) > 0 { | ||||
| 		buf := util.MustToJSONBytes(result.Result[0]) | ||||
| 		hostInfo := &host.HostInfo{} | ||||
| 		err = util.FromJSONBytes(buf, hostInfo) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		sm := common2.GetStateManager() | ||||
| 		if ag.Status == "" { | ||||
| 			_, err1 := sm.GetAgentClient().GetHostInfo(nil, ag.GetEndpoint()) | ||||
| 			if err1 == nil { | ||||
| 				ag.Status = "online" | ||||
| 			}else{ | ||||
| 				ag.Status = "offline" | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		hostInfo.AgentStatus = ag.Status | ||||
| 		hostInfo.AgentID = ag.ID | ||||
| 		err = orm.Update(nil, hostInfo) | ||||
| 		if err != nil { | ||||
| 			return  err | ||||
| 		} | ||||
| 
 | ||||
| 		err = sm.GetAgentClient().DiscoveredHost(nil, ag.GetEndpoint(), util.MapStr{ | ||||
| 			"host_id": hostInfo.ID, | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			return  err | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (h *APIHandler) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { | ||||
| 	id := ps.MustGetParameter("instance_id") | ||||
| 
 | ||||
| 	obj := agent.Instance{} | ||||
| 	obj.ID = id | ||||
| 
 | ||||
| 	exists, err := orm.Get(&obj) | ||||
| 	if !exists || err != nil { | ||||
| 		h.WriteJSON(w, util.MapStr{ | ||||
| 			"_id":   id, | ||||
| 			"found": false, | ||||
| 		}, http.StatusNotFound) | ||||
| 		return | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		log.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	h.WriteJSON(w, util.MapStr{ | ||||
| 		"found":   true, | ||||
| 		"_id":     id, | ||||
| 		"_source": obj, | ||||
| 	}, 200) | ||||
| } | ||||
| 
 | ||||
| func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { | ||||
| 	id := ps.MustGetParameter("instance_id") | ||||
| 
 | ||||
| 	obj := agent.Instance{} | ||||
| 	obj.ID = id | ||||
| 
 | ||||
| 	exists, err := orm.Get(&obj) | ||||
| 	if !exists || err != nil { | ||||
| 		h.WriteJSON(w, util.MapStr{ | ||||
| 			"_id":    id, | ||||
| 			"result": "not_found", | ||||
| 		}, http.StatusNotFound) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	err = orm.Delete(nil, &obj) | ||||
| 	if err != nil { | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		log.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 	if sm := common2.GetStateManager(); sm != nil { | ||||
| 		sm.DeleteAgent(obj.ID) | ||||
| 	} | ||||
| 
 | ||||
| 	h.WriteJSON(w, util.MapStr{ | ||||
| 		"_id":    obj.ID, | ||||
| 		"result": "deleted", | ||||
| 	}, 200) | ||||
| } | ||||
| 
 | ||||
| func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { | ||||
| 	var instanceIDs = []string{} | ||||
| 	err := h.DecodeJSON(req, &instanceIDs) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	if len(instanceIDs) == 0 { | ||||
| 		h.WriteJSON(w, util.MapStr{}, http.StatusOK) | ||||
| 		return | ||||
| 	} | ||||
| 	q := orm.Query{ | ||||
| 		WildcardIndex: true, | ||||
| 	} | ||||
| 	queryDSL := util.MapStr{ | ||||
| 		"sort": []util.MapStr{ | ||||
| 			{ | ||||
| 				"timestamp": util.MapStr{ | ||||
| 					"order": "desc", | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		"collapse": util.MapStr{ | ||||
| 			"field": "agent.id", | ||||
| 		}, | ||||
| 		"query": util.MapStr{ | ||||
| 			"bool": util.MapStr{ | ||||
| 				"filter": []util.MapStr{ | ||||
| 					{ | ||||
| 						"range": util.MapStr{ | ||||
| 							"timestamp": util.MapStr{ | ||||
| 								"gte": "now-1m", | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 				"must": []util.MapStr{ | ||||
| 					{ | ||||
| 						"term": util.MapStr{ | ||||
| 							"metadata.name": util.MapStr{ | ||||
| 								"value": "agent", | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, { | ||||
| 						"terms": util.MapStr{ | ||||
| 							"agent.id": instanceIDs, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	q.RawQuery = util.MustToJSONBytes(queryDSL) | ||||
| 
 | ||||
| 	err, res := orm.Search(event.Event{}, &q) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	result := util.MapStr{} | ||||
| 	for _, item := range res.Result { | ||||
| 		if itemV, ok := item.(map[string]interface{}); ok { | ||||
| 			if agentID, ok := util.GetMapValueByKeys([]string{"agent", "id"}, itemV); ok { | ||||
| 				if v, ok := agentID.(string); ok { | ||||
| 					if ab, ok := util.GetMapValueByKeys([]string{"payload","instance", "system"}, itemV); ok{ | ||||
| 						if abV, ok := ab.(map[string]interface{}); ok { | ||||
| 							result[v] = util.MapStr{ | ||||
| 								"timestamp": itemV["timestamp"], | ||||
| 								"system": util.MapStr{ | ||||
| 									"cpu": abV["cpu"], | ||||
| 									"mem": abV["mem"], | ||||
| 									"uptime_in_ms": abV["uptime_in_ms"], | ||||
| 									"status": "online", | ||||
| 								}, | ||||
| 							} | ||||
| 						} | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 	} | ||||
| 	h.WriteJSON(w, result, http.StatusOK) | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| func (h *APIHandler) searchInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { | ||||
| 
 | ||||
| 	var ( | ||||
| 		keyword = h.GetParameterOrDefault(req, "keyword", "") | ||||
| 		//queryDSL    = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}`
 | ||||
| 		strSize      = h.GetParameterOrDefault(req, "size", "20") | ||||
| 		strFrom      = h.GetParameterOrDefault(req, "from", "0") | ||||
| 	) | ||||
| 
 | ||||
| 	var ( | ||||
| 		mustQ       []interface{} | ||||
| 	) | ||||
| 
 | ||||
| 	if keyword != "" { | ||||
| 		mustQ = append(mustQ, util.MapStr{ | ||||
| 			"query_string": util.MapStr{ | ||||
| 				"default_field": "*", | ||||
| 				"query":         keyword, | ||||
| 			}, | ||||
| 		}) | ||||
| 	} | ||||
| 	size, _ := strconv.Atoi(strSize) | ||||
| 	if size <= 0 { | ||||
| 		size = 20 | ||||
| 	} | ||||
| 	from, _ := strconv.Atoi(strFrom) | ||||
| 	if from < 0 { | ||||
| 		from = 0 | ||||
| 	} | ||||
| 
 | ||||
| 	queryDSL := util.MapStr{ | ||||
| 		"size": size, | ||||
| 		"from": from, | ||||
| 	} | ||||
| 	if len(mustQ) > 0 { | ||||
| 		queryDSL["query"] = util.MapStr{ | ||||
| 			"bool": util.MapStr{ | ||||
| 				"must": mustQ, | ||||
| 			}, | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	q := orm.Query{} | ||||
| 	q.RawQuery = util.MustToJSONBytes(queryDSL) | ||||
| 
 | ||||
| 	err, res := orm.Search(&agent.Instance{}, &q) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	h.Write(w, res.Raw) | ||||
| } | ||||
| 
 | ||||
| func (h *APIHandler) getESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { | ||||
| 	id := ps.MustGetParameter("instance_id") | ||||
| 	obj := agent.Instance{} | ||||
| 	obj.ID = id | ||||
| 	exists, err := orm.Get(&obj) | ||||
| 	if !exists || err != nil { | ||||
| 		h.WriteJSON(w, util.MapStr{ | ||||
| 			"_id":   id, | ||||
| 			"found": false, | ||||
| 		}, http.StatusNotFound) | ||||
| 		return | ||||
| 	} | ||||
| 	nodesM, err := getNodesInfoFromES(obj.ID) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	var nodes []*agent.ESNodeInfo | ||||
| 	for _, node := range nodesM { | ||||
| 		nodes = append(nodes, node) | ||||
| 	} | ||||
| 	h.WriteJSON(w, nodes, http.StatusOK) | ||||
| } | ||||
| 
 | ||||
| func (h *APIHandler) refreshESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ | ||||
| 	id := ps.MustGetParameter("instance_id") | ||||
| 	obj := agent.Instance{} | ||||
| 	obj.ID = id | ||||
| 	exists, err := orm.Get(&obj) | ||||
| 	if !exists || err != nil { | ||||
| 		h.WriteJSON(w, util.MapStr{ | ||||
| 			"_id":   id, | ||||
| 			"found": false, | ||||
| 		}, http.StatusNotFound) | ||||
| 		return | ||||
| 	} | ||||
| 	_, err = refreshNodesInfo(&obj) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	h.WriteAckOKJSON(w) | ||||
| } | ||||
| 
 | ||||
| func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { | ||||
| 	id := ps.MustGetParameter("instance_id") | ||||
| 	inst := agent.Instance{} | ||||
| 	inst.ID = id | ||||
| 	exists, err := orm.Get(&inst) | ||||
| 	if !exists || err != nil { | ||||
| 		h.WriteJSON(w, util.MapStr{ | ||||
| 			"_id":   id, | ||||
| 			"found": false, | ||||
| 		}, http.StatusNotFound) | ||||
| 		return | ||||
| 	} | ||||
| 	reqBody := struct { | ||||
| 		NodeID string `json:"node_id"` | ||||
| 		ESConfig *elastic.ElasticsearchConfig `json:"es_config"` | ||||
| 	}{} | ||||
| 	err = h.DecodeJSON(req, &reqBody) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	oldNodeInfo := &agent.ESNodeInfo{ | ||||
| 		ID: reqBody.NodeID, | ||||
| 	} | ||||
| 	exists, err = orm.Get(oldNodeInfo) | ||||
| 	if !exists || err != nil { | ||||
| 		h.WriteJSON(w, fmt.Sprintf("node [%s] of agent [%s] was not found", oldNodeInfo.ID, inst.Name), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	cfg := reqBody.ESConfig | ||||
| 	if cfg.Endpoint == "" { | ||||
| 		cfg.Endpoint = fmt.Sprintf("%s://%s", cfg.Schema, cfg.Host) | ||||
| 	} | ||||
| 	basicAuth, err := common.GetBasicAuth(cfg) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	cfg.BasicAuth = &basicAuth | ||||
| 	nodeInfo, err := common2.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	nodeInfo.ID = oldNodeInfo.ID | ||||
| 	nodeInfo.AgentID = inst.ID | ||||
| 	clusterCfgs := getClusterConfigs() | ||||
| 	if nodeInfo.ClusterUuid != "" && clusterCfgs[nodeInfo.ClusterUuid] != nil { | ||||
| 		nodeInfo.ClusterID = clusterCfgs[nodeInfo.ClusterUuid].ID | ||||
| 		settings, err := common2.GetAgentSettings(inst.ID, 0) | ||||
| 		if err != nil { | ||||
| 			log.Error(err) | ||||
| 			h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 			return | ||||
| 		} | ||||
| 		setting := pickAgentSettings(settings, *nodeInfo) | ||||
| 		if setting == nil { | ||||
| 			setting, err = getAgentTaskSetting(inst.ID, *nodeInfo) | ||||
| 			if err != nil { | ||||
| 				log.Error("get agent task setting error: ", err) | ||||
| 			} | ||||
| 			err = orm.Create(nil, setting) | ||||
| 			if err != nil { | ||||
| 				log.Error("save agent task setting error: ", err) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	err = orm.Save(nil, nodeInfo) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	h.WriteJSON(w, nodeInfo, http.StatusOK) | ||||
| } | ||||
| 
 | ||||
| func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { | ||||
| 	nodesInfo, err := common2.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("get elasticsearch nodes error: %w", err) | ||||
| 	} | ||||
| 	oldNodesInfo, err := getNodesInfoFromES(inst.ID) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("get elasticsearch nodes info from es error: %w", err) | ||||
| 	} | ||||
| 	clusterCfgs := getClusterConfigs() | ||||
| 	oldPids := map[int]struct{}{} | ||||
| 	var resultNodes []agent.ESNodeInfo | ||||
| 	settings, err := common2.GetAgentSettings(inst.ID, 0) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	for _, node := range nodesInfo { | ||||
| 		oldNode := getNodeByPidOrUUID(oldNodesInfo, node.ProcessInfo.PID, node.NodeUUID) | ||||
| 		node.AgentID = inst.ID | ||||
| 		if oldNode != nil { | ||||
| 			node.ID = oldNode.ID | ||||
| 			//keep old validate info
 | ||||
| 			if node.ClusterUuid == "" && oldNode.ClusterUuid != "" { | ||||
| 				node = *oldNode | ||||
| 			} | ||||
| 			oldPids[oldNode.ProcessInfo.PID] = struct{}{} | ||||
| 		}else{ | ||||
| 			node.ID = util.GetUUID() | ||||
| 		} | ||||
| 		if node.ClusterUuid != "" { | ||||
| 			if oldNode != nil && oldNode.ClusterID != "" { | ||||
| 				node.ClusterID = oldNode.ClusterID | ||||
| 			}else{ | ||||
| 				if cfg := clusterCfgs[node.ClusterUuid]; cfg != nil { | ||||
| 					node.ClusterID = cfg.ID | ||||
| 					setting := pickAgentSettings(settings, node) | ||||
| 					if setting == nil { | ||||
| 						setting, err = getAgentTaskSetting(inst.ID, node) | ||||
| 						if err != nil { | ||||
| 							log.Error() | ||||
| 						} | ||||
| 						err = orm.Create(nil, setting) | ||||
| 						if err != nil { | ||||
| 							log.Error("save agent task setting error: ", err) | ||||
| 						} | ||||
| 					} | ||||
| 				}else{ | ||||
| 					//cluster not registered in console
 | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		node.Status = "online" | ||||
| 		err = orm.Save(nil, node) | ||||
| 		if err != nil { | ||||
| 			log.Error("save node info error: ", err) | ||||
| 		} | ||||
| 		resultNodes = append(resultNodes, node) | ||||
| 	} | ||||
| 	for k, node := range oldNodesInfo { | ||||
| 		if _, ok := oldPids[k]; !ok { | ||||
| 			//auto delete not associated cluster
 | ||||
| 			if node.ClusterID == "" { | ||||
| 				log.Info("delete node with pid: ", node.ProcessInfo.PID) | ||||
| 				err = orm.Delete(nil, node) | ||||
| 				if err != nil { | ||||
| 					log.Error("delete node info error: ", err) | ||||
| 				} | ||||
| 				continue | ||||
| 			} | ||||
| 			node.Status = "offline" | ||||
| 			err = orm.Save(nil, node) | ||||
| 			if err != nil { | ||||
| 				log.Error("save node info error: ", err) | ||||
| 			} | ||||
| 			resultNodes = append(resultNodes, *node) | ||||
| 		} | ||||
| 	} | ||||
| 	return resultNodes, nil | ||||
| } | ||||
| 
 | ||||
| func getNodeByPidOrUUID(nodes map[int]*agent.ESNodeInfo, pid int, uuid string) *agent.ESNodeInfo{ | ||||
| 	if nodes[pid] != nil { | ||||
| 		return nodes[pid] | ||||
| 	} | ||||
| 	for _, node := range nodes { | ||||
| 		if node.NodeUUID != "" && node.NodeUUID == uuid { | ||||
| 			return node | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){ | ||||
| 	query := util.MapStr{ | ||||
| 		"size": 100, | ||||
| 		"query": util.MapStr{ | ||||
| 			"term": util.MapStr{ | ||||
| 				"agent_id": util.MapStr{ | ||||
| 					"value": agentID, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	q := orm.Query{ | ||||
| 		RawQuery: util.MustToJSONBytes(query), | ||||
| 	} | ||||
| 
 | ||||
| 	err, result := orm.Search(agent.ESNodeInfo{}, &q) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	nodesInfo := map[int]*agent.ESNodeInfo{} | ||||
| 	for _, row := range result.Result { | ||||
| 		node := agent.ESNodeInfo{} | ||||
| 		buf := util.MustToJSONBytes(row) | ||||
| 		util.MustFromJSONBytes(buf, &node) | ||||
| 		nodesInfo[node.ProcessInfo.PID] = &node | ||||
| 	} | ||||
| 	return nodesInfo, nil | ||||
| } | ||||
| 
 | ||||
| func getClusterConfigs() map[string]*elastic.ElasticsearchConfig { | ||||
| 	cfgs := map[string]*elastic.ElasticsearchConfig{} | ||||
| 	elastic.WalkConfigs(func(key, value interface{}) bool { | ||||
| 		if cfg, ok := value.(*elastic.ElasticsearchConfig); ok { | ||||
| 			//todo handle clusterUUID is empty
 | ||||
| 			cfgs[cfg.ClusterUUID] = cfg | ||||
| 		} | ||||
| 		return true | ||||
| 	}) | ||||
| 	return cfgs | ||||
| } | ||||
| 
 | ||||
| func pickAgentSettings(settings []agent.Setting, nodeInfo agent.ESNodeInfo) *agent.Setting { | ||||
| 	for _, setting := range settings { | ||||
| 		if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID { | ||||
| 			return &setting | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func getAgentTaskSetting(agentID string, node agent.ESNodeInfo) (*agent.Setting, error){ | ||||
| 	taskSetting, err := getSettingsByClusterID(node.ClusterID) | ||||
| 	if err != nil { | ||||
| 		return  nil, err | ||||
| 	} | ||||
| 	return &agent.Setting{ | ||||
| 		Metadata: agent.SettingsMetadata{ | ||||
| 			Category: "agent", | ||||
| 			Name: "task", | ||||
| 			Labels: util.MapStr{ | ||||
| 				"agent_id": agentID, | ||||
| 				"cluster_uuid": node.ClusterUuid, | ||||
| 				"cluster_id": node.ClusterID, | ||||
| 				"node_uuid": node.NodeUUID, | ||||
| 				"endpoint": fmt.Sprintf("%s://%s", node.Schema, node.PublishAddress), | ||||
| 			}, | ||||
| 		}, | ||||
| 		Payload: util.MapStr{ | ||||
| 			"task": taskSetting, | ||||
| 		}, | ||||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| // getSettingsByClusterID query agent task settings with cluster id
 | ||||
| func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) { | ||||
| 	queryDsl := util.MapStr{ | ||||
| 		"size": 200, | ||||
| 		"query": util.MapStr{ | ||||
| 			"bool": util.MapStr{ | ||||
| 				"must": []util.MapStr{ | ||||
| 					{ | ||||
| 						"term": util.MapStr{ | ||||
| 							"metadata.labels.cluster_id": util.MapStr{ | ||||
| 								"value": clusterID, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 				"should": []util.MapStr{ | ||||
| 					{ | ||||
| 						"term": util.MapStr{ | ||||
| 							"payload.task.cluster_health": util.MapStr{ | ||||
| 								"value": true, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 					{ | ||||
| 						"term": util.MapStr{ | ||||
| 							"payload.task.cluster_stats": util.MapStr{ | ||||
| 								"value": true, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 					{ | ||||
| 						"term": util.MapStr{ | ||||
| 							"payload.task.index_stats": util.MapStr{ | ||||
| 								"value": true, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	q := orm.Query{ | ||||
| 		RawQuery: util.MustToJSONBytes(queryDsl), | ||||
| 	} | ||||
| 	err, result := orm.Search(agent.Setting{}, &q) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	setting := &common2.TaskSetting{ | ||||
| 		NodeStats: &common2.NodeStatsTask{ | ||||
| 			Enabled: true, | ||||
| 		}, | ||||
| 	} | ||||
| 	var ( | ||||
| 		clusterStats = true | ||||
| 		indexStats = true | ||||
| 		clusterHealth = true | ||||
| 	) | ||||
| 	keys := []string{"payload.task.cluster_stats", "payload.task.cluster_health", "payload.task.index_stats"} | ||||
| 	for _, row := range result.Result { | ||||
| 		if v, ok := row.(map[string]interface{}); ok { | ||||
| 			vm := util.MapStr(v) | ||||
| 			for _, key := range keys { | ||||
| 				tv, _ := vm.GetValue(key) | ||||
| 				if tv  == true { | ||||
| 					switch key { | ||||
| 					case "payload.task.cluster_stats": | ||||
| 						clusterStats = false | ||||
| 					case "payload.task.index_stats": | ||||
| 						indexStats = false | ||||
| 					case "payload.task.cluster_health": | ||||
| 						clusterHealth = false | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	if clusterStats { | ||||
| 		setting.ClusterStats = &common2.ClusterStatsTask{ | ||||
| 			Enabled: true, | ||||
| 		} | ||||
| 	} | ||||
| 	if indexStats { | ||||
| 		setting.IndexStats = &common2.IndexStatsTask{ | ||||
| 			Enabled: true, | ||||
| 		} | ||||
| 	} | ||||
| 	if clusterHealth { | ||||
| 		setting.ClusterHealth = &common2.ClusterHealthTask{ | ||||
| 			Enabled: true, | ||||
| 		} | ||||
| 	} | ||||
| 	return setting, nil | ||||
| } | ||||
|  | @ -0,0 +1,115 @@ | |||
| /* Copyright © INFINI Ltd. All rights reserved. | ||||
|  * Web: https://infinilabs.com
 | ||||
|  * Email: hello#infini.ltd */ | ||||
| 
 | ||||
| package api | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	log "github.com/cihub/seelog" | ||||
| 	"infini.sh/console/modules/agent/common" | ||||
| 	"infini.sh/framework/core/agent" | ||||
| 	httprouter "infini.sh/framework/core/api/router" | ||||
| 	"infini.sh/framework/core/orm" | ||||
| 	"infini.sh/framework/core/util" | ||||
| 	"net/http" | ||||
| ) | ||||
| 
 | ||||
| func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { | ||||
| 	nodeID := ps.MustGetParameter("node_id") | ||||
| 	inst, node, err := getAgentByNodeID(nodeID) | ||||
| 	if err != nil { | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		log.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 	if inst == nil { | ||||
| 		log.Error(fmt.Sprintf("can not find agent by node [%s]", nodeID)) | ||||
| 		h.WriteJSON(w, util.MapStr{ | ||||
| 			"success": false, | ||||
| 			"reason": "AGENT_NOT_FOUND", | ||||
| 		}, http.StatusOK) | ||||
| 		return | ||||
| 	} | ||||
| 	logFiles, err := common.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs) | ||||
| 	if err != nil { | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		log.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 	h.WriteJSON(w, util.MapStr{ | ||||
| 		"success": true, | ||||
| 		"log_files": logFiles, | ||||
| 	}, http.StatusOK) | ||||
| } | ||||
| 
 | ||||
| func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { | ||||
| 	nodeID := ps.MustGetParameter("node_id") | ||||
| 	inst, node, err := getAgentByNodeID(nodeID) | ||||
| 	if err != nil { | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		log.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 	if inst == nil { | ||||
| 		h.WriteError(w, fmt.Sprintf("can not find agent by node [%s]", nodeID), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 	reqBody := struct { | ||||
| 		FileName string `json:"file_name"` | ||||
| 		LogsPath string `json:"logs_path"` | ||||
| 		Offset int `json:"offset"` | ||||
| 		Lines int `json:"lines"` | ||||
| 	}{} | ||||
| 	err = h.DecodeJSON(req, &reqBody) | ||||
| 	if err != nil { | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		log.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 	reqBody.LogsPath = node.Path.Logs | ||||
| 	sm := common.GetStateManager() | ||||
| 	res, err := sm.GetAgentClient().GetElasticLogFileContent(nil, inst.GetEndpoint(), reqBody) | ||||
| 	if err != nil { | ||||
| 		h.WriteError(w, err.Error(), http.StatusInternalServerError) | ||||
| 		log.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 	h.WriteJSON(w, res, http.StatusOK) | ||||
| } | ||||
| 
 | ||||
| func getAgentByNodeID(nodeID string) (*agent.Instance, *agent.ESNodeInfo, error){ | ||||
| 	queryDsl := util.MapStr{ | ||||
| 		"size":1, | ||||
| 		"query": util.MapStr{ | ||||
| 			"term": util.MapStr{ | ||||
| 				"node_uuid": util.MapStr{ | ||||
| 					"value": nodeID, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	q := &orm.Query{ | ||||
| 		RawQuery: util.MustToJSONBytes(queryDsl), | ||||
| 	} | ||||
| 	err, result := orm.Search(agent.ESNodeInfo{}, q) | ||||
| 	if err != nil { | ||||
| 		return nil,nil, err | ||||
| 	} | ||||
| 	if len(result.Result) > 0 { | ||||
| 		buf := util.MustToJSONBytes(result.Result[0]) | ||||
| 		node := &agent.ESNodeInfo{} | ||||
| 		err = util.FromJSONBytes(buf, node) | ||||
| 		inst := &agent.Instance{} | ||||
| 		inst.ID = node.AgentID | ||||
| 		_, err = orm.Get(inst) | ||||
| 		if err != nil { | ||||
| 			return nil, node, err | ||||
| 		} | ||||
| 		if inst.Name == "" { | ||||
| 			return nil, node, nil | ||||
| 		} | ||||
| 		return inst, node, nil | ||||
| 	} | ||||
| 	return nil, nil, nil | ||||
| } | ||||
|  | @ -0,0 +1,222 @@ | |||
| /* Copyright © INFINI Ltd. All rights reserved. | ||||
|  * Web: https://infinilabs.com
 | ||||
|  * Email: hello#infini.ltd */ | ||||
| 
 | ||||
| package common | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"infini.sh/framework/core/agent" | ||||
| 	"infini.sh/framework/core/elastic" | ||||
| 	"infini.sh/framework/core/host" | ||||
| 	"infini.sh/framework/core/util" | ||||
| 	"net/http" | ||||
| ) | ||||
| 
 | ||||
| type Client struct { | ||||
| } | ||||
| 
 | ||||
| func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) { | ||||
| 	req := &util.Request{ | ||||
| 		Method:  http.MethodGet, | ||||
| 		Url:     fmt.Sprintf("%s/agent/host/_basic", agentBaseURL), | ||||
| 		Context: ctx, | ||||
| 	} | ||||
| 	resBody := struct { | ||||
| 		Success bool `json:"success"` | ||||
| 		Error string `json:"error"` | ||||
| 		HostInfo *host.HostInfo `json:"result"` | ||||
| 	}{} | ||||
| 	err := client.doRequest(req, &resBody) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if resBody.Success != true { | ||||
| 		return nil, fmt.Errorf("enroll error from client: %v", resBody.Error) | ||||
| 	} | ||||
| 	return resBody.HostInfo, nil | ||||
| } | ||||
| 
 | ||||
| func (client *Client) DiscoveredHost(ctx context.Context, agentBaseURL string, body interface{}) error { | ||||
| 	req := &util.Request{ | ||||
| 		Method:  http.MethodPut, | ||||
| 		Url:     fmt.Sprintf("%s/host/discover", agentBaseURL), | ||||
| 		Context: ctx, | ||||
| 	} | ||||
| 	reqBody, err := util.ToJSONBytes(body) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	req.Body = reqBody | ||||
| 	resBody := map[string]interface{}{} | ||||
| 	err = client.doRequest(req, &resBody) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if resBody["success"] != true { | ||||
| 		return fmt.Errorf("discover host callback error: %v", resBody["error"]) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (client *Client) GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) { | ||||
| 	req := &util.Request{ | ||||
| 		Method:  http.MethodGet, | ||||
| 		Url:     fmt.Sprintf("%s/elasticsearch/%s/process/_elastic", agentBaseURL, agentID), | ||||
| 		Context: ctx, | ||||
| 	} | ||||
| 	resBody := map[string]interface{}{} | ||||
| 	err := client.doRequest(req, &resBody) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if resBody["success"] != true { | ||||
| 		return nil, fmt.Errorf("discover host callback error: %v", resBody["error"]) | ||||
| 	} | ||||
| 	return resBody["elastic_process"], nil | ||||
| } | ||||
| 
 | ||||
| func (client *Client) GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) { | ||||
| 	reqBody := util.MustToJSONBytes(util.MapStr{ | ||||
| 		"logs_path": logsPath, | ||||
| 	}) | ||||
| 	req := &util.Request{ | ||||
| 		Method:  http.MethodPost, | ||||
| 		Url:     fmt.Sprintf("%s/agent/logs/elastic/list", agentBaseURL), | ||||
| 		Context: ctx, | ||||
| 		Body: reqBody, | ||||
| 	} | ||||
| 	resBody := map[string]interface{}{} | ||||
| 	err := client.doRequest(req, &resBody) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if resBody["success"] != true { | ||||
| 		return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody["error"]) | ||||
| 	} | ||||
| 	return resBody["result"], nil | ||||
| } | ||||
| 
 | ||||
| func (client *Client) GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) { | ||||
| 	req := &util.Request{ | ||||
| 		Method:  http.MethodPost, | ||||
| 		Url:     fmt.Sprintf("%s/agent/logs/elastic/_read", agentBaseURL), | ||||
| 		Context: ctx, | ||||
| 		Body: util.MustToJSONBytes(body), | ||||
| 	} | ||||
| 	resBody := map[string]interface{}{} | ||||
| 	err := client.doRequest(req, &resBody) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if resBody["success"] != true { | ||||
| 		return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody["error"]) | ||||
| 	} | ||||
| 	var hasMore bool | ||||
| 	if v, ok := resBody["EOF"].(bool); ok && !v { | ||||
| 		hasMore = true | ||||
| 	} | ||||
| 	return map[string]interface{}{ | ||||
| 		"lines": resBody["result"], | ||||
| 		"has_more": hasMore, | ||||
| 	} , nil | ||||
| } | ||||
| 
 | ||||
| func (client *Client) GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error){ | ||||
| 	req := &util.Request{ | ||||
| 		Method:  http.MethodGet, | ||||
| 		Url:     fmt.Sprintf("%s/agent/_info", agentBaseURL ), | ||||
| 		Context: ctx, | ||||
| 	} | ||||
| 	resBody := &agent.Instance{} | ||||
| 	err := client.doRequest(req, &resBody) | ||||
| 	return resBody, err | ||||
| } | ||||
| 
 | ||||
| func (client *Client) RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error { | ||||
| 	reqBody, err := util.ToJSONBytes(cfgs) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	req := &util.Request{ | ||||
| 		Method:  http.MethodPost, | ||||
| 		Url:     fmt.Sprintf("%s/elasticsearch/_register", agentBaseURL ), | ||||
| 		Context: ctx, | ||||
| 		Body: reqBody, | ||||
| 	} | ||||
| 	resBody := util.MapStr{} | ||||
| 	err = client.doRequest(req, &resBody) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if resBody["acknowledged"] != true { | ||||
| 		return fmt.Errorf("%v", resBody["error"]) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (client *Client) GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error) { | ||||
| 	req := &util.Request{ | ||||
| 		Method:  http.MethodGet, | ||||
| 		Url:     fmt.Sprintf("%s/elasticsearch/_nodes", agentBaseURL ), | ||||
| 		Context: ctx, | ||||
| 	} | ||||
| 	resBody := []agent.ESNodeInfo{} | ||||
| 	err := client.doRequest(req, &resBody) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return resBody, nil | ||||
| } | ||||
| 
 | ||||
| func (client *Client) AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) { | ||||
| 	reqBody, err := util.ToJSONBytes(cfg) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	req := &util.Request{ | ||||
| 		Method:  http.MethodPost, | ||||
| 		Url:     fmt.Sprintf("%s/elasticsearch/_auth", agentBaseURL ), | ||||
| 		Context: ctx, | ||||
| 		Body: reqBody, | ||||
| 	} | ||||
| 	resBody := &agent.ESNodeInfo{} | ||||
| 	err = client.doRequest(req, resBody) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return resBody, nil | ||||
| } | ||||
| 
 | ||||
| func (client *Client) CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error{ | ||||
| 	req := &util.Request{ | ||||
| 		Method: http.MethodPost, | ||||
| 		Url:    agentBaseURL + "/pipeline/tasks/", | ||||
| 		Body: body, | ||||
| 		Context: ctx, | ||||
| 	} | ||||
| 	resBody := util.MapStr{} | ||||
| 	return client.doRequest(req, &resBody) | ||||
| } | ||||
| 
 | ||||
| func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error{ | ||||
| 	req := &util.Request{ | ||||
| 		Method: http.MethodDelete, | ||||
| 		Url:     fmt.Sprintf("%s/pipeline/task/%s", agentBaseURL, pipelineID), | ||||
| 		Context: ctx, | ||||
| 	} | ||||
| 	return client.doRequest(req, nil) | ||||
| } | ||||
| 
 | ||||
| func (client *Client) doRequest(req *util.Request, respObj interface{}) error { | ||||
| 	result, err := util.ExecuteRequest(req) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if result.StatusCode != 200 { | ||||
| 		return fmt.Errorf(string(result.Body)) | ||||
| 	} | ||||
| 	return util.FromJSONBytes(result.Body, respObj) | ||||
| } | ||||
|  | @ -0,0 +1,21 @@ | |||
| /* Copyright © INFINI Ltd. All rights reserved. | ||||
|  * Web: https://infinilabs.com
 | ||||
|  * Email: hello#infini.ltd */ | ||||
| 
 | ||||
| package common | ||||
| 
 | ||||
| type AgentConfig struct { | ||||
| 	Enabled bool `config:"enabled"` | ||||
| 	StateManager struct{ | ||||
| 		Enabled bool `config:"enabled"` | ||||
| 	} `config:"state_manager"` | ||||
| 	Setup SetupConfig `config:"setup"` | ||||
| } | ||||
| 
 | ||||
| type SetupConfig struct { | ||||
| 	DownloadURL string `config:"download_url"` | ||||
| 	Version string `config:"version"` | ||||
| 	CACertFile string `config:"ca_cert"` | ||||
| 	CAKeyFile string `config:"ca_key"` | ||||
| 	ScriptEndpoint string `config:"script_endpoint"` | ||||
| } | ||||
|  | @ -0,0 +1,229 @@ | |||
| /* Copyright © INFINI Ltd. All rights reserved. | ||||
|  * Web: https://infinilabs.com
 | ||||
|  * Email: hello#infini.ltd */ | ||||
| 
 | ||||
| package common | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"infini.sh/framework/core/agent" | ||||
| 	"infini.sh/framework/core/elastic" | ||||
| 	"infini.sh/framework/core/orm" | ||||
| 	"infini.sh/framework/core/util" | ||||
| ) | ||||
| 
 | ||||
| func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, error){ | ||||
| 	var clusterCfgs []elastic.ElasticsearchConfig | ||||
| 	var ( | ||||
| 		pipelines []util.MapStr | ||||
| 		toDeletePipelineNames []string | ||||
| 	) | ||||
| 	for _, setting := range settings { | ||||
| 		if setting.Metadata.Labels == nil { | ||||
| 			return nil, fmt.Errorf("empty metadata labels of setting [%s]", setting.ID) | ||||
| 		} | ||||
| 		var ( | ||||
| 			clusterID string | ||||
| 			ok bool | ||||
| 		) | ||||
| 		if clusterID, ok = setting.Metadata.Labels["cluster_id"].(string); ok && clusterID != ""{ | ||||
| 			cfg := elastic.GetConfig(clusterID) | ||||
| 			newCfg := elastic.ElasticsearchConfig{ | ||||
| 				Enabled: true, | ||||
| 				Name: cfg.Name, | ||||
| 				BasicAuth: cfg.BasicAuth, | ||||
| 				Endpoint: setting.Metadata.Labels["endpoint"].(string), | ||||
| 			} | ||||
| 			newCfg.ID = clusterID | ||||
| 			clusterCfgs = append(clusterCfgs, newCfg) | ||||
| 		}else{ | ||||
| 			return nil, fmt.Errorf("got wrong cluster id [%v] from metadata labels", setting.Metadata.Labels["cluster_id"]) | ||||
| 		} | ||||
| 		nodeUUID := util.ToString(setting.Metadata.Labels["node_uuid"]) | ||||
| 
 | ||||
| 		taskCfg, err := util.MapStr(setting.Payload).GetValue("task") | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		vBytes, err := util.ToJSONBytes(taskCfg) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		taskSetting := TaskSetting{} | ||||
| 		err = util.FromJSONBytes(vBytes, &taskSetting) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		partPipelines, partDeletePipelineNames, err := TransformSettingsToConfig(&taskSetting, clusterID, nodeUUID) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		pipelines = append(pipelines, partPipelines...) | ||||
| 		toDeletePipelineNames = append(toDeletePipelineNames, partDeletePipelineNames...) | ||||
| 	} | ||||
| 	return &ParseAgentSettingsResult{ | ||||
| 		ClusterConfigs: clusterCfgs, | ||||
| 		Pipelines: pipelines, | ||||
| 		ToDeletePipelineNames: toDeletePipelineNames, | ||||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) { | ||||
| 	queryDsl := util.MapStr{ | ||||
| 		"size": 500, | ||||
| 		"query": util.MapStr{ | ||||
| 			"bool": util.MapStr{ | ||||
| 				"must": []util.MapStr{ | ||||
| 					{ | ||||
| 						"term": util.MapStr{ | ||||
| 							"metadata.category": util.MapStr{ | ||||
| 								"value": "agent", | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 					{ | ||||
| 						"term": util.MapStr{ | ||||
| 							"metadata.name": util.MapStr{ | ||||
| 								"value": "task", | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 					{ | ||||
| 						"term": util.MapStr{ | ||||
| 							"metadata.labels.agent_id": util.MapStr{ | ||||
| 								"value": agentID, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 					{ | ||||
| 						"range": util.MapStr{ | ||||
| 							"updated": util.MapStr{ | ||||
| 								"gt": timestamp, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	q := orm.Query{ | ||||
| 		RawQuery: util.MustToJSONBytes(queryDsl), | ||||
| 	} | ||||
| 	err, result := orm.Search(agent.Setting{}, &q) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("search settings error: %w", err) | ||||
| 	} | ||||
| 	if len(result.Result) == 0 { | ||||
| 		return nil, nil | ||||
| 	} | ||||
| 	var settings []agent.Setting | ||||
| 	for _, row := range result.Result { | ||||
| 		setting := agent.Setting{} | ||||
| 		buf, err := util.ToJSONBytes(row) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		err = util.FromJSONBytes(buf, &setting) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		settings = append(settings, setting) | ||||
| 	} | ||||
| 	return settings, nil | ||||
| } | ||||
| 
 | ||||
| func TransformSettingsToConfig(setting *TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { | ||||
| 	if setting == nil { | ||||
| 		return nil, nil, fmt.Errorf("empty setting") | ||||
| 	} | ||||
| 	var ( | ||||
| 		pipelines []util.MapStr | ||||
| 		toDeletePipelineNames []string | ||||
| 	) | ||||
| 	if setting.ClusterStats != nil { | ||||
| 		var processorName = "es_cluster_stats" | ||||
| 		if setting.ClusterStats.Enabled { | ||||
| 			pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID) | ||||
| 			if err != nil { | ||||
| 				return nil, nil, err | ||||
| 			} | ||||
| 			pipelines = append(pipelines, pipelineCfg) | ||||
| 		}else{ | ||||
| 			toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(clusterID, processorName)) | ||||
| 		} | ||||
| 	} | ||||
| 	if setting.IndexStats != nil { | ||||
| 		var processorName = "es_index_stats" | ||||
| 		if setting.IndexStats.Enabled { | ||||
| 			pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID) | ||||
| 			if err != nil { | ||||
| 				return nil, nil, err | ||||
| 			} | ||||
| 			pipelines = append(pipelines, pipelineCfg) | ||||
| 		}else{ | ||||
| 			toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(clusterID, processorName)) | ||||
| 		} | ||||
| 	} | ||||
| 	if setting.ClusterHealth != nil { | ||||
| 		var processorName = "es_cluster_health" | ||||
| 		if setting.ClusterHealth.Enabled { | ||||
| 			pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID) | ||||
| 			if err != nil { | ||||
| 				return nil, nil, err | ||||
| 			} | ||||
| 			pipelines = append(pipelines, pipelineCfg) | ||||
| 		}else{ | ||||
| 			toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(clusterID, processorName)) | ||||
| 		} | ||||
| 	} | ||||
| 	if setting.NodeStats != nil { | ||||
| 		var processorName = "es_node_stats" | ||||
| 		if setting.NodeStats.Enabled { | ||||
| 			params := util.MapStr{ | ||||
| 				"elasticsearch": clusterID, | ||||
| 			} | ||||
| 			if len(setting.NodeStats.NodeIDs) > 0{ | ||||
| 				params["node_uuids"] = setting.NodeStats.NodeIDs | ||||
| 			} | ||||
| 			cfg := util.MapStr{ | ||||
| 				processorName: params, | ||||
| 			} | ||||
| 			enabled := true | ||||
| 			pipelineCfg := util.MapStr{ | ||||
| 				"enabled": &enabled, | ||||
| 				"name": getMetricPipelineName(nodeUUID, processorName), | ||||
| 				"auto_start": true, | ||||
| 				"keep_running": true, | ||||
| 				"retry_delay_in_ms": 10000, | ||||
| 				"processor": []util.MapStr{cfg}, | ||||
| 			} | ||||
| 			pipelines = append(pipelines, pipelineCfg) | ||||
| 		}else{ | ||||
| 			toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(nodeUUID, processorName)) | ||||
| 		} | ||||
| 	} | ||||
| 	return pipelines, toDeletePipelineNames, nil | ||||
| } | ||||
| 
 | ||||
| func newClusterMetricPipeline(processorName string, clusterID string)(util.MapStr, error){ | ||||
| 	cfg := util.MapStr{ | ||||
| 		processorName: util.MapStr{ | ||||
| 			"elasticsearch": clusterID, | ||||
| 		}, | ||||
| 	} | ||||
| 	enabled := true | ||||
| 	pipelineCfg := util.MapStr{ | ||||
| 		"enabled": &enabled, | ||||
| 		"name": getMetricPipelineName(clusterID, processorName), | ||||
| 		"auto_start": true, | ||||
| 		"keep_running": true, | ||||
| 		"retry_delay_in_ms": 10000, | ||||
| 		"processor": []util.MapStr{cfg}, | ||||
| 	} | ||||
| 	return pipelineCfg, nil | ||||
| } | ||||
| 
 | ||||
| func getMetricPipelineName(clusterID, processorName string) string{ | ||||
| 	return fmt.Sprintf("collect_%s_%s", clusterID, processorName) | ||||
| } | ||||
| 
 | ||||
|  | @ -0,0 +1,41 @@ | |||
| /* Copyright © INFINI Ltd. All rights reserved. | ||||
|  * Web: https://infinilabs.com
 | ||||
|  * Email: hello#infini.ltd */ | ||||
| 
 | ||||
| package common | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"gopkg.in/yaml.v2" | ||||
| 	"infini.sh/framework/core/util" | ||||
| 	"testing" | ||||
| ) | ||||
| 
 | ||||
| func TestTransformSettingsToConfig(t *testing.T) { | ||||
| 	setting := TaskSetting{ | ||||
| 		ClusterHealth: ClusterHealthTask{ | ||||
| 			Enabled: true, | ||||
| 		}, | ||||
| 		ClusterStats: ClusterStatsTask { | ||||
| 			Enabled: true, | ||||
| 		}, | ||||
| 		IndexStats: IndexStatsTask{ | ||||
| 			Enabled: true, | ||||
| 		}, | ||||
| 		NodeStats: NodeStatsTask{ | ||||
| 			Enabled: true, | ||||
| 			NodeIDs: []string{"ddddnnnn"}, | ||||
| 		}, | ||||
| 	} | ||||
| 	pipelines, err := transformSettingsToConfig(&setting, "testxxx") | ||||
| 	if err !=nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	buf, err := yaml.Marshal(util.MapStr{ | ||||
| 		"pipeline": pipelines, | ||||
| 	}) | ||||
| 	if err !=nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	fmt.Println(string(buf)) | ||||
| } | ||||
|  | @ -0,0 +1,66 @@ | |||
| /* Copyright © INFINI Ltd. All rights reserved. | ||||
|  * Web: https://infinilabs.com
 | ||||
|  * Email: hello#infini.ltd */ | ||||
| 
 | ||||
| package common | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"infini.sh/framework/core/agent" | ||||
| 	"infini.sh/framework/core/elastic" | ||||
| 	"infini.sh/framework/core/host" | ||||
| ) | ||||
| 
 | ||||
| var defaultClient ClientAPI | ||||
| 
 | ||||
| func GetClient() ClientAPI { | ||||
| 	if defaultClient == nil { | ||||
| 		panic("agent client not init") | ||||
| 	} | ||||
| 	return defaultClient | ||||
| } | ||||
| 
 | ||||
| func RegisterClient(client ClientAPI) { | ||||
| 	defaultClient = client | ||||
| } | ||||
| type ClientAPI interface { | ||||
| 	GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) | ||||
| 	DiscoveredHost(ctx context.Context, agentBaseURL string, body interface{}) error | ||||
| 	GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) | ||||
| 	GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) | ||||
| 	GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) | ||||
| 	GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error) | ||||
| 	RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error | ||||
| 	GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error) | ||||
| 	AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) | ||||
| 	CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error | ||||
| 	DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| var stateManager IStateManager | ||||
| 
 | ||||
| func GetStateManager() IStateManager { | ||||
| 	if stateManager == nil { | ||||
| 		panic("agent state manager not init") | ||||
| 	} | ||||
| 	return stateManager | ||||
| } | ||||
| 
 | ||||
| func RegisterStateManager(sm IStateManager) { | ||||
| 	stateManager = sm | ||||
| } | ||||
| 
 | ||||
| func IsEnabled() bool { | ||||
| 	return stateManager != nil | ||||
| } | ||||
| 
 | ||||
| type IStateManager interface { | ||||
| 	GetAgent(ID string) (*agent.Instance, error) | ||||
| 	UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error) | ||||
| 	GetTaskAgent(clusterID string) (*agent.Instance, error) | ||||
| 	DeleteAgent(agentID string) error | ||||
| 	LoopState() | ||||
| 	Stop() | ||||
| 	GetAgentClient() ClientAPI | ||||
| } | ||||
|  | @ -0,0 +1,45 @@ | |||
| /* Copyright © INFINI Ltd. All rights reserved. | ||||
|  * Web: https://infinilabs.com
 | ||||
|  * Email: hello#infini.ltd */ | ||||
| 
 | ||||
| package common | ||||
| 
 | ||||
| import ( | ||||
| 	"infini.sh/framework/core/elastic" | ||||
| 	"infini.sh/framework/core/util" | ||||
| ) | ||||
| 
 | ||||
| type TaskSetting struct { | ||||
| 	ClusterHealth *ClusterHealthTask `json:"cluster_health,omitempty"` | ||||
| 	ClusterStats *ClusterStatsTask `json:"cluster_stats,omitempty"` | ||||
| 	IndexStats *IndexStatsTask `json:"index_stats,omitempty"` | ||||
| 	NodeStats *NodeStatsTask `json:"node_stats,omitempty"` | ||||
| 	Logs *LogsTask `json:"logs,omitempty"` | ||||
| } | ||||
| 
 | ||||
| type ClusterHealthTask struct { | ||||
| 	Enabled bool `json:"enabled"` | ||||
| } | ||||
| 
 | ||||
| type ClusterStatsTask struct { | ||||
| 	Enabled bool `json:"enabled"` | ||||
| } | ||||
| 
 | ||||
| type IndexStatsTask struct { | ||||
| 	Enabled bool `json:"enabled"` | ||||
| } | ||||
| 
 | ||||
| type NodeStatsTask struct { | ||||
| 	Enabled bool `json:"enabled"` | ||||
| 	NodeIDs []string `json:"node_ids,omitempty"` | ||||
| } | ||||
| 
 | ||||
| type LogsTask struct { | ||||
| 	Enabled bool `json:"enabled"` | ||||
| } | ||||
| 
 | ||||
| type ParseAgentSettingsResult struct { | ||||
| 	ClusterConfigs []elastic.ElasticsearchConfig | ||||
| 	Pipelines []util.MapStr | ||||
| 	ToDeletePipelineNames []string | ||||
| } | ||||
|  | @ -0,0 +1,366 @@ | |||
| /* Copyright © INFINI Ltd. All rights reserved. | ||||
|  * Web: https://infinilabs.com
 | ||||
|  * Email: hello#infini.ltd */ | ||||
| 
 | ||||
| package common | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"github.com/buger/jsonparser" | ||||
| 	log "github.com/cihub/seelog" | ||||
| 	"infini.sh/framework/core/agent" | ||||
| 	"infini.sh/framework/core/elastic" | ||||
| 	"infini.sh/framework/core/event" | ||||
| 	"infini.sh/framework/core/host" | ||||
| 	"infini.sh/framework/core/kv" | ||||
| 	"infini.sh/framework/core/orm" | ||||
| 	"infini.sh/framework/core/util" | ||||
| 	"runtime" | ||||
| 	"runtime/debug" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	StatusOnline string = "online" | ||||
| 	StatusOffline = "offline" | ||||
| ) | ||||
| 
 | ||||
| type StateManager struct { | ||||
| 	TTL           time.Duration // kv ttl
 | ||||
| 	KVKey         string | ||||
| 	stopC         chan struct{} | ||||
| 	stopCompleteC chan struct{} | ||||
| 	agentClient   *Client | ||||
| 	agentIds      map[string]string | ||||
| 	agentMutex    sync.Mutex | ||||
| 	workerChan    chan struct{} | ||||
| 	timestamps map[string]int64 | ||||
| } | ||||
| 
 | ||||
| func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string) *StateManager { | ||||
| 	return &StateManager{ | ||||
| 		TTL:           TTL, | ||||
| 		KVKey:         kvKey, | ||||
| 		stopC:         make(chan struct{}), | ||||
| 		stopCompleteC: make(chan struct{}), | ||||
| 		agentClient:   &Client{}, | ||||
| 		agentIds:      agentIds, | ||||
| 		workerChan:    make(chan struct{}, runtime.NumCPU()), | ||||
| 		timestamps: map[string]int64{}, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (sm *StateManager) checkAgentStatus() { | ||||
| 	onlineAgentIDs, err := GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds())) | ||||
| 	if err != nil { | ||||
| 		log.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 	//add new agent to state
 | ||||
| 	sm.agentMutex.Lock() | ||||
| 	for agentID := range onlineAgentIDs { | ||||
| 		if _, ok := sm.agentIds[agentID]; !ok { | ||||
| 			log.Infof("status of agent [%s] changed to online", agentID) | ||||
| 			sm.agentIds[agentID] = StatusOnline | ||||
| 		} | ||||
| 	} | ||||
| 	sm.agentMutex.Unlock() | ||||
| 	for agentID, status := range sm.agentIds { | ||||
| 		if _, ok := onlineAgentIDs[agentID]; ok { | ||||
| 			sm.syncSettings(agentID) | ||||
| 			if status == StatusOnline { | ||||
| 				continue | ||||
| 			} | ||||
| 			// status change to online
 | ||||
| 			sm.agentIds[agentID] = StatusOnline | ||||
| 			log.Infof("status of agent [%s] changed to online", agentID) | ||||
| 			//set timestamp equals 0 to create pipeline
 | ||||
| 			sm.timestamps[agentID] = 0 | ||||
| 			continue | ||||
| 		}else{ | ||||
| 			// already offline
 | ||||
| 			if status == StatusOffline { | ||||
| 				continue | ||||
| 			} | ||||
| 		} | ||||
| 		// status change to offline
 | ||||
| 		sm.agentIds[agentID] = StatusOffline | ||||
| 		sm.workerChan <- struct{}{} | ||||
| 		go func(agentID string) { | ||||
| 			defer func() { | ||||
| 				if err := recover(); err != nil { | ||||
| 					log.Errorf("check agent [%s] status recover form panic error: %v", agentID, err) | ||||
| 					debug.PrintStack() | ||||
| 				} | ||||
| 				<-sm.workerChan | ||||
| 			}() | ||||
| 			ag, err := sm.GetAgent(agentID) | ||||
| 			if err != nil { | ||||
| 				log.Error(err) | ||||
| 				return | ||||
| 			} | ||||
| 			ag.Status = StatusOffline | ||||
| 			log.Infof("agent [%s] is offline", ag.Endpoint) | ||||
| 			_, err = sm.UpdateAgent(ag, true) | ||||
| 			if err != nil { | ||||
| 				log.Error(err) | ||||
| 				return | ||||
| 			} | ||||
| 			//update host agent status
 | ||||
| 			host.UpdateHostAgentStatus(ag.ID, StatusOffline) | ||||
| 		}(agentID) | ||||
| 
 | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (sm *StateManager) syncSettings(agentID string) { | ||||
| 	newTimestamp := time.Now().UnixMilli() | ||||
| 	settings, err := GetAgentSettings(agentID, sm.timestamps[agentID]) | ||||
| 	if err != nil { | ||||
| 		log.Errorf("query agent settings error: %v", err) | ||||
| 		return | ||||
| 	} | ||||
| 	if len(settings) == 0 { | ||||
| 		log.Debugf("got no settings of agent [%s]", agentID) | ||||
| 		return | ||||
| 	} | ||||
| 	parseResult, err := ParseAgentSettings(settings) | ||||
| 	if err != nil { | ||||
| 		log.Errorf("parse agent settings error: %v", err) | ||||
| 		return | ||||
| 	} | ||||
| 	ag, err := sm.GetAgent(agentID) | ||||
| 	if err != nil { | ||||
| 		log.Errorf("get agent error: %v", err) | ||||
| 		return | ||||
| 	} | ||||
| 	agClient := sm.GetAgentClient() | ||||
| 	if len(parseResult.ClusterConfigs) > 0 { | ||||
| 		err = agClient.RegisterElasticsearch(nil, ag.GetEndpoint(), parseResult.ClusterConfigs) | ||||
| 		if err != nil { | ||||
| 			log.Errorf("register elasticsearch config error: %v", err) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 	for _, pipelineID := range parseResult.ToDeletePipelineNames { | ||||
| 		err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID) | ||||
| 		if err != nil { | ||||
| 			log.Errorf("delete pipeline error: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
| 	} | ||||
| 	for _, pipeline := range parseResult.Pipelines { | ||||
| 		err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline)) | ||||
| 		if err != nil { | ||||
| 			log.Errorf("create pipeline error: %v", err) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 	sm.timestamps[agentID] = newTimestamp | ||||
| } | ||||
| 
 | ||||
| func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) { | ||||
| 	agents, err := LoadAgentsFromES(clusterID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if len(agents) == 0 { | ||||
| 		return nil, nil | ||||
| 	} | ||||
| 	for _, ag := range agents { | ||||
| 		if ag.Status == "offline" { | ||||
| 			continue | ||||
| 		} | ||||
| 	} | ||||
| 	return nil, nil | ||||
| } | ||||
| 
 | ||||
| func (sm *StateManager) LoopState() { | ||||
| 	t := time.NewTicker(30 * time.Second) | ||||
| 	defer t.Stop() | ||||
| MAINLOOP: | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-sm.stopC: | ||||
| 			sm.stopCompleteC <- struct{}{} | ||||
| 			close(sm.workerChan) | ||||
| 			break MAINLOOP | ||||
| 		case <-t.C: | ||||
| 			sm.checkAgentStatus() | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (sm *StateManager) Stop() { | ||||
| 	sm.stopC <- struct{}{} | ||||
| 	<-sm.stopCompleteC | ||||
| } | ||||
| 
 | ||||
| func (sm *StateManager) GetAgent(ID string) (*agent.Instance, error) { | ||||
| 	buf, err := kv.GetValue(sm.KVKey, []byte(ID)) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	strTime, _ := jsonparser.GetString(buf, "timestamp") | ||||
| 	timestamp, _ := time.Parse(time.RFC3339, strTime) | ||||
| 	inst := &agent.Instance{} | ||||
| 	inst.ID = ID | ||||
| 	if time.Since(timestamp) > sm.TTL { | ||||
| 		exists, err := orm.Get(inst) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("get agent [%s] error: %w", ID, err) | ||||
| 		} | ||||
| 		if !exists { | ||||
| 			return nil, fmt.Errorf("can not found agent [%s]", ID) | ||||
| 		} | ||||
| 		//inst.Timestamp = time.Now()
 | ||||
| 		err = kv.AddValue(sm.KVKey, []byte(ID), util.MustToJSONBytes(inst)) | ||||
| 		if err != nil { | ||||
| 			log.Errorf("save agent [%s] to kv error: %v", ID, err) | ||||
| 		} | ||||
| 		return inst, nil | ||||
| 	} | ||||
| 	err = util.FromJSONBytes(buf, inst) | ||||
| 	return inst, err | ||||
| } | ||||
| 
 | ||||
| func (sm *StateManager) UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error) { | ||||
| 	//inst.Timestamp = time.Now()
 | ||||
| 	err := kv.AddValue(sm.KVKey, []byte(inst.ID), util.MustToJSONBytes(inst)) | ||||
| 	if syncToES { | ||||
| 		ctx := orm.Context{ | ||||
| 			Refresh: "wait_for", | ||||
| 		} | ||||
| 		err = orm.Update(&ctx, inst) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
| 	return inst, err | ||||
| } | ||||
| 
 | ||||
| func (sm *StateManager) GetTaskAgent(clusterID string) (*agent.Instance, error) { | ||||
| 	return nil, nil | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| func (sm *StateManager) DeleteAgent(agentID string) error { | ||||
| 	sm.agentMutex.Lock() | ||||
| 	delete(sm.agentIds, agentID) | ||||
| 	sm.agentMutex.Unlock() | ||||
| 	log.Infof("delete agent [%s] from state", agentID) | ||||
| 
 | ||||
| 	return kv.DeleteKey(sm.KVKey, []byte(agentID)) | ||||
| } | ||||
| 
 | ||||
| func (sm *StateManager) GetAgentClient() ClientAPI { | ||||
| 	return sm.agentClient | ||||
| } | ||||
| 
 | ||||
| func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) { | ||||
| 	q := orm.Query{ | ||||
| 		Size: 1000, | ||||
| 	} | ||||
| 	if clusterID != "" { | ||||
| 		q.Conds = orm.And(orm.Eq("id", clusterID)) | ||||
| 	} | ||||
| 	err, result := orm.Search(agent.Instance{}, &q) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("query agent error: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if len(result.Result) > 0 { | ||||
| 		var agents = make([]agent.Instance, 0, len(result.Result)) | ||||
| 		for _, row := range result.Result { | ||||
| 			ag := agent.Instance{} | ||||
| 			bytes := util.MustToJSONBytes(row) | ||||
| 			err = util.FromJSONBytes(bytes, &ag) | ||||
| 			if err != nil { | ||||
| 				log.Errorf("got unexpected agent: %s, error: %v", string(bytes), err) | ||||
| 				continue | ||||
| 			} | ||||
| 			agents = append(agents, ag) | ||||
| 		} | ||||
| 		return agents, nil | ||||
| 	} | ||||
| 	return nil, nil | ||||
| } | ||||
| 
 | ||||
| func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) { | ||||
| 	q := orm.Query{ | ||||
| 		WildcardIndex: true, | ||||
| 	} | ||||
| 	mustQ := []util.MapStr{ | ||||
| 		{ | ||||
| 			"term": util.MapStr{ | ||||
| 				"metadata.name": util.MapStr{ | ||||
| 					"value": "agent", | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			"term": util.MapStr{ | ||||
| 				"metadata.category": util.MapStr{ | ||||
| 					"value": "instance", | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	if len(agentIds) > 0 { | ||||
| 		mustQ = append(mustQ, util.MapStr{ | ||||
| 			"terms": util.MapStr{ | ||||
| 				"agent.id": agentIds, | ||||
| 			}, | ||||
| 		}) | ||||
| 	} | ||||
| 	queryDSL := util.MapStr{ | ||||
| 		"_source": "agent.id", | ||||
| 		"sort": []util.MapStr{ | ||||
| 			{ | ||||
| 				"timestamp": util.MapStr{ | ||||
| 					"order": "desc", | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		"collapse": util.MapStr{ | ||||
| 			"field": "agent.id", | ||||
| 		}, | ||||
| 		"query": util.MapStr{ | ||||
| 			"bool": util.MapStr{ | ||||
| 				"filter": []util.MapStr{ | ||||
| 					{ | ||||
| 						"range": util.MapStr{ | ||||
| 							"timestamp": util.MapStr{ | ||||
| 								"gte": fmt.Sprintf("now-%ds", lastSeconds), | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 				"must": mustQ, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	q.RawQuery = util.MustToJSONBytes(queryDSL) | ||||
| 	err, result := orm.Search(event.Event{}, &q) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("query agent instance metric error: %w", err) | ||||
| 	} | ||||
| 	agentIDs := map[string]struct{}{} | ||||
| 	if len(result.Result) > 0 { | ||||
| 		searchRes := elastic.SearchResponse{} | ||||
| 		err = util.FromJSONBytes(result.Raw, &searchRes) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		agentIDKeyPath := []string{"agent", "id"} | ||||
| 		for _, hit := range searchRes.Hits.Hits { | ||||
| 			agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source) | ||||
| 			if v, ok := agentID.(string); ok { | ||||
| 				agentIDs[v] = struct{}{} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	return agentIDs, nil | ||||
| } | ||||
		Loading…
	
		Reference in New Issue