Merge pull request 'refresh cluster settings to agent after credential is updated' (#120) from refresh_credential into master

This commit is contained in:
silenceqi 2023-06-06 19:14:17 +08:00
commit 623ca27a4e
6 changed files with 120 additions and 41 deletions

View File

@ -67,9 +67,9 @@ func main() {
modules = append(modules, &redis.RedisModule{})
modules = append(modules, &pipeline.PipeModule{})
modules = append(modules, &task.TaskModule{})
modules = append(modules, &agent.AgentModule{})
modules = append(modules, &metrics.MetricsModule{})
modules = append(modules, &security.Module{})
modules = append(modules, &agent.AgentModule{})
uiModule := &ui.UIModule{}

View File

@ -5,6 +5,7 @@
package agent
import (
"fmt"
log "github.com/cihub/seelog"
"infini.sh/console/modules/agent/api"
"infini.sh/console/modules/agent/client"
@ -12,9 +13,13 @@ import (
"infini.sh/console/modules/agent/model"
"infini.sh/console/modules/agent/state"
"infini.sh/framework/core/agent"
"infini.sh/framework/core/credential"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/env"
"infini.sh/framework/core/host"
"infini.sh/framework/core/kv"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"time"
)
@ -73,6 +78,53 @@ func (module *AgentModule) Start() error {
agentIds[ag.ID] = "online"
}
}
credential.RegisterChangeEvent(func(cred *credential.Credential) {
var effectsClusterIDs []string
elastic.WalkConfigs(func(key, value interface{}) bool {
if cfg, ok := value.(*elastic.ElasticsearchConfig); ok {
if cfg.CredentialID == cred.ID {
effectsClusterIDs = append(effectsClusterIDs, cfg.ID)
}
}
return true
})
if len(effectsClusterIDs) > 0 {
queryDsl := util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"terms": util.MapStr{
"metadata.labels.cluster_id": effectsClusterIDs,
},
},
},
},
},
"script": util.MapStr{
"source": fmt.Sprintf("ctx._source['updated'] = '%s'", time.Now().Format(time.RFC3339Nano)),
},
}
err = orm.UpdateBy(agent.Setting{}, util.MustToJSONBytes(queryDsl))
if err != nil {
log.Error(err)
}
}
//check ingest cluster credential
if module.AgentConfig.Setup != nil && module.AgentConfig.Setup.IngestClusterCredentialID == cred.ID {
agents, err = common.LoadAgentsFromES("")
if err != nil {
log.Error(err)
return
}
for _, ag := range agents {
err = kv.AddValue(model.KVAgentIngestConfigChanged, []byte(ag.ID), []byte("1"))
if err != nil {
log.Error(err)
}
}
}
})
sm := state.NewStateManager(time.Second*30, "agent_state", agentIds, agClient)
state.RegisterStateManager(sm)

View File

@ -112,7 +112,7 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
if err != nil {
log.Error(err)
}
err = pushIngestConfigToAgent(obj)
err = client.GetClient().SaveIngestConfig(context.Background(), obj.GetEndpoint())
if err != nil {
log.Error(err)
}
@ -121,23 +121,6 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
}
// pushIngestConfigToAgent pushes the current ingest cluster configuration to
// the given agent instance: when the ingest cluster uses basic auth with a
// non-empty password, the password is written into the agent's keystore
// first, then the dynamic "ingest" config is saved on the agent.
func pushIngestConfigToAgent(inst *agent.Instance) error {
	ingestCfg, basicAuth, err := common2.GetAgentIngestConfig()
	if err != nil {
		return err
	}
	if basicAuth != nil && basicAuth.Password != "" {
		err = client.GetClient().SetKeystoreValue(context.Background(), inst.GetEndpoint(), "ingest_cluster_password", basicAuth.Password)
		if err != nil {
			return fmt.Errorf("set keystore value to agent error: %w", err)
		}
	}
	err = client.GetClient().SaveDynamicConfig(context.Background(), inst.GetEndpoint(), "ingest", ingestCfg)
	if err != nil {
		// BUG fix: the wrapped error was previously constructed but never
		// returned, so a failed config push was reported as success.
		return fmt.Errorf("save dynamic config to agent error: %w", err)
	}
	return nil
}
func (h *APIHandler) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("instance_id")

View File

@ -7,6 +7,7 @@ package client
import (
"context"
"fmt"
"infini.sh/console/modules/agent/common"
"infini.sh/framework/core/agent"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/host"
@ -39,6 +40,7 @@ type ClientAPI interface {
DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error
SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error
SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error
SaveIngestConfig(ctx context.Context, agentBaseURL string) error
}
type Client struct {
@ -246,6 +248,24 @@ func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string
return client.doRequest(req, nil)
}
// SaveIngestConfig pushes the ingest cluster configuration to the agent at
// agentBaseURL. When the ingest cluster uses basic auth with a non-empty
// password, the password is stored in the agent's keystore first, then the
// dynamic "ingest" config is saved.
func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string) error {
	ingestCfg, basicAuth, err := common.GetAgentIngestConfig()
	if err != nil {
		return err
	}
	if basicAuth != nil && basicAuth.Password != "" {
		err = client.SetKeystoreValue(ctx, agentBaseURL, "ingest_cluster_password", basicAuth.Password)
		if err != nil {
			return fmt.Errorf("set keystore value to agent error: %w", err)
		}
	}
	// BUG fix: propagate the caller's ctx instead of context.Background() so
	// cancellation and deadlines apply to this request as well.
	err = client.SaveDynamicConfig(ctx, agentBaseURL, "ingest", ingestCfg)
	if err != nil {
		return fmt.Errorf("save dynamic config to agent error: %w", err)
	}
	return nil
}
func (client *Client) doRequest(req *util.Request, respObj interface{}) error {
return client.Executor.DoRequest(req, respObj)

View File

@ -8,3 +8,8 @@ const (
StatusOnline string = "online"
StatusOffline = "offline"
)
// KV store keys used to coordinate agent configuration synchronization.
const (
// KVAgentIngestConfigChanged flags, per agent ID, that the ingest cluster
// config changed and must be re-pushed to the agent ("1" = pending, "0" = synced).
KVAgentIngestConfigChanged = "agent_ingest_config_changed"
// KVSyncDynamicTaskSettings stores, per agent ID, the timestamp of the last
// dynamic task settings sync.
KVSyncDynamicTaskSettings = "agent_sync_dynamic_task_settings"
)

View File

@ -92,25 +92,6 @@ func (sm *StateManager) checkAgentStatus() {
}
sm.agentMutex.Unlock()
for agentID, status := range sm.agentIds {
if _, ok := onlineAgentIDs[agentID]; ok {
sm.syncSettings(agentID)
host.UpdateHostAgentStatus(agentID, model.StatusOnline)
if status == model.StatusOnline {
continue
}
// status change to online
sm.agentIds[agentID] = model.StatusOnline
log.Infof("status of agent [%s] changed to online", agentID)
continue
}else{
// already offline
if status == model.StatusOffline {
continue
}
}
// status change to offline
// todo validate whether agent is offline
sm.agentIds[agentID] = model.StatusOffline
sm.workerChan <- struct{}{}
go func(agentID string) {
defer func() {
@ -120,6 +101,25 @@ func (sm *StateManager) checkAgentStatus() {
}
<-sm.workerChan
}()
sm.syncSettings(agentID)
sm.syncIngestSettings(agentID)
if _, ok := onlineAgentIDs[agentID]; ok {
host.UpdateHostAgentStatus(agentID, model.StatusOnline)
if status == model.StatusOnline {
return
}
// status change to online
sm.agentIds[agentID] = model.StatusOnline
log.Infof("status of agent [%s] changed to online", agentID)
return
}else{
// already offline
if status == model.StatusOffline {
return
}
}
// status change to offline
sm.agentIds[agentID] = model.StatusOffline
ag, err := sm.GetAgent(agentID)
if err != nil {
if err != elastic.ErrNotFound {
@ -140,9 +140,8 @@ func (sm *StateManager) checkAgentStatus() {
}
}
const KVSyncSettings = "agent_sync_settings"
func (sm *StateManager) getLastSyncSettingsTimestamp(agentID string) int64{
vbytes, err := kv.GetValue(KVSyncSettings, []byte(agentID))
vbytes, err := kv.GetValue(model.KVSyncDynamicTaskSettings, []byte(agentID))
if err != nil {
log.Error(err)
}
@ -219,11 +218,31 @@ func (sm *StateManager) syncSettings(agentID string) {
err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes))
newTimestampStr := strconv.FormatInt(newTimestamp, 10)
err = kv.AddValue(KVSyncSettings, []byte(agentID), []byte(newTimestampStr))
err = kv.AddValue(model.KVSyncDynamicTaskSettings, []byte(agentID), []byte(newTimestampStr))
if err != nil {
log.Error(err)
}
}
// syncIngestSettings re-pushes the ingest cluster configuration to the agent
// when the KV "config changed" flag is set for it, and clears the flag only
// after a successful push so a failed push is retried on the next tick.
func (sm *StateManager) syncIngestSettings(agentID string) {
	v, err := kv.GetValue(model.KVAgentIngestConfigChanged, []byte(agentID))
	if err != nil {
		log.Error(err)
	}
	// "1" marks a pending ingest config change; anything else means up to date.
	if string(v) != "1" {
		return
	}
	ag, err := sm.GetAgent(agentID)
	if err != nil {
		if err != elastic.ErrNotFound {
			log.Errorf("get agent error: %v", err)
		}
		return
	}
	err = sm.agentClient.SaveIngestConfig(context.Background(), ag.GetEndpoint())
	if err != nil {
		// BUG fix: the push error was silently dropped; log it and keep the
		// changed flag so the sync is retried later.
		log.Errorf("save ingest config to agent [%s] error: %v", agentID, err)
		return
	}
	// BUG fix: the AddValue error was silently ignored.
	if err = kv.AddValue(model.KVAgentIngestConfigChanged, []byte(agentID), []byte("0")); err != nil {
		log.Error(err)
	}
}
func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) {
agents, err := common.LoadAgentsFromES(clusterID)