From a039d33d8d09f43b9f8e4d56f22a7bcbbf40dac9 Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 5 Jun 2023 15:11:44 +0800 Subject: [PATCH] save sync agent settings timestamp to kv --- modules/agent/state/state.go | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/modules/agent/state/state.go b/modules/agent/state/state.go index 9c3e4279..c2db3916 100644 --- a/modules/agent/state/state.go +++ b/modules/agent/state/state.go @@ -21,6 +21,7 @@ import ( "infini.sh/framework/modules/elastic" "runtime" "runtime/debug" + "strconv" "sync" "time" ) @@ -61,7 +62,6 @@ type StateManager struct { agentIds map[string]string agentMutex sync.Mutex workerChan chan struct{} - timestamps map[string]int64 } func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string, agentClient *client.Client) *StateManager { @@ -73,7 +73,6 @@ func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string agentClient: agentClient, agentIds: agentIds, workerChan: make(chan struct{}, runtime.NumCPU()), - timestamps: map[string]int64{}, } } @@ -141,6 +140,22 @@ func (sm *StateManager) checkAgentStatus() { } } +const KVSyncSettings = "agent_sync_settings" +func (sm *StateManager) getLastSyncSettingsTimestamp(agentID string) int64{ + vbytes, err := kv.GetValue(KVSyncSettings, []byte(agentID)) + if err != nil { + log.Error(err) + } + if vbytes == nil { + return 0 + } + t, err := strconv.ParseInt(string(vbytes), 10, 64) + if err != nil { + log.Error(err) + } + + return t +} func (sm *StateManager) syncSettings(agentID string) { ag, err := sm.GetAgent(agentID) @@ -151,7 +166,8 @@ func (sm *StateManager) syncSettings(agentID string) { return } newTimestamp := time.Now().UnixMilli() - settings, err := common.GetAgentSettings(agentID, sm.timestamps[agentID]) + lastSyncTimestamp := sm.getLastSyncSettingsTimestamp(agentID) + settings, err := common.GetAgentSettings(agentID, lastSyncTimestamp) if err != nil { log.Errorf("query agent settings error: %v", err) return @@ -201,7 +217,12 @@ func (sm *StateManager) syncSettings(agentID string) { return } err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes)) - sm.timestamps[agentID] = newTimestamp + + newTimestampStr := strconv.FormatInt(newTimestamp, 10) + err = kv.AddValue(KVSyncSettings, []byte(agentID), []byte(newTimestampStr)) + if err != nil { + log.Error(err) + } } func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) {