diff --git a/main.go b/main.go index 89ca7d52..2d382650 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "infini.sh/console/model" "infini.sh/console/model/alerting" "infini.sh/console/model/insight" + "infini.sh/console/modules/agent" _ "infini.sh/console/plugin" setup1 "infini.sh/console/plugin/setup" alerting2 "infini.sh/console/service/alerting" @@ -22,7 +23,6 @@ import ( "infini.sh/framework/core/module" "infini.sh/framework/core/orm" task1 "infini.sh/framework/core/task" - "infini.sh/framework/modules/agent" _ "infini.sh/framework/modules/api" elastic2 "infini.sh/framework/modules/elastic" "infini.sh/framework/modules/metrics" diff --git a/modules/agent/agent.go b/modules/agent/agent.go new file mode 100644 index 00000000..aa6e3f93 --- /dev/null +++ b/modules/agent/agent.go @@ -0,0 +1,80 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package agent + +import ( + log "github.com/cihub/seelog" + "infini.sh/console/modules/agent/api" + "infini.sh/console/modules/agent/common" + "infini.sh/framework/core/agent" + "infini.sh/framework/core/env" + "infini.sh/framework/core/host" + "infini.sh/framework/core/orm" + "time" +) + +func (module *AgentModule) Name() string { + return "agent" +} + +func (module *AgentModule) Setup() { + module.AgentConfig.Enabled = true + module.AgentConfig.StateManager.Enabled = true + exists, err := env.ParseConfig("agent", &module.AgentConfig) + if exists && err != nil { + panic(err) + } + if module.AgentConfig.Enabled { + api.Init() + } +} +func (module *AgentModule) Start() error { + if !module.AgentConfig.Enabled { + return nil + } + orm.RegisterSchemaWithIndexName(agent.Instance{}, "agent") + orm.RegisterSchemaWithIndexName(agent.ESNodeInfo{}, "agent-node") + orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host") + orm.RegisterSchemaWithIndexName(agent.Setting{}, "setting") + common.RegisterClient(&common.Client{}) + + if module.AgentConfig.StateManager.Enabled { + onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, 60) + if err != nil { + log.Error(err) + } + agents, err := common.LoadAgentsFromES("") + if err != nil { + log.Error(err) + } + agentIds := map[string]string{} + for _, ag := range agents { + if _, ok := onlineAgentIDs[ag.ID]; ok { + agentIds[ag.ID] = "online" + } + } + + sm := common.NewStateManager(time.Second*30, "agent_state", agentIds) + common.RegisterStateManager(sm) + go sm.LoopState() + } + return nil +} + +func (module *AgentModule) Stop() error { + if !module.AgentConfig.Enabled { + return nil + } + log.Info("start to stop agent module") + if module.AgentConfig.StateManager.Enabled { + common.GetStateManager().Stop() + } + log.Info("agent module was stopped") + return nil +} + +type AgentModule struct { + common.AgentConfig +} diff --git a/modules/agent/api/host.go b/modules/agent/api/host.go new file mode 100644 index 00000000..609cd938 --- /dev/null +++ b/modules/agent/api/host.go @@ -0,0 +1,222 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package api + +import ( + "context" + "fmt" + common2 "infini.sh/console/modules/agent/common" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/host" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + log "github.com/cihub/seelog" + "time" +) + +func (h *APIHandler) enrollHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var reqBody []struct { + AgentID string `json:"agent_id"` + HostName string `json:"host_name"` + IP string `json:"ip"` + Source string `json:"source"` + OSName string `json:"os_name"` + OSArch string `json:"os_arch"` + NodeID string `json:"node_uuid"` + } + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + errors := util.MapStr{} + for _, hi := range reqBody { + var ( + hostInfo *host.HostInfo + ) + switch hi.Source { + case "agent": + hostInfo, err = enrollHostFromAgent(hi.AgentID) + if err != nil { + errors[hi.IP] = util.MapStr{ + "error": err.Error(), + } + log.Error(err) + continue + } + hostInfo.IP = hi.IP + hostInfo.AgentID = hi.AgentID + err = orm.Create(nil, hostInfo) + if err != nil { + errors[hi.IP] = util.MapStr{ + "error": err.Error(), + } + log.Error(err) + continue + } + case "es_node": + hostInfo = &host.HostInfo{ + IP: hi.IP, + OSInfo: host.OS{ + Platform: hi.OSName, + KernelArch: hi.OSArch, + }, + NodeID: hi.NodeID, + } + default: + errors[hi.IP] = util.MapStr{ + "error": fmt.Errorf("unkonow source type"), + } + continue + } + hostInfo.Timestamp = time.Now() + err = orm.Create(nil, hostInfo) + if err != nil { + errors[hi.IP] = util.MapStr{ + "error": err.Error(), + } + log.Error(err) + continue + } + if hi.Source == "agent" { + sm := common2.GetStateManager() + ag, _ := sm.GetAgent(hostInfo.AgentID) + err = sm.GetAgentClient().DiscoveredHost(nil, ag.GetEndpoint(), util.MapStr{ + "host_id": hostInfo.ID, + }) + if err != nil { + log.Error(err) + } + } + + } + resBody := util.MapStr{ + "success": true, + } + if len(errors) > 0 { + resBody["errors"] = errors + resBody["success"] = false + } + + h.WriteJSON(w, resBody, http.StatusOK) +} + +func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + hostID := ps.MustGetParameter("host_id") + hostInfo, err := getHost(hostID) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if hostInfo.AgentID == "" { + h.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + + sm := common2.GetStateManager() + ag, err := sm.GetAgent(hostInfo.AgentID) + if err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + aversion, err := ag.GetVersion() + if err == nil { + ag.Version = aversion + orm.Save(nil, ag) + } + h.WriteJSON(w, util.MapStr{ + "host_id": hostID, + "agent_id": ag.ID, + "version": ag.Version, + "status": ag.Status, + "endpoint": ag.GetEndpoint(), + }, http.StatusOK) +} + +func getHost(hostID string) (*host.HostInfo, error){ + hostInfo := &host.HostInfo{} + hostInfo.ID = hostID + exists, err := orm.Get(hostInfo) + if err != nil { + return nil, fmt.Errorf("get host info error: %w", err) + } + if !exists { + return nil, fmt.Errorf("host [%s] not found", hostID) + } + return hostInfo, nil +} + +func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + hostID := ps.MustGetParameter("host_id") + hostInfo := &host.HostInfo{} + hostInfo.ID = hostID + exists, err := orm.Get(hostInfo) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if !exists { + h.WriteError(w, fmt.Sprintf("host [%s] not found", hostID), http.StatusNotFound) + return + } + if hostInfo.AgentID == "" { + h.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + sm := common2.GetStateManager() + ag, err := sm.GetAgent(hostInfo.AgentID) + if err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + ctx,cancel := context.WithTimeout(context.Background(), time.Second * 10) + defer cancel() + esNodesInfo, err := sm.GetAgentClient().GetElasticsearchNodes(ctx, ag.GetEndpoint()) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var processes []util.MapStr + for _, node := range esNodesInfo { + processes = append(processes, util.MapStr{ + "pid": node.ProcessInfo.PID, + "pid_status": node.ProcessInfo.Status, + "cluster_name": node.ClusterName, + "cluster_uuid": node.ClusterUuid, + "cluster_id": node.ClusterID, + "node_id": node.NodeUUID, + "node_name": node.NodeName, + "uptime_in_ms": time.Now().UnixMilli() - node.ProcessInfo.CreateTime, + }) + } + h.WriteJSON(w, util.MapStr{ + "elastic_processes": processes, + }, http.StatusOK) +} + +func enrollHostFromAgent(agentID string) (*host.HostInfo, error){ + sm := common2.GetStateManager() + ag, err := sm.GetAgent(agentID) + if err != nil { + return nil, err + } + if ag == nil { + return nil, fmt.Errorf("can not found agent [%s]", agentID) + } + agentClient := sm.GetAgentClient() + hostInfo, err := agentClient.GetHostInfo(nil, ag.GetEndpoint()) + if err != nil { + return nil, err + } + hostInfo.AgentStatus = ag.Status + return hostInfo, nil +} \ No newline at end of file diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go new file mode 100644 index 00000000..84fe0c9a --- /dev/null +++ b/modules/agent/api/init.go @@ -0,0 +1,28 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package api + +import ( + "infini.sh/framework/core/api" + "infini.sh/framework/core/api/rbac/enum" +) + +func Init() { + handler := APIHandler{} + api.HandleAPIMethod(api.POST, "/agent/instance", handler.createInstance) + api.HandleAPIMethod(api.GET, "/agent/instance/_search", handler.RequirePermission(handler.searchInstance, enum.PermissionAgentInstanceRead)) + api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id", handler.getInstance) + api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id", handler.RequirePermission(handler.deleteInstance, enum.PermissionAgentInstanceWrite)) + api.HandleAPIMethod(api.POST, "/agent/instance/_stats", handler.RequirePermission(handler.getInstanceStats, enum.PermissionAgentInstanceRead)) + api.HandleAPIMethod(api.GET, "/agent/log/node/:node_id/files", handler.RequirePermission(handler.getLogFilesByNode, enum.PermissionAgentInstanceRead)) + api.HandleAPIMethod(api.POST, "/agent/log/node/:node_id/_scroll", handler.RequirePermission(handler.getLogFileContent, enum.PermissionAgentInstanceRead)) + api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.getESNodesInfo, enum.PermissionAgentInstanceRead)) + api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/_nodes/_refresh", handler.RequirePermission(handler.refreshESNodesInfo, enum.PermissionAgentInstanceWrite)) + api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) + + api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) + api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo) + api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess) +} diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go new file mode 100644 index 00000000..ea6ad323 --- /dev/null +++ b/modules/agent/api/instance.go @@ -0,0 +1,716 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package api + +import ( + "context" + "fmt" + log "github.com/cihub/seelog" + "infini.sh/framework/core/agent" + "infini.sh/framework/core/api" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/event" + "infini.sh/framework/core/host" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + common2 "infini.sh/console/modules/agent/common" + elastic2 "infini.sh/framework/modules/elastic" + "infini.sh/framework/modules/elastic/common" + "net/http" + "strconv" +) + +type APIHandler struct { + api.Handler +} + +func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var obj = &agent.Instance{} + err := h.DecodeJSON(req, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + oldInst := &agent.Instance{} + oldInst.ID = obj.ID + exists, err := orm.Get(oldInst) + + if err != nil && err != elastic2.ErrNotFound { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if exists { + errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID) + h.WriteError(w, errMsg, http.StatusInternalServerError) + log.Error(errMsg) + return + } + //fetch more information of agent instance + res, err := common2.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint()) + if err != nil { + errStr := fmt.Sprintf("get agent instance basic info error: %s", err.Error()) + h.WriteError(w,errStr , http.StatusInternalServerError) + log.Error(errStr) + return + } + if res.ID == "" { + errStr :=fmt.Sprintf("got unexpected response of agent instance basic info: %s", util.MustToJSON(res)) + h.WriteError(w, errStr , http.StatusInternalServerError) + log.Error(errStr) + return + }else{ + obj.ID = res.ID + obj.Version = res.Version + obj.MajorIP = res.MajorIP + obj.Host = res.Host + obj.IPS = res.IPS + } + + err = orm.Create(nil, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + _, err = refreshNodesInfo(obj) + if err != nil { + log.Error(err) + } + + h.WriteCreatedOKJSON(w, obj.ID) + +} + +func bindAgentToHostByIP(ag *agent.Instance) error{ + err, result := orm.GetBy("ip", ag.MajorIP, host.HostInfo{}) + if err != nil { + return err + } + if len(result.Result) > 0 { + buf := util.MustToJSONBytes(result.Result[0]) + hostInfo := &host.HostInfo{} + err = util.FromJSONBytes(buf, hostInfo) + if err != nil { + return err + } + sm := common2.GetStateManager() + if ag.Status == "" { + _, err1 := sm.GetAgentClient().GetHostInfo(nil, ag.GetEndpoint()) + if err1 == nil { + ag.Status = "online" + }else{ + ag.Status = "offline" + } + } + + hostInfo.AgentStatus = ag.Status + hostInfo.AgentID = ag.ID + err = orm.Update(nil, hostInfo) + if err != nil { + return err + } + + err = sm.GetAgentClient().DiscoveredHost(nil, ag.GetEndpoint(), util.MapStr{ + "host_id": hostInfo.ID, + }) + if err != nil { + return err + } + } + return nil +} + +func (h *APIHandler) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("instance_id") + + obj := agent.Instance{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "found": true, + "_id": id, + "_source": obj, + }, 200) +} + +func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("instance_id") + + obj := agent.Instance{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + + err = orm.Delete(nil, &obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if sm := common2.GetStateManager(); sm != nil { + sm.DeleteAgent(obj.ID) + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "deleted", + }, 200) +} + +func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var instanceIDs = []string{} + err := h.DecodeJSON(req, &instanceIDs) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(instanceIDs) == 0 { + h.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + q := orm.Query{ + WildcardIndex: true, + } + queryDSL := util.MapStr{ + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "desc", + }, + }, + }, + "collapse": util.MapStr{ + "field": "agent.id", + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "filter": []util.MapStr{ + { + "range": util.MapStr{ + "timestamp": util.MapStr{ + "gte": "now-1m", + }, + }, + }, + }, + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.name": util.MapStr{ + "value": "agent", + }, + }, + }, { + "terms": util.MapStr{ + "agent.id": instanceIDs, + }, + }, + }, + }, + }, + } + q.RawQuery = util.MustToJSONBytes(queryDSL) + + err, res := orm.Search(event.Event{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + result := util.MapStr{} + for _, item := range res.Result { + if itemV, ok := item.(map[string]interface{}); ok { + if agentID, ok := util.GetMapValueByKeys([]string{"agent", "id"}, itemV); ok { + if v, ok := agentID.(string); ok { + if ab, ok := util.GetMapValueByKeys([]string{"payload","instance", "system"}, itemV); ok{ + if abV, ok := ab.(map[string]interface{}); ok { + result[v] = util.MapStr{ + "timestamp": itemV["timestamp"], + "system": util.MapStr{ + "cpu": abV["cpu"], + "mem": abV["mem"], + "uptime_in_ms": abV["uptime_in_ms"], + "status": "online", + }, + } + } + } + } + } + } + + } + h.WriteJSON(w, result, http.StatusOK) +} + + +func (h *APIHandler) searchInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + var ( + keyword = h.GetParameterOrDefault(req, "keyword", "") + //queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + ) + + var ( + mustQ []interface{} + ) + + if keyword != "" { + mustQ = append(mustQ, util.MapStr{ + "query_string": util.MapStr{ + "default_field": "*", + "query": keyword, + }, + }) + } + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + queryDSL := util.MapStr{ + "size": size, + "from": from, + } + if len(mustQ) > 0 { + queryDSL["query"] = util.MapStr{ + "bool": util.MapStr{ + "must": mustQ, + }, + } + } + + q := orm.Query{} + q.RawQuery = util.MustToJSONBytes(queryDSL) + + err, res := orm.Search(&agent.Instance{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + h.Write(w, res.Raw) +} + +func (h *APIHandler) getESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("instance_id") + obj := agent.Instance{} + obj.ID = id + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + nodesM, err := getNodesInfoFromES(obj.ID) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var nodes []*agent.ESNodeInfo + for _, node := range nodesM { + nodes = append(nodes, node) + } + h.WriteJSON(w, nodes, http.StatusOK) +} + +func (h *APIHandler) refreshESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + id := ps.MustGetParameter("instance_id") + obj := agent.Instance{} + obj.ID = id + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + _, err = refreshNodesInfo(&obj) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteAckOKJSON(w) +} + +func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("instance_id") + inst := agent.Instance{} + inst.ID = id + exists, err := orm.Get(&inst) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + reqBody := struct { + NodeID string `json:"node_id"` + ESConfig *elastic.ElasticsearchConfig `json:"es_config"` + }{} + err = h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + oldNodeInfo := &agent.ESNodeInfo{ + ID: reqBody.NodeID, + } + exists, err = orm.Get(oldNodeInfo) + if !exists || err != nil { + h.WriteJSON(w, fmt.Sprintf("node [%s] of agent [%s] was not found", oldNodeInfo.ID, inst.Name), http.StatusInternalServerError) + return + } + + cfg := reqBody.ESConfig + if cfg.Endpoint == "" { + cfg.Endpoint = fmt.Sprintf("%s://%s", cfg.Schema, cfg.Host) + } + basicAuth, err := common.GetBasicAuth(cfg) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + cfg.BasicAuth = &basicAuth + nodeInfo, err := common2.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + nodeInfo.ID = oldNodeInfo.ID + nodeInfo.AgentID = inst.ID + clusterCfgs := getClusterConfigs() + if nodeInfo.ClusterUuid != "" && clusterCfgs[nodeInfo.ClusterUuid] != nil { + nodeInfo.ClusterID = clusterCfgs[nodeInfo.ClusterUuid].ID + settings, err := common2.GetAgentSettings(inst.ID, 0) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + setting := pickAgentSettings(settings, *nodeInfo) + if setting == nil { + setting, err = getAgentTaskSetting(inst.ID, *nodeInfo) + if err != nil { + log.Error("get agent task setting error: ", err) + } + err = orm.Create(nil, setting) + if err != nil { + log.Error("save agent task setting error: ", err) + } + } + } + err = orm.Save(nil, nodeInfo) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteJSON(w, nodeInfo, http.StatusOK) +} + +func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { + nodesInfo, err := common2.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) + if err != nil { + return nil, fmt.Errorf("get elasticsearch nodes error: %w", err) + } + oldNodesInfo, err := getNodesInfoFromES(inst.ID) + if err != nil { + return nil, fmt.Errorf("get elasticsearch nodes info from es error: %w", err) + } + clusterCfgs := getClusterConfigs() + oldPids := map[int]struct{}{} + var resultNodes []agent.ESNodeInfo + settings, err := common2.GetAgentSettings(inst.ID, 0) + if err != nil { + return nil, err + } + for _, node := range nodesInfo { + oldNode := getNodeByPidOrUUID(oldNodesInfo, node.ProcessInfo.PID, node.NodeUUID) + node.AgentID = inst.ID + if oldNode != nil { + node.ID = oldNode.ID + //keep old validate info + if node.ClusterUuid == "" && oldNode.ClusterUuid != "" { + node = *oldNode + } + oldPids[oldNode.ProcessInfo.PID] = struct{}{} + }else{ + node.ID = util.GetUUID() + } + if node.ClusterUuid != "" { + if oldNode != nil && oldNode.ClusterID != "" { + node.ClusterID = oldNode.ClusterID + }else{ + if cfg := clusterCfgs[node.ClusterUuid]; cfg != nil { + node.ClusterID = cfg.ID + setting := pickAgentSettings(settings, node) + if setting == nil { + setting, err = getAgentTaskSetting(inst.ID, node) + if err != nil { + log.Error() + } + err = orm.Create(nil, setting) + if err != nil { + log.Error("save agent task setting error: ", err) + } + } + }else{ + //cluster not registered in console + } + } + } + + node.Status = "online" + err = orm.Save(nil, node) + if err != nil { + log.Error("save node info error: ", err) + } + resultNodes = append(resultNodes, node) + } + for k, node := range oldNodesInfo { + if _, ok := oldPids[k]; !ok { + //auto delete not associated cluster + if node.ClusterID == "" { + log.Info("delete node with pid: ", node.ProcessInfo.PID) + err = orm.Delete(nil, node) + if err != nil { + log.Error("delete node info error: ", err) + } + continue + } + node.Status = "offline" + err = orm.Save(nil, node) + if err != nil { + log.Error("save node info error: ", err) + } + resultNodes = append(resultNodes, *node) + } + } + return resultNodes, nil +} + +func getNodeByPidOrUUID(nodes map[int]*agent.ESNodeInfo, pid int, uuid string) *agent.ESNodeInfo{ + if nodes[pid] != nil { + return nodes[pid] + } + for _, node := range nodes { + if node.NodeUUID != "" && node.NodeUUID == uuid { + return node + } + } + return nil +} + +func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){ + query := util.MapStr{ + "size": 100, + "query": util.MapStr{ + "term": util.MapStr{ + "agent_id": util.MapStr{ + "value": agentID, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + + err, result := orm.Search(agent.ESNodeInfo{}, &q) + if err != nil { + return nil, err + } + nodesInfo := map[int]*agent.ESNodeInfo{} + for _, row := range result.Result { + node := agent.ESNodeInfo{} + buf := util.MustToJSONBytes(row) + util.MustFromJSONBytes(buf, &node) + nodesInfo[node.ProcessInfo.PID] = &node + } + return nodesInfo, nil +} + +func getClusterConfigs() map[string]*elastic.ElasticsearchConfig { + cfgs := map[string]*elastic.ElasticsearchConfig{} + elastic.WalkConfigs(func(key, value interface{}) bool { + if cfg, ok := value.(*elastic.ElasticsearchConfig); ok { + //todo handle clusterUUID is empty + cfgs[cfg.ClusterUUID] = cfg + } + return true + }) + return cfgs +} + +func pickAgentSettings(settings []agent.Setting, nodeInfo agent.ESNodeInfo) *agent.Setting { + for _, setting := range settings { + if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID { + return &setting + } + } + return nil +} + +func getAgentTaskSetting(agentID string, node agent.ESNodeInfo) (*agent.Setting, error){ + taskSetting, err := getSettingsByClusterID(node.ClusterID) + if err != nil { + return nil, err + } + return &agent.Setting{ + Metadata: agent.SettingsMetadata{ + Category: "agent", + Name: "task", + Labels: util.MapStr{ + "agent_id": agentID, + "cluster_uuid": node.ClusterUuid, + "cluster_id": node.ClusterID, + "node_uuid": node.NodeUUID, + "endpoint": fmt.Sprintf("%s://%s", node.Schema, node.PublishAddress), + }, + }, + Payload: util.MapStr{ + "task": taskSetting, + }, + }, nil +} + +// getSettingsByClusterID query agent task settings with cluster id +func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) { + queryDsl := util.MapStr{ + "size": 200, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.cluster_id": util.MapStr{ + "value": clusterID, + }, + }, + }, + }, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "payload.task.cluster_health": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "payload.task.cluster_stats": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "payload.task.index_stats": util.MapStr{ + "value": true, + }, + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(queryDsl), + } + err, result := orm.Search(agent.Setting{}, &q) + if err != nil { + return nil, err + } + + setting := &common2.TaskSetting{ + NodeStats: &common2.NodeStatsTask{ + Enabled: true, + }, + } + var ( + clusterStats = true + indexStats = true + clusterHealth = true + ) + keys := []string{"payload.task.cluster_stats", "payload.task.cluster_health", "payload.task.index_stats"} + for _, row := range result.Result { + if v, ok := row.(map[string]interface{}); ok { + vm := util.MapStr(v) + for _, key := range keys { + tv, _ := vm.GetValue(key) + if tv == true { + switch key { + case "payload.task.cluster_stats": + clusterStats = false + case "payload.task.index_stats": + indexStats = false + case "payload.task.cluster_health": + clusterHealth = false + } + } + } + } + } + if clusterStats { + setting.ClusterStats = &common2.ClusterStatsTask{ + Enabled: true, + } + } + if indexStats { + setting.IndexStats = &common2.IndexStatsTask{ + Enabled: true, + } + } + if clusterHealth { + setting.ClusterHealth = &common2.ClusterHealthTask{ + Enabled: true, + } + } + return setting, nil +} \ No newline at end of file diff --git a/modules/agent/api/log.go b/modules/agent/api/log.go new file mode 100644 index 00000000..d564f521 --- /dev/null +++ b/modules/agent/api/log.go @@ -0,0 +1,115 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package api + +import ( + "fmt" + log "github.com/cihub/seelog" + "infini.sh/console/modules/agent/common" + "infini.sh/framework/core/agent" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" +) + +func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + nodeID := ps.MustGetParameter("node_id") + inst, node, err := getAgentByNodeID(nodeID) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if inst == nil { + log.Error(fmt.Sprintf("can not find agent by node [%s]", nodeID)) + h.WriteJSON(w, util.MapStr{ + "success": false, + "reason": "AGENT_NOT_FOUND", + }, http.StatusOK) + return + } + logFiles, err := common.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + h.WriteJSON(w, util.MapStr{ + "success": true, + "log_files": logFiles, + }, http.StatusOK) +} + +func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + nodeID := ps.MustGetParameter("node_id") + inst, node, err := getAgentByNodeID(nodeID) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if inst == nil { + h.WriteError(w, fmt.Sprintf("can not find agent by node [%s]", nodeID), http.StatusInternalServerError) + return + } + reqBody := struct { + FileName string `json:"file_name"` + LogsPath string `json:"logs_path"` + Offset int `json:"offset"` + Lines int `json:"lines"` + }{} + err = h.DecodeJSON(req, &reqBody) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + reqBody.LogsPath = node.Path.Logs + sm := common.GetStateManager() + res, err := sm.GetAgentClient().GetElasticLogFileContent(nil, inst.GetEndpoint(), reqBody) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + h.WriteJSON(w, res, http.StatusOK) +} + +func getAgentByNodeID(nodeID string) (*agent.Instance, *agent.ESNodeInfo, error){ + queryDsl := util.MapStr{ + "size":1, + "query": util.MapStr{ + "term": util.MapStr{ + "node_uuid": util.MapStr{ + "value": nodeID, + }, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(queryDsl), + } + err, result := orm.Search(agent.ESNodeInfo{}, q) + if err != nil { + return nil,nil, err + } + if len(result.Result) > 0 { + buf := util.MustToJSONBytes(result.Result[0]) + node := &agent.ESNodeInfo{} + err = util.FromJSONBytes(buf, node) + inst := &agent.Instance{} + inst.ID = node.AgentID + _, err = orm.Get(inst) + if err != nil { + return nil, node, err + } + if inst.Name == "" { + return nil, node, nil + } + return inst, node, nil + } + return nil, nil, nil +} diff --git a/modules/agent/common/client.go b/modules/agent/common/client.go new file mode 100644 index 00000000..c2f7e6c3 --- /dev/null +++ b/modules/agent/common/client.go @@ -0,0 +1,222 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "context" + "fmt" + "infini.sh/framework/core/agent" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/host" + "infini.sh/framework/core/util" + "net/http" +) + +type Client struct { +} + +func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) { + req := &util.Request{ + Method: http.MethodGet, + Url: fmt.Sprintf("%s/agent/host/_basic", agentBaseURL), + Context: ctx, + } + resBody := struct { + Success bool `json:"success"` + Error string `json:"error"` + HostInfo *host.HostInfo `json:"result"` + }{} + err := client.doRequest(req, &resBody) + if err != nil { + return nil, err + } + if resBody.Success != true { + return nil, fmt.Errorf("enroll error from client: %v", resBody.Error) + } + return resBody.HostInfo, nil +} + +func (client *Client) DiscoveredHost(ctx context.Context, agentBaseURL string, body interface{}) error { + req := &util.Request{ + Method: http.MethodPut, + Url: fmt.Sprintf("%s/host/discover", agentBaseURL), + Context: ctx, + } + reqBody, err := util.ToJSONBytes(body) + if err != nil { + return err + } + req.Body = reqBody + resBody := map[string]interface{}{} + err = client.doRequest(req, &resBody) + if err != nil { + return err + } + if resBody["success"] != true { + return fmt.Errorf("discover host callback error: %v", resBody["error"]) + } + return nil +} + +func (client *Client) GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) { + req := &util.Request{ + Method: http.MethodGet, + Url: fmt.Sprintf("%s/elasticsearch/%s/process/_elastic", agentBaseURL, agentID), + Context: ctx, + } + resBody := map[string]interface{}{} + err := client.doRequest(req, &resBody) + if err != nil { + return nil, err + } + if resBody["success"] != true { + return nil, fmt.Errorf("discover host callback error: %v", resBody["error"]) + } + return resBody["elastic_process"], nil +} + +func (client *Client) GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) { + reqBody := util.MustToJSONBytes(util.MapStr{ + "logs_path": logsPath, + }) + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/agent/logs/elastic/list", agentBaseURL), + Context: ctx, + Body: reqBody, + } + resBody := map[string]interface{}{} + err := client.doRequest(req, &resBody) + if err != nil { + return nil, err + } + if resBody["success"] != true { + return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody["error"]) + } + return resBody["result"], nil +} + +func (client *Client) GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) { + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/agent/logs/elastic/_read", agentBaseURL), + Context: ctx, + Body: util.MustToJSONBytes(body), + } + resBody := map[string]interface{}{} + err := client.doRequest(req, &resBody) + if err != nil { + return nil, err + } + if resBody["success"] != true { + return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody["error"]) + } + var hasMore bool + if v, ok := resBody["EOF"].(bool); ok && !v { + hasMore = true + } + return map[string]interface{}{ + "lines": resBody["result"], + "has_more": hasMore, + } , nil +} + +func (client *Client) GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error){ + req := &util.Request{ + Method: http.MethodGet, + Url: fmt.Sprintf("%s/agent/_info", agentBaseURL ), + Context: ctx, + } + resBody := &agent.Instance{} + err := client.doRequest(req, &resBody) + return resBody, err +} + +func (client *Client) RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error { + reqBody, err := util.ToJSONBytes(cfgs) + if err != nil { + return err + } + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/elasticsearch/_register", agentBaseURL ), + Context: ctx, + Body: reqBody, + } + resBody := util.MapStr{} + err = client.doRequest(req, &resBody) + if err != nil { + return err + } + if resBody["acknowledged"] != true { + return fmt.Errorf("%v", resBody["error"]) + } + return nil +} + +func (client *Client) GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error) { + req := &util.Request{ + Method: http.MethodGet, + Url: fmt.Sprintf("%s/elasticsearch/_nodes", agentBaseURL ), + Context: ctx, + } + resBody := []agent.ESNodeInfo{} + err := client.doRequest(req, &resBody) + if err != nil { + return nil, err + } + + return resBody, nil +} + +func (client *Client) AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) { + reqBody, err := util.ToJSONBytes(cfg) + if err != nil { + return nil, err + } + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/elasticsearch/_auth", agentBaseURL ), + Context: ctx, + Body: reqBody, + } + resBody := &agent.ESNodeInfo{} + err = client.doRequest(req, resBody) + if err != nil { + return nil, err + } + return resBody, nil +} + +func (client *Client) CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error{ + req := &util.Request{ + Method: http.MethodPost, + Url: agentBaseURL + "/pipeline/tasks/", + Body: body, + Context: ctx, + } + resBody := util.MapStr{} + return client.doRequest(req, &resBody) +} + +func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error{ + req := &util.Request{ + Method: http.MethodDelete, + Url: fmt.Sprintf("%s/pipeline/task/%s", agentBaseURL, pipelineID), + Context: ctx, + } + return client.doRequest(req, nil) +} + +func (client *Client) doRequest(req *util.Request, respObj interface{}) error { + result, err := util.ExecuteRequest(req) + if err != nil { + return err + } + if result.StatusCode != 200 { + return fmt.Errorf(string(result.Body)) + } + return util.FromJSONBytes(result.Body, respObj) +} diff --git a/modules/agent/common/config.go b/modules/agent/common/config.go new file mode 100644 index 00000000..36b5568c --- /dev/null +++ b/modules/agent/common/config.go @@ -0,0 +1,21 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +type AgentConfig struct { + Enabled bool `config:"enabled"` + StateManager struct{ + Enabled bool `config:"enabled"` + } `config:"state_manager"` + Setup SetupConfig `config:"setup"` +} + +type SetupConfig struct { + DownloadURL string `config:"download_url"` + Version string `config:"version"` + CACertFile string `config:"ca_cert"` + CAKeyFile string `config:"ca_key"` + ScriptEndpoint string `config:"script_endpoint"` +} \ No newline at end of file diff --git a/modules/agent/common/helper.go b/modules/agent/common/helper.go new file mode 100644 index 00000000..5763fa7c --- /dev/null +++ b/modules/agent/common/helper.go @@ -0,0 +1,229 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "fmt" + "infini.sh/framework/core/agent" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" +) + +func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, error){ + var clusterCfgs []elastic.ElasticsearchConfig + var ( + pipelines []util.MapStr + toDeletePipelineNames []string + ) + for _, setting := range settings { + if setting.Metadata.Labels == nil { + return nil, fmt.Errorf("empty metadata labels of setting [%s]", setting.ID) + } + var ( + clusterID string + ok bool + ) + if clusterID, ok = setting.Metadata.Labels["cluster_id"].(string); ok && clusterID != ""{ + cfg := elastic.GetConfig(clusterID) + newCfg := elastic.ElasticsearchConfig{ + Enabled: true, + Name: cfg.Name, + BasicAuth: cfg.BasicAuth, + Endpoint: setting.Metadata.Labels["endpoint"].(string), + } + newCfg.ID = clusterID + clusterCfgs = append(clusterCfgs, newCfg) + }else{ + return nil, fmt.Errorf("got wrong cluster id [%v] from metadata labels", setting.Metadata.Labels["cluster_id"]) + } + nodeUUID := util.ToString(setting.Metadata.Labels["node_uuid"]) + + taskCfg, err := util.MapStr(setting.Payload).GetValue("task") + if err != nil { + return nil, err + } + vBytes, err := util.ToJSONBytes(taskCfg) + if err != nil { + return nil, err + } + taskSetting := TaskSetting{} + err = util.FromJSONBytes(vBytes, &taskSetting) + if err != nil { + return nil, err + } + partPipelines, partDeletePipelineNames, err := TransformSettingsToConfig(&taskSetting, clusterID, nodeUUID) + if err != nil { + return nil, err + } + pipelines = append(pipelines, partPipelines...) + toDeletePipelineNames = append(toDeletePipelineNames, partDeletePipelineNames...) + } + return &ParseAgentSettingsResult{ + ClusterConfigs: clusterCfgs, + Pipelines: pipelines, + ToDeletePipelineNames: toDeletePipelineNames, + }, nil +} + +func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) { + queryDsl := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.category": util.MapStr{ + "value": "agent", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.name": util.MapStr{ + "value": "task", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.agent_id": util.MapStr{ + "value": agentID, + }, + }, + }, + { + "range": util.MapStr{ + "updated": util.MapStr{ + "gt": timestamp, + }, + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(queryDsl), + } + err, result := orm.Search(agent.Setting{}, &q) + if err != nil { + return nil, fmt.Errorf("search settings error: %w", err) + } + if len(result.Result) == 0 { + return nil, nil + } + var settings []agent.Setting + for _, row := range result.Result { + setting := agent.Setting{} + buf, err := util.ToJSONBytes(row) + if err != nil { + return nil, err + } + err = util.FromJSONBytes(buf, &setting) + if err != nil { + return nil, err + } + settings = append(settings, setting) + } + return settings, nil +} + +func TransformSettingsToConfig(setting *TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { + if setting == nil { + return nil, nil, fmt.Errorf("empty setting") + } + var ( + pipelines []util.MapStr + toDeletePipelineNames []string + ) + if setting.ClusterStats != nil { + var processorName = "es_cluster_stats" + if setting.ClusterStats.Enabled { + pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID) + if err != nil { + return nil, nil, err + } + pipelines = append(pipelines, pipelineCfg) + }else{ + toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(clusterID, processorName)) + } + } + if setting.IndexStats != nil { + var processorName = "es_index_stats" + if setting.IndexStats.Enabled { + pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID) + if err != nil { + return nil, nil, err + } + pipelines = append(pipelines, pipelineCfg) + }else{ + toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(clusterID, processorName)) + } + } + if setting.ClusterHealth != nil { + var processorName = "es_cluster_health" + if setting.ClusterHealth.Enabled { + pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID) + if err != nil { + return nil, nil, err + } + pipelines = append(pipelines, pipelineCfg) + }else{ + toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(clusterID, processorName)) + } + } + if setting.NodeStats != nil { + var processorName = "es_node_stats" + if setting.NodeStats.Enabled { + params := util.MapStr{ + "elasticsearch": clusterID, + } + if len(setting.NodeStats.NodeIDs) > 0{ + params["node_uuids"] = setting.NodeStats.NodeIDs + } + cfg := util.MapStr{ + processorName: params, + } + enabled := true + pipelineCfg := util.MapStr{ + "enabled": &enabled, + "name": getMetricPipelineName(nodeUUID, processorName), + "auto_start": true, + "keep_running": true, + "retry_delay_in_ms": 10000, + "processor": []util.MapStr{cfg}, + } + pipelines = append(pipelines, pipelineCfg) + }else{ + toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(nodeUUID, processorName)) + } + } + return pipelines, toDeletePipelineNames, nil +} + +func newClusterMetricPipeline(processorName string, clusterID string)(util.MapStr, error){ + cfg := util.MapStr{ + processorName: util.MapStr{ + "elasticsearch": clusterID, + }, + } + enabled := true + pipelineCfg := util.MapStr{ + "enabled": &enabled, + "name": getMetricPipelineName(clusterID, processorName), + "auto_start": true, + "keep_running": true, + "retry_delay_in_ms": 10000, + "processor": []util.MapStr{cfg}, + } + return pipelineCfg, nil +} + +func getMetricPipelineName(clusterID, processorName string) string{ + return fmt.Sprintf("collect_%s_%s", clusterID, processorName) +} + diff --git a/modules/agent/common/helper_test.go b/modules/agent/common/helper_test.go new file mode 100644 index 00000000..285df64a --- /dev/null +++ b/modules/agent/common/helper_test.go @@ -0,0 +1,41 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "fmt" + "gopkg.in/yaml.v2" + "infini.sh/framework/core/util" + "testing" +) + +func TestTransformSettingsToConfig(t *testing.T) { + setting := TaskSetting{ + ClusterHealth: ClusterHealthTask{ + Enabled: true, + }, + ClusterStats: ClusterStatsTask { + Enabled: true, + }, + IndexStats: IndexStatsTask{ + Enabled: true, + }, + NodeStats: NodeStatsTask{ + Enabled: true, + NodeIDs: []string{"ddddnnnn"}, + }, + } + pipelines, err := transformSettingsToConfig(&setting, "testxxx") + if err !=nil { + t.Fatal(err) + } + buf, err := yaml.Marshal(util.MapStr{ + "pipeline": pipelines, + }) + if err !=nil { + t.Fatal(err) + } + fmt.Println(string(buf)) +} diff --git a/modules/agent/common/interface.go b/modules/agent/common/interface.go new file mode 100644 index 00000000..fa04d384 --- /dev/null +++ b/modules/agent/common/interface.go @@ -0,0 +1,66 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "context" + "infini.sh/framework/core/agent" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/host" +) + +var defaultClient ClientAPI + +func GetClient() ClientAPI { + if defaultClient == nil { + panic("agent client not init") + } + return defaultClient +} + +func RegisterClient(client ClientAPI) { + defaultClient = client +} +type ClientAPI interface { + GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) + DiscoveredHost(ctx context.Context, agentBaseURL string, body interface{}) error + GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) + GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) + GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) + GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error) + RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error + GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error) + AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) + CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error + DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error +} + + +var stateManager IStateManager + +func GetStateManager() IStateManager { + if stateManager == nil { + panic("agent state manager not init") + } + return stateManager +} + +func RegisterStateManager(sm IStateManager) { + stateManager = sm +} + +func IsEnabled() bool { + return stateManager != nil +} + +type IStateManager interface { + GetAgent(ID string) (*agent.Instance, error) + UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error) + GetTaskAgent(clusterID string) (*agent.Instance, error) + DeleteAgent(agentID string) error + LoopState() + Stop() + GetAgentClient() ClientAPI +} \ No newline at end of file diff --git a/modules/agent/common/model.go b/modules/agent/common/model.go new file mode 100644 index 00000000..e46452be --- /dev/null +++ b/modules/agent/common/model.go @@ -0,0 +1,45 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/util" +) + +type TaskSetting struct { + ClusterHealth *ClusterHealthTask `json:"cluster_health,omitempty"` + ClusterStats *ClusterStatsTask `json:"cluster_stats,omitempty"` + IndexStats *IndexStatsTask `json:"index_stats,omitempty"` + NodeStats *NodeStatsTask `json:"node_stats,omitempty"` + Logs *LogsTask `json:"logs,omitempty"` +} + +type ClusterHealthTask struct { + Enabled bool `json:"enabled"` +} + +type ClusterStatsTask struct { + Enabled bool `json:"enabled"` +} + +type IndexStatsTask struct { + Enabled bool `json:"enabled"` +} + +type NodeStatsTask struct { + Enabled bool `json:"enabled"` + NodeIDs []string `json:"node_ids,omitempty"` +} + +type LogsTask struct { + Enabled bool `json:"enabled"` +} + +type ParseAgentSettingsResult struct { + ClusterConfigs []elastic.ElasticsearchConfig + Pipelines []util.MapStr + ToDeletePipelineNames []string +} diff --git a/modules/agent/common/state.go b/modules/agent/common/state.go new file mode 100644 index 00000000..0ce2af65 --- /dev/null +++ b/modules/agent/common/state.go @@ -0,0 +1,366 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "context" + "fmt" + "github.com/buger/jsonparser" + log "github.com/cihub/seelog" + "infini.sh/framework/core/agent" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/event" + "infini.sh/framework/core/host" + "infini.sh/framework/core/kv" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "runtime" + "runtime/debug" + "sync" + "time" +) + +const ( + StatusOnline string = "online" + StatusOffline = "offline" +) + +type StateManager struct { + TTL time.Duration // kv ttl + KVKey string + stopC chan struct{} + stopCompleteC chan struct{} + agentClient *Client + agentIds map[string]string + agentMutex sync.Mutex + workerChan chan struct{} + timestamps map[string]int64 +} + +func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string) *StateManager { + return &StateManager{ + TTL: TTL, + KVKey: kvKey, + stopC: make(chan struct{}), + stopCompleteC: make(chan struct{}), + agentClient: &Client{}, + agentIds: agentIds, + workerChan: make(chan struct{}, runtime.NumCPU()), + timestamps: map[string]int64{}, + } +} + +func (sm *StateManager) checkAgentStatus() { + onlineAgentIDs, err := GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds())) + if err != nil { + log.Error(err) + return + } + //add new agent to state + sm.agentMutex.Lock() + for agentID := range onlineAgentIDs { + if _, ok := sm.agentIds[agentID]; !ok { + log.Infof("status of agent [%s] changed to online", agentID) + sm.agentIds[agentID] = StatusOnline + } + } + sm.agentMutex.Unlock() + for agentID, status := range sm.agentIds { + if _, ok := onlineAgentIDs[agentID]; ok { + sm.syncSettings(agentID) + if status == StatusOnline { + continue + } + // status change to online + sm.agentIds[agentID] = StatusOnline + log.Infof("status of agent [%s] changed to online", agentID) + //set timestamp equals 0 to create pipeline + sm.timestamps[agentID] = 0 + continue + }else{ + // already offline + if status == StatusOffline { + continue + } + } + // status change to offline + sm.agentIds[agentID] = StatusOffline + sm.workerChan <- struct{}{} + go func(agentID string) { + defer func() { + if err := recover(); err != nil { + log.Errorf("check agent [%s] status recover form panic error: %v", agentID, err) + debug.PrintStack() + } + <-sm.workerChan + }() + ag, err := sm.GetAgent(agentID) + if err != nil { + log.Error(err) + return + } + ag.Status = StatusOffline + log.Infof("agent [%s] is offline", ag.Endpoint) + _, err = sm.UpdateAgent(ag, true) + if err != nil { + log.Error(err) + return + } + //update host agent status + host.UpdateHostAgentStatus(ag.ID, StatusOffline) + }(agentID) + + } +} + +func (sm *StateManager) syncSettings(agentID string) { + newTimestamp := time.Now().UnixMilli() + settings, err := GetAgentSettings(agentID, sm.timestamps[agentID]) + if err != nil { + log.Errorf("query agent settings error: %v", err) + return + } + if len(settings) == 0 { + log.Debugf("got no settings of agent [%s]", agentID) + return + } + parseResult, err := ParseAgentSettings(settings) + if err != nil { + log.Errorf("parse agent settings error: %v", err) + return + } + ag, err := sm.GetAgent(agentID) + if err != nil { + log.Errorf("get agent error: %v", err) + return + } + agClient := sm.GetAgentClient() + if len(parseResult.ClusterConfigs) > 0 { + err = agClient.RegisterElasticsearch(nil, ag.GetEndpoint(), parseResult.ClusterConfigs) + if err != nil { + log.Errorf("register elasticsearch config error: %v", err) + return + } + } + for _, pipelineID := range parseResult.ToDeletePipelineNames { + err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID) + if err != nil { + log.Errorf("delete pipeline error: %v", err) + continue + } + } + for _, pipeline := range parseResult.Pipelines { + err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline)) + if err != nil { + log.Errorf("create pipeline error: %v", err) + return + } + } + sm.timestamps[agentID] = newTimestamp +} + +func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) { + agents, err := LoadAgentsFromES(clusterID) + if err != nil { + return nil, err + } + if len(agents) == 0 { + return nil, nil + } + for _, ag := range agents { + if ag.Status == "offline" { + continue + } + } + return nil, nil +} + +func (sm *StateManager) LoopState() { + t := time.NewTicker(30 * time.Second) + defer t.Stop() +MAINLOOP: + for { + select { + case <-sm.stopC: + sm.stopCompleteC <- struct{}{} + close(sm.workerChan) + break MAINLOOP + case <-t.C: + sm.checkAgentStatus() + } + } +} + +func (sm *StateManager) Stop() { + sm.stopC <- struct{}{} + <-sm.stopCompleteC +} + +func (sm *StateManager) GetAgent(ID string) (*agent.Instance, error) { + buf, err := kv.GetValue(sm.KVKey, []byte(ID)) + if err != nil { + return nil, err + } + strTime, _ := jsonparser.GetString(buf, "timestamp") + timestamp, _ := time.Parse(time.RFC3339, strTime) + inst := &agent.Instance{} + inst.ID = ID + if time.Since(timestamp) > sm.TTL { + exists, err := orm.Get(inst) + if err != nil { + return nil, fmt.Errorf("get agent [%s] error: %w", ID, err) + } + if !exists { + return nil, fmt.Errorf("can not found agent [%s]", ID) + } + //inst.Timestamp = time.Now() + err = kv.AddValue(sm.KVKey, []byte(ID), util.MustToJSONBytes(inst)) + if err != nil { + log.Errorf("save agent [%s] to kv error: %v", ID, err) + } + return inst, nil + } + err = util.FromJSONBytes(buf, inst) + return inst, err +} + +func (sm *StateManager) UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error) { + //inst.Timestamp = time.Now() + err := kv.AddValue(sm.KVKey, []byte(inst.ID), util.MustToJSONBytes(inst)) + if syncToES { + ctx := orm.Context{ + Refresh: "wait_for", + } + err = orm.Update(&ctx, inst) + if err != nil { + return nil, err + } + } + return inst, err +} + +func (sm *StateManager) GetTaskAgent(clusterID string) (*agent.Instance, error) { + return nil, nil +} + + +func (sm *StateManager) DeleteAgent(agentID string) error { + sm.agentMutex.Lock() + delete(sm.agentIds, agentID) + sm.agentMutex.Unlock() + log.Infof("delete agent [%s] from state", agentID) + + return kv.DeleteKey(sm.KVKey, []byte(agentID)) +} + +func (sm *StateManager) GetAgentClient() ClientAPI { + return sm.agentClient +} + +func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) { + q := orm.Query{ + Size: 1000, + } + if clusterID != "" { + q.Conds = orm.And(orm.Eq("id", clusterID)) + } + err, result := orm.Search(agent.Instance{}, &q) + if err != nil { + return nil, fmt.Errorf("query agent error: %w", err) + } + + if len(result.Result) > 0 { + var agents = make([]agent.Instance, 0, len(result.Result)) + for _, row := range result.Result { + ag := agent.Instance{} + bytes := util.MustToJSONBytes(row) + err = util.FromJSONBytes(bytes, &ag) + if err != nil { + log.Errorf("got unexpected agent: %s, error: %v", string(bytes), err) + continue + } + agents = append(agents, ag) + } + return agents, nil + } + return nil, nil +} + +func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) { + q := orm.Query{ + WildcardIndex: true, + } + mustQ := []util.MapStr{ + { + "term": util.MapStr{ + "metadata.name": util.MapStr{ + "value": "agent", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.category": util.MapStr{ + "value": "instance", + }, + }, + }, + } + if len(agentIds) > 0 { + mustQ = append(mustQ, util.MapStr{ + "terms": util.MapStr{ + "agent.id": agentIds, + }, + }) + } + queryDSL := util.MapStr{ + "_source": "agent.id", + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "desc", + }, + }, + }, + "collapse": util.MapStr{ + "field": "agent.id", + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "filter": []util.MapStr{ + { + "range": util.MapStr{ + "timestamp": util.MapStr{ + "gte": fmt.Sprintf("now-%ds", lastSeconds), + }, + }, + }, + }, + "must": mustQ, + }, + }, + } + q.RawQuery = util.MustToJSONBytes(queryDSL) + err, result := orm.Search(event.Event{}, &q) + if err != nil { + return nil, fmt.Errorf("query agent instance metric error: %w", err) + } + agentIDs := map[string]struct{}{} + if len(result.Result) > 0 { + searchRes := elastic.SearchResponse{} + err = util.FromJSONBytes(result.Raw, &searchRes) + if err != nil { + return nil, err + } + agentIDKeyPath := []string{"agent", "id"} + for _, hit := range searchRes.Hits.Hits { + agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source) + if v, ok := agentID.(string); ok { + agentIDs[v] = struct{}{} + } + } + } + return agentIDs, nil +} \ No newline at end of file