diff --git a/modules/agent/agent.go b/modules/agent/agent.go index 1461856e..8352021a 100644 --- a/modules/agent/agent.go +++ b/modules/agent/agent.go @@ -7,7 +7,10 @@ package agent import ( log "github.com/cihub/seelog" "infini.sh/console/modules/agent/api" + "infini.sh/console/modules/agent/client" "infini.sh/console/modules/agent/common" + "infini.sh/console/modules/agent/model" + "infini.sh/console/modules/agent/state" "infini.sh/framework/core/agent" "infini.sh/framework/core/env" "infini.sh/framework/core/host" @@ -38,7 +41,7 @@ func (module *AgentModule) Start() error { orm.RegisterSchemaWithIndexName(agent.ESNodeInfo{}, "agent-node") orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host") orm.RegisterSchemaWithIndexName(agent.Setting{}, "agent-setting") - common.RegisterClient(&common.Client{}) + client.RegisterClient(&client.Client{}) if module.AgentConfig.StateManager.Enabled { onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, 60) @@ -56,8 +59,8 @@ func (module *AgentModule) Start() error { } } - sm := common.NewStateManager(time.Second*30, "agent_state", agentIds) - common.RegisterStateManager(sm) + sm := state.NewStateManager(time.Second*30, "agent_state", agentIds) + state.RegisterStateManager(sm) go sm.LoopState() } return nil @@ -69,12 +72,12 @@ func (module *AgentModule) Stop() error { } log.Info("start to stop agent module") if module.AgentConfig.StateManager.Enabled { - common.GetStateManager().Stop() + state.GetStateManager().Stop() } log.Info("agent module was stopped") return nil } type AgentModule struct { - common.AgentConfig + model.AgentConfig } diff --git a/modules/agent/api/host.go b/modules/agent/api/host.go index c873501c..3d1b15a1 100644 --- a/modules/agent/api/host.go +++ b/modules/agent/api/host.go @@ -7,13 +7,13 @@ package api import ( "context" "fmt" - common2 "infini.sh/console/modules/agent/common" + log "github.com/cihub/seelog" + "infini.sh/console/modules/agent/state" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/host" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "net/http" - log "github.com/cihub/seelog" "time" ) @@ -107,7 +107,7 @@ func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request, return } - sm := common2.GetStateManager() + sm := state.GetStateManager() ag, err := sm.GetAgent(hostInfo.AgentID) if err != nil { log.Error(err) @@ -159,7 +159,7 @@ func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Requ h.WriteJSON(w, util.MapStr{}, http.StatusOK) return } - sm := common2.GetStateManager() + sm := state.GetStateManager() ag, err := sm.GetAgent(hostInfo.AgentID) if err != nil { log.Error(err) @@ -193,7 +193,7 @@ func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Requ } func enrollHostFromAgent(agentID string) (*host.HostInfo, error){ - sm := common2.GetStateManager() + sm := state.GetStateManager() ag, err := sm.GetAgent(agentID) if err != nil { return nil, err diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index b85480ca..2b681a9e 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -8,7 +8,10 @@ import ( "context" "fmt" log "github.com/cihub/seelog" + "infini.sh/console/modules/agent/client" common2 "infini.sh/console/modules/agent/common" + "infini.sh/console/modules/agent/model" + "infini.sh/console/modules/agent/state" "infini.sh/framework/core/agent" "infini.sh/framework/core/api" httprouter "infini.sh/framework/core/api/router" @@ -51,7 +54,7 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps return } //fetch more information of agent instance - res, err := common2.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint()) + res, err := client.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint()) if err != nil { errStr := fmt.Sprintf("get agent instance basic info error: %s", err.Error()) h.WriteError(w,errStr , http.StatusInternalServerError) @@ -71,7 +74,7 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps obj.IPS = res.IPS } - obj.Status = common2.StatusOnline + obj.Status = model.StatusOnline err = orm.Create(nil, obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -135,7 +138,7 @@ func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps log.Error(err) return } - if sm := common2.GetStateManager(); sm != nil { + if sm := state.GetStateManager(); sm != nil { sm.DeleteAgent(obj.ID) } queryDsl := util.MapStr{ @@ -388,7 +391,7 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt return } cfg.BasicAuth = &basicAuth - nodeInfo, err := common2.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg) + nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -511,7 +514,7 @@ func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps htt h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - connectRes, err := common2.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint) + connectRes, err := client.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) return @@ -520,7 +523,7 @@ func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps htt } func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { - nodesInfo, err := common2.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) + nodesInfo, err := client.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) if err != nil { return nil, fmt.Errorf("get elasticsearch nodes error: %w", err) } @@ -656,7 +659,7 @@ func getAgentTaskSetting(agentID string, node agent.ESNodeInfo) (*agent.Setting, } // getSettingsByClusterID query agent task settings with cluster id -func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) { +func getSettingsByClusterID(clusterID string) (*model.TaskSetting, error) { queryDsl := util.MapStr{ "size": 200, "query": util.MapStr{ @@ -704,8 +707,8 @@ func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) { return nil, err } - setting := &common2.TaskSetting{ - NodeStats: &common2.NodeStatsTask{ + setting := &model.TaskSetting{ + NodeStats: &model.NodeStatsTask{ Enabled: true, }, } @@ -734,17 +737,17 @@ func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) { } } if clusterStats { - setting.ClusterStats = &common2.ClusterStatsTask{ + setting.ClusterStats = &model.ClusterStatsTask{ Enabled: true, } } if indexStats { - setting.IndexStats = &common2.IndexStatsTask{ + setting.IndexStats = &model.IndexStatsTask{ Enabled: true, } } if clusterHealth { - setting.ClusterHealth = &common2.ClusterHealthTask{ + setting.ClusterHealth = &model.ClusterHealthTask{ Enabled: true, } } diff --git a/modules/agent/api/log.go b/modules/agent/api/log.go index f36e2be0..0164fc5d 100644 --- a/modules/agent/api/log.go +++ b/modules/agent/api/log.go @@ -7,7 +7,8 @@ package api import ( "fmt" log "github.com/cihub/seelog" - "infini.sh/console/modules/agent/common" + "infini.sh/console/modules/agent/client" + "infini.sh/console/modules/agent/state" "infini.sh/framework/core/agent" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/orm" @@ -31,7 +32,7 @@ func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, }, http.StatusOK) return } - logFiles, err := common.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs) + logFiles, err := client.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -68,7 +69,7 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, return } reqBody.LogsPath = node.Path.Logs - sm := common.GetStateManager() + sm := state.GetStateManager() res, err := sm.GetAgentClient().GetElasticLogFileContent(nil, inst.GetEndpoint(), reqBody) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) diff --git a/modules/agent/common/client.go b/modules/agent/client/client.go similarity index 84% rename from modules/agent/common/client.go rename to modules/agent/client/client.go index cdbdf944..5db68a14 100644 --- a/modules/agent/common/client.go +++ b/modules/agent/client/client.go @@ -2,12 +2,13 @@ * Web: https://infinilabs.com * Email: hello#infini.ltd */ -package common +package client import ( "bytes" "context" "fmt" + "infini.sh/console/modules/agent/common" "infini.sh/framework/core/agent" "infini.sh/framework/core/elastic" "infini.sh/framework/core/global" @@ -20,6 +21,31 @@ import ( "sync" ) +var defaultClient ClientAPI + +func GetClient() ClientAPI { + if defaultClient == nil { + panic("agent client not init") + } + return defaultClient +} + +func RegisterClient(client ClientAPI) { + defaultClient = client +} +type ClientAPI interface { + GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) + GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) + GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) + GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) + GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error) + RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error + GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error) + AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) + CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error + DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error +} + type Client struct { } @@ -213,7 +239,7 @@ var( hClientOnce = sync.Once{} ) func (client *Client) doRequest(req *util.Request, respObj interface{}) error { - agCfg := GetAgentConfig() + agCfg := common.GetAgentConfig() var err error hClientOnce.Do(func() { var ( @@ -270,7 +296,7 @@ func getAgentInstanceCerts(caFile, caKey string) (string, string, error) { if util.FileExists(instanceCrt) && util.FileExists(instanceKey) { return instanceCrt, instanceKey, nil } - _, clientCertPEM, clientKeyPEM, err = GenerateClientCert(caFile, caKey) + _, clientCertPEM, clientKeyPEM, err = common.GenerateClientCert(caFile, caKey) if err != nil { return "", "", err } diff --git a/modules/agent/common/config.go b/modules/agent/common/config.go index 71fc0a04..643b67fe 100644 --- a/modules/agent/common/config.go +++ b/modules/agent/common/config.go @@ -5,32 +5,17 @@ package common import ( + "infini.sh/console/modules/agent/model" "infini.sh/framework/core/env" "sync" ) -type AgentConfig struct { - Enabled bool `config:"enabled"` - StateManager struct{ - Enabled bool `config:"enabled"` - } `config:"state_manager"` - Setup SetupConfig `config:"setup"` -} - -type SetupConfig struct { - DownloadURL string `config:"download_url"` - Version string `config:"version"` - CACertFile string `config:"ca_cert"` - CAKeyFile string `config:"ca_key"` - ScriptEndpoint string `config:"script_endpoint"` -} - -var agentCfg *AgentConfig +var agentCfg *model.AgentConfig var onceCfg = sync.Once{} -func GetAgentConfig() *AgentConfig { +func GetAgentConfig() *model.AgentConfig { onceCfg.Do(func() { - agentCfg = &AgentConfig{} + agentCfg = &model.AgentConfig{} _, err := env.ParseConfig("agent", agentCfg ) if err != nil { panic(err) diff --git a/modules/agent/common/helper.go b/modules/agent/common/helper.go index 5763fa7c..8e760104 100644 --- a/modules/agent/common/helper.go +++ b/modules/agent/common/helper.go @@ -6,13 +6,16 @@ package common import ( "fmt" + "infini.sh/console/modules/agent/model" "infini.sh/framework/core/agent" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/event" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" + log "src/github.com/cihub/seelog" ) -func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, error){ +func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResult, error){ var clusterCfgs []elastic.ElasticsearchConfig var ( pipelines []util.MapStr @@ -49,7 +52,7 @@ func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, err if err != nil { return nil, err } - taskSetting := TaskSetting{} + taskSetting := model.TaskSetting{} err = util.FromJSONBytes(vBytes, &taskSetting) if err != nil { return nil, err @@ -61,7 +64,7 @@ func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, err pipelines = append(pipelines, partPipelines...) toDeletePipelineNames = append(toDeletePipelineNames, partDeletePipelineNames...) } - return &ParseAgentSettingsResult{ + return &model.ParseAgentSettingsResult{ ClusterConfigs: clusterCfgs, Pipelines: pipelines, ToDeletePipelineNames: toDeletePipelineNames, @@ -132,7 +135,7 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) return settings, nil } -func TransformSettingsToConfig(setting *TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { +func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { if setting == nil { return nil, nil, fmt.Errorf("empty setting") } @@ -227,3 +230,109 @@ func getMetricPipelineName(clusterID, processorName string) string{ return fmt.Sprintf("collect_%s_%s", clusterID, processorName) } + +func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) { + q := orm.Query{ + Size: 1000, + } + if clusterID != "" { + q.Conds = orm.And(orm.Eq("id", clusterID)) + } + err, result := orm.Search(agent.Instance{}, &q) + if err != nil { + return nil, fmt.Errorf("query agent error: %w", err) + } + + if len(result.Result) > 0 { + var agents = make([]agent.Instance, 0, len(result.Result)) + for _, row := range result.Result { + ag := agent.Instance{} + bytes := util.MustToJSONBytes(row) + err = util.FromJSONBytes(bytes, &ag) + if err != nil { + log.Errorf("got unexpected agent: %s, error: %v", string(bytes), err) + continue + } + agents = append(agents, ag) + } + return agents, nil + } + return nil, nil +} + +func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) { + q := orm.Query{ + WildcardIndex: true, + } + mustQ := []util.MapStr{ + { + "term": util.MapStr{ + "metadata.name": util.MapStr{ + "value": "agent", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.category": util.MapStr{ + "value": "instance", + }, + }, + }, + } + if len(agentIds) > 0 { + mustQ = append(mustQ, util.MapStr{ + "terms": util.MapStr{ + "agent.id": agentIds, + }, + }) + } + queryDSL := util.MapStr{ + "_source": "agent.id", + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "desc", + }, + }, + }, + "collapse": util.MapStr{ + "field": "agent.id", + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "filter": []util.MapStr{ + { + "range": util.MapStr{ + "timestamp": util.MapStr{ + "gte": fmt.Sprintf("now-%ds", lastSeconds), + }, + }, + }, + }, + "must": mustQ, + }, + }, + } + q.RawQuery = util.MustToJSONBytes(queryDSL) + err, result := orm.Search(event.Event{}, &q) + if err != nil { + return nil, fmt.Errorf("query agent instance metric error: %w", err) + } + agentIDs := map[string]struct{}{} + if len(result.Result) > 0 { + searchRes := elastic.SearchResponse{} + err = util.FromJSONBytes(result.Raw, &searchRes) + if err != nil { + return nil, err + } + agentIDKeyPath := []string{"agent", "id"} + for _, hit := range searchRes.Hits.Hits { + agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source) + if v, ok := agentID.(string); ok { + agentIDs[v] = struct{}{} + } + } + } + return agentIDs, nil +} \ No newline at end of file diff --git a/modules/agent/common/interface.go b/modules/agent/common/interface.go deleted file mode 100644 index 6407358f..00000000 --- a/modules/agent/common/interface.go +++ /dev/null @@ -1,65 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package common - -import ( - "context" - "infini.sh/framework/core/agent" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/host" -) - -var defaultClient ClientAPI - -func GetClient() ClientAPI { - if defaultClient == nil { - panic("agent client not init") - } - return defaultClient -} - -func RegisterClient(client ClientAPI) { - defaultClient = client -} -type ClientAPI interface { - GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) - GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) - GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) - GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) - GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error) - RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error - GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error) - AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) - CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error - DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error -} - - -var stateManager IStateManager - -func GetStateManager() IStateManager { - if stateManager == nil { - panic("agent state manager not init") - } - return stateManager -} - -func RegisterStateManager(sm IStateManager) { - stateManager = sm -} - -func IsEnabled() bool { - return stateManager != nil -} - -type IStateManager interface { - GetAgent(ID string) (*agent.Instance, error) - UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error) - GetTaskAgent(clusterID string) (*agent.Instance, error) - DeleteAgent(agentID string) error - LoopState() - Stop() - GetAgentClient() ClientAPI -} \ No newline at end of file diff --git a/modules/agent/model/config.go b/modules/agent/model/config.go new file mode 100644 index 00000000..465e8f52 --- /dev/null +++ b/modules/agent/model/config.go @@ -0,0 +1,21 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package model + +type AgentConfig struct { + Enabled bool `config:"enabled"` + StateManager struct{ + Enabled bool `config:"enabled"` + } `config:"state_manager"` + Setup SetupConfig `config:"setup"` +} + +type SetupConfig struct { + DownloadURL string `config:"download_url"` + Version string `config:"version"` + CACertFile string `config:"ca_cert"` + CAKeyFile string `config:"ca_key"` + ScriptEndpoint string `config:"script_endpoint"` +} diff --git a/modules/agent/model/const.go b/modules/agent/model/const.go new file mode 100644 index 00000000..6115f841 --- /dev/null +++ b/modules/agent/model/const.go @@ -0,0 +1,10 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package model + +const ( + StatusOnline string = "online" + StatusOffline = "offline" +) diff --git a/modules/agent/common/model.go b/modules/agent/model/task.go similarity index 98% rename from modules/agent/common/model.go rename to modules/agent/model/task.go index e46452be..d8b42006 100644 --- a/modules/agent/common/model.go +++ b/modules/agent/model/task.go @@ -2,7 +2,7 @@ * Web: https://infinilabs.com * Email: hello#infini.ltd */ -package common +package model import ( "infini.sh/framework/core/elastic" diff --git a/modules/agent/common/state.go b/modules/agent/state/state.go similarity index 65% rename from modules/agent/common/state.go rename to modules/agent/state/state.go index de26a0ee..27507948 100644 --- a/modules/agent/common/state.go +++ b/modules/agent/state/state.go @@ -2,16 +2,17 @@ * Web: https://infinilabs.com * Email: hello#infini.ltd */ -package common +package state import ( "context" "fmt" "github.com/buger/jsonparser" log "github.com/cihub/seelog" + "infini.sh/console/modules/agent/client" + "infini.sh/console/modules/agent/common" + "infini.sh/console/modules/agent/model" "infini.sh/framework/core/agent" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/event" "infini.sh/framework/core/host" "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" @@ -23,17 +24,39 @@ import ( "time" ) -const ( - StatusOnline string = "online" - StatusOffline = "offline" -) +var stateManager IStateManager + +func GetStateManager() IStateManager { + if stateManager == nil { + panic("agent state manager not init") + } + return stateManager +} + +func RegisterStateManager(sm IStateManager) { + stateManager = sm +} + +func IsEnabled() bool { + return stateManager != nil +} + +type IStateManager interface { + GetAgent(ID string) (*agent.Instance, error) + UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error) + GetTaskAgent(clusterID string) (*agent.Instance, error) + DeleteAgent(agentID string) error + LoopState() + Stop() + GetAgentClient() client.ClientAPI +} type StateManager struct { TTL time.Duration // kv ttl KVKey string stopC chan struct{} stopCompleteC chan struct{} - agentClient *Client + agentClient *client.Client agentIds map[string]string agentMutex sync.Mutex workerChan chan struct{} @@ -46,7 +69,7 @@ func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string KVKey: kvKey, stopC: make(chan struct{}), stopCompleteC: make(chan struct{}), - agentClient: &Client{}, + agentClient: &client.Client{}, agentIds: agentIds, workerChan: make(chan struct{}, runtime.NumCPU()), timestamps: map[string]int64{}, @@ -54,7 +77,7 @@ func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string } func (sm *StateManager) checkAgentStatus() { - onlineAgentIDs, err := GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds())) + onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds())) if err != nil { log.Error(err) return @@ -64,32 +87,32 @@ func (sm *StateManager) checkAgentStatus() { for agentID := range onlineAgentIDs { if _, ok := sm.agentIds[agentID]; !ok { log.Infof("status of agent [%s] changed to online", agentID) - sm.agentIds[agentID] = StatusOnline + sm.agentIds[agentID] = model.StatusOnline } } sm.agentMutex.Unlock() for agentID, status := range sm.agentIds { if _, ok := onlineAgentIDs[agentID]; ok { sm.syncSettings(agentID) - host.UpdateHostAgentStatus(agentID, StatusOnline) - if status == StatusOnline { + host.UpdateHostAgentStatus(agentID, model.StatusOnline) + if status == model.StatusOnline { continue } // status change to online - sm.agentIds[agentID] = StatusOnline + sm.agentIds[agentID] = model.StatusOnline log.Infof("status of agent [%s] changed to online", agentID) //set timestamp equals 0 to create pipeline sm.timestamps[agentID] = 0 continue }else{ // already offline - if status == StatusOffline { + if status == model.StatusOffline { continue } } // status change to offline // todo validate whether agent is offline - sm.agentIds[agentID] = StatusOffline + sm.agentIds[agentID] = model.StatusOffline sm.workerChan <- struct{}{} go func(agentID string) { defer func() { @@ -104,7 +127,7 @@ func (sm *StateManager) checkAgentStatus() { log.Error(err) return } - ag.Status = StatusOffline + ag.Status = model.StatusOffline log.Infof("agent [%s] is offline", ag.Endpoint) _, err = sm.UpdateAgent(ag, true) if err != nil { @@ -112,7 +135,7 @@ func (sm *StateManager) checkAgentStatus() { return } //update host agent status - host.UpdateHostAgentStatus(ag.ID, StatusOffline) + host.UpdateHostAgentStatus(ag.ID, model.StatusOffline) }(agentID) } @@ -120,7 +143,7 @@ func (sm *StateManager) checkAgentStatus() { func (sm *StateManager) syncSettings(agentID string) { newTimestamp := time.Now().UnixMilli() - settings, err := GetAgentSettings(agentID, sm.timestamps[agentID]) + settings, err := common.GetAgentSettings(agentID, sm.timestamps[agentID]) if err != nil { log.Errorf("query agent settings error: %v", err) return @@ -129,7 +152,7 @@ func (sm *StateManager) syncSettings(agentID string) { log.Debugf("got no settings of agent [%s]", agentID) return } - parseResult, err := ParseAgentSettings(settings) + parseResult, err := common.ParseAgentSettings(settings) if err != nil { log.Errorf("parse agent settings error: %v", err) return @@ -168,7 +191,7 @@ func (sm *StateManager) syncSettings(agentID string) { } func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) { - agents, err := LoadAgentsFromES(clusterID) + agents, err := common.LoadAgentsFromES(clusterID) if err != nil { return nil, err } @@ -261,112 +284,6 @@ func (sm *StateManager) DeleteAgent(agentID string) error { return kv.DeleteKey(sm.KVKey, []byte(agentID)) } -func (sm *StateManager) GetAgentClient() ClientAPI { +func (sm *StateManager) GetAgentClient() client.ClientAPI { return sm.agentClient } - -func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) { - q := orm.Query{ - Size: 1000, - } - if clusterID != "" { - q.Conds = orm.And(orm.Eq("id", clusterID)) - } - err, result := orm.Search(agent.Instance{}, &q) - if err != nil { - return nil, fmt.Errorf("query agent error: %w", err) - } - - if len(result.Result) > 0 { - var agents = make([]agent.Instance, 0, len(result.Result)) - for _, row := range result.Result { - ag := agent.Instance{} - bytes := util.MustToJSONBytes(row) - err = util.FromJSONBytes(bytes, &ag) - if err != nil { - log.Errorf("got unexpected agent: %s, error: %v", string(bytes), err) - continue - } - agents = append(agents, ag) - } - return agents, nil - } - return nil, nil -} - -func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) { - q := orm.Query{ - WildcardIndex: true, - } - mustQ := []util.MapStr{ - { - "term": util.MapStr{ - "metadata.name": util.MapStr{ - "value": "agent", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.category": util.MapStr{ - "value": "instance", - }, - }, - }, - } - if len(agentIds) > 0 { - mustQ = append(mustQ, util.MapStr{ - "terms": util.MapStr{ - "agent.id": agentIds, - }, - }) - } - queryDSL := util.MapStr{ - "_source": "agent.id", - "sort": []util.MapStr{ - { - "timestamp": util.MapStr{ - "order": "desc", - }, - }, - }, - "collapse": util.MapStr{ - "field": "agent.id", - }, - "query": util.MapStr{ - "bool": util.MapStr{ - "filter": []util.MapStr{ - { - "range": util.MapStr{ - "timestamp": util.MapStr{ - "gte": fmt.Sprintf("now-%ds", lastSeconds), - }, - }, - }, - }, - "must": mustQ, - }, - }, - } - q.RawQuery = util.MustToJSONBytes(queryDSL) - err, result := orm.Search(event.Event{}, &q) - if err != nil { - return nil, fmt.Errorf("query agent instance metric error: %w", err) - } - agentIDs := map[string]struct{}{} - if len(result.Result) > 0 { - searchRes := elastic.SearchResponse{} - err = util.FromJSONBytes(result.Raw, &searchRes) - if err != nil { - return nil, err - } - agentIDKeyPath := []string{"agent", "id"} - for _, hit := range searchRes.Hits.Hits { - agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source) - if v, ok := agentID.(string); ok { - agentIDs[v] = struct{}{} - } - } - } - return agentIDs, nil -} \ No newline at end of file