save sync agent settings timestamp to kv

This commit is contained in:
liugq 2023-06-05 15:11:44 +08:00
parent ffde139506
commit a039d33d8d
1 changed files with 25 additions and 4 deletions

View File

@ -21,6 +21,7 @@ import (
"infini.sh/framework/modules/elastic" "infini.sh/framework/modules/elastic"
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"strconv"
"sync" "sync"
"time" "time"
) )
@ -61,7 +62,6 @@ type StateManager struct {
agentIds map[string]string agentIds map[string]string
agentMutex sync.Mutex agentMutex sync.Mutex
workerChan chan struct{} workerChan chan struct{}
timestamps map[string]int64
} }
func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string, agentClient *client.Client) *StateManager { func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string, agentClient *client.Client) *StateManager {
@ -73,7 +73,6 @@ func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string
agentClient: agentClient, agentClient: agentClient,
agentIds: agentIds, agentIds: agentIds,
workerChan: make(chan struct{}, runtime.NumCPU()), workerChan: make(chan struct{}, runtime.NumCPU()),
timestamps: map[string]int64{},
} }
} }
@ -141,6 +140,22 @@ func (sm *StateManager) checkAgentStatus() {
} }
} }
const KVSyncSettings = "agent_sync_settings"
func (sm *StateManager) getLastSyncSettingsTimestamp(agentID string) int64{
vbytes, err := kv.GetValue(KVSyncSettings, []byte(agentID))
if err != nil {
log.Error(err)
}
if vbytes == nil {
return 0
}
t, err := strconv.ParseInt(string(vbytes), 10, 64)
if err != nil {
log.Error(err)
}
return t
}
func (sm *StateManager) syncSettings(agentID string) { func (sm *StateManager) syncSettings(agentID string) {
ag, err := sm.GetAgent(agentID) ag, err := sm.GetAgent(agentID)
@ -151,7 +166,8 @@ func (sm *StateManager) syncSettings(agentID string) {
return return
} }
newTimestamp := time.Now().UnixMilli() newTimestamp := time.Now().UnixMilli()
settings, err := common.GetAgentSettings(agentID, sm.timestamps[agentID]) lastSyncTimestamp := sm.getLastSyncSettingsTimestamp(agentID)
settings, err := common.GetAgentSettings(agentID, lastSyncTimestamp)
if err != nil { if err != nil {
log.Errorf("query agent settings error: %v", err) log.Errorf("query agent settings error: %v", err)
return return
@ -201,7 +217,12 @@ func (sm *StateManager) syncSettings(agentID string) {
return return
} }
err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes)) err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes))
sm.timestamps[agentID] = newTimestamp
newTimestampStr := strconv.FormatInt(newTimestamp, 10)
err = kv.AddValue(KVSyncSettings, []byte(agentID), []byte(newTimestampStr))
if err != nil {
log.Error(err)
}
} }
func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) { func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) {