diff --git a/main.go b/main.go index 2d382650..822e6bf6 100644 --- a/main.go +++ b/main.go @@ -67,9 +67,9 @@ func main() { modules = append(modules, &redis.RedisModule{}) modules = append(modules, &pipeline.PipeModule{}) modules = append(modules, &task.TaskModule{}) - modules = append(modules, &agent.AgentModule{}) modules = append(modules, &metrics.MetricsModule{}) modules = append(modules, &security.Module{}) + modules = append(modules, &agent.AgentModule{}) uiModule := &ui.UIModule{} diff --git a/modules/agent/agent.go b/modules/agent/agent.go index 9e8c3ee5..1a9241e4 100644 --- a/modules/agent/agent.go +++ b/modules/agent/agent.go @@ -5,6 +5,7 @@ package agent import ( + "fmt" log "github.com/cihub/seelog" "infini.sh/console/modules/agent/api" "infini.sh/console/modules/agent/client" @@ -12,9 +13,13 @@ import ( "infini.sh/console/modules/agent/model" "infini.sh/console/modules/agent/state" "infini.sh/framework/core/agent" + "infini.sh/framework/core/credential" + "infini.sh/framework/core/elastic" "infini.sh/framework/core/env" "infini.sh/framework/core/host" + "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" "time" ) @@ -73,6 +78,53 @@ func (module *AgentModule) Start() error { agentIds[ag.ID] = "online" } } + credential.RegisterChangeEvent(func(cred *credential.Credential) { + var effectsClusterIDs []string + elastic.WalkConfigs(func(key, value interface{}) bool { + if cfg, ok := value.(*elastic.ElasticsearchConfig); ok { + if cfg.CredentialID == cred.ID { + effectsClusterIDs = append(effectsClusterIDs, cfg.ID) + } + } + return true + }) + if len(effectsClusterIDs) > 0 { + queryDsl := util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "metadata.labels.cluster_id": effectsClusterIDs, + }, + }, + }, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['updated'] = '%s'", time.Now().Format(time.RFC3339Nano)), + }, + } + err = orm.UpdateBy(agent.Setting{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Error(err) + } + } + //check ingest cluster credential + if module.AgentConfig.Setup != nil && module.AgentConfig.Setup.IngestClusterCredentialID == cred.ID { + agents, err = common.LoadAgentsFromES("") + if err != nil { + log.Error(err) + return + } + for _, ag := range agents { + err = kv.AddValue(model.KVAgentIngestConfigChanged, []byte(ag.ID), []byte("1")) + if err != nil { + log.Error(err) + } + } + } + }) sm := state.NewStateManager(time.Second*30, "agent_state", agentIds, agClient) state.RegisterStateManager(sm) diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index f6eff3e7..0495125c 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -112,7 +112,7 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps if err != nil { log.Error(err) } - err = pushIngestConfigToAgent(obj) + err = client.GetClient().SaveIngestConfig(context.Background(), obj.GetEndpoint()) if err != nil { log.Error(err) } @@ -121,23 +121,6 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps } -func pushIngestConfigToAgent(inst *agent.Instance) error{ - ingestCfg, basicAuth, err := common2.GetAgentIngestConfig() - if err != nil { - return err - } - if basicAuth != nil && basicAuth.Password != "" { - err = client.GetClient().SetKeystoreValue(context.Background(), inst.GetEndpoint(), "ingest_cluster_password", basicAuth.Password) - if err != nil { - return fmt.Errorf("set keystore value to agent error: %w", err) - } - } - err = client.GetClient().SaveDynamicConfig(context.Background(), inst.GetEndpoint(), "ingest", ingestCfg ) - if err != nil { - fmt.Errorf("save dynamic config to agent error: %w", err) - } - return nil -} func (h *APIHandler) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("instance_id") diff --git a/modules/agent/client/client.go b/modules/agent/client/client.go index 9f35870b..46f80e42 100644 --- a/modules/agent/client/client.go +++ b/modules/agent/client/client.go @@ -7,6 +7,7 @@ package client import ( "context" "fmt" + "infini.sh/console/modules/agent/common" "infini.sh/framework/core/agent" "infini.sh/framework/core/elastic" "infini.sh/framework/core/host" @@ -39,6 +40,7 @@ type ClientAPI interface { DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error + SaveIngestConfig(ctx context.Context, agentBaseURL string) error } type Client struct { @@ -246,6 +248,24 @@ func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string return client.doRequest(req, nil) } +func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string) error { + ingestCfg, basicAuth, err := common.GetAgentIngestConfig() + if err != nil { + return err + } + if basicAuth != nil && basicAuth.Password != "" { + err = client.SetKeystoreValue(ctx, agentBaseURL, "ingest_cluster_password", basicAuth.Password) + if err != nil { + return fmt.Errorf("set keystore value to agent error: %w", err) + } + } + err = client.SaveDynamicConfig(context.Background(), agentBaseURL, "ingest", ingestCfg ) + if err != nil { + return fmt.Errorf("save dynamic config to agent error: %w", err) + } + return nil +} + func (client *Client) doRequest(req *util.Request, respObj interface{}) error { return client.Executor.DoRequest(req, respObj) diff --git a/modules/agent/model/const.go b/modules/agent/model/const.go index 6115f841..3cea10ea 100644 --- a/modules/agent/model/const.go +++ b/modules/agent/model/const.go @@ -8,3 +8,8 @@ const ( StatusOnline string = "online" StatusOffline = "offline" ) + +const ( + KVAgentIngestConfigChanged = "agent_ingest_config_changed" + KVSyncDynamicTaskSettings = "agent_sync_dynamic_task_settings" +) \ No newline at end of file diff --git a/modules/agent/state/state.go b/modules/agent/state/state.go index c2db3916..7b65effd 100644 --- a/modules/agent/state/state.go +++ b/modules/agent/state/state.go @@ -92,25 +92,6 @@ func (sm *StateManager) checkAgentStatus() { } sm.agentMutex.Unlock() for agentID, status := range sm.agentIds { - if _, ok := onlineAgentIDs[agentID]; ok { - sm.syncSettings(agentID) - host.UpdateHostAgentStatus(agentID, model.StatusOnline) - if status == model.StatusOnline { - continue - } - // status change to online - sm.agentIds[agentID] = model.StatusOnline - log.Infof("status of agent [%s] changed to online", agentID) - continue - }else{ - // already offline - if status == model.StatusOffline { - continue - } - } - // status change to offline - // todo validate whether agent is offline - sm.agentIds[agentID] = model.StatusOffline sm.workerChan <- struct{}{} go func(agentID string) { defer func() { @@ -120,6 +101,25 @@ func (sm *StateManager) checkAgentStatus() { } <-sm.workerChan }() + sm.syncSettings(agentID) + sm.syncIngestSettings(agentID) + if _, ok := onlineAgentIDs[agentID]; ok { + host.UpdateHostAgentStatus(agentID, model.StatusOnline) + if status == model.StatusOnline { + return + } + // status change to online + sm.agentIds[agentID] = model.StatusOnline + log.Infof("status of agent [%s] changed to online", agentID) + return + }else{ + // already offline + if status == model.StatusOffline { + return + } + } + // status change to offline + sm.agentIds[agentID] = model.StatusOffline ag, err := sm.GetAgent(agentID) if err != nil { if err != elastic.ErrNotFound { @@ -140,9 +140,8 @@ func (sm *StateManager) checkAgentStatus() { } } -const KVSyncSettings = "agent_sync_settings" func (sm *StateManager) getLastSyncSettingsTimestamp(agentID string) int64{ - vbytes, err := kv.GetValue(KVSyncSettings, []byte(agentID)) + vbytes, err := kv.GetValue(model.KVSyncDynamicTaskSettings, []byte(agentID)) if err != nil { log.Error(err) } @@ -219,11 +218,31 @@ func (sm *StateManager) syncSettings(agentID string) { err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes)) newTimestampStr := strconv.FormatInt(newTimestamp, 10) - err = kv.AddValue(KVSyncSettings, []byte(agentID), []byte(newTimestampStr)) + err = kv.AddValue(model.KVSyncDynamicTaskSettings, []byte(agentID), []byte(newTimestampStr)) if err != nil { log.Error(err) } } +func (sm *StateManager) syncIngestSettings(agentID string) { + v, err := kv.GetValue(model.KVAgentIngestConfigChanged, []byte(agentID)) + if err != nil { + log.Error(err) + } + if string(v) != "1" { + return + } + ag, err := sm.GetAgent(agentID) + if err != nil { + if err != elastic.ErrNotFound { + log.Errorf("get agent error: %v", err) + } + return + } + err = sm.agentClient.SaveIngestConfig(context.Background(), ag.GetEndpoint()) + if err == nil { + kv.AddValue(model.KVAgentIngestConfigChanged,[]byte(agentID), []byte("0")) + } +} func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) { agents, err := common.LoadAgentsFromES(clusterID)