From 57d5dbc66243a04bd3670751f6889abe9bb5995d Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 27 Apr 2022 11:23:07 +0800 Subject: [PATCH] read last term start time from kv --- service/alerting/elasticsearch/engine.go | 50 +++++++++++++++++++----- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 56c331a1..e38028dd 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -555,6 +555,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error { }else{ if lastAlertItem.State == "" || lastAlertItem.State == alerting.AlertStateNormal { rule.LastTermStartTime = time.Now() + strTime := rule.LastTermStartTime.UTC().Format(time.RFC3339) + kv.AddValue(alerting2.KVLastTermStartTime, []byte(rule.ID), []byte(strTime)) } log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID ) var ( @@ -579,16 +581,12 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return err } if rule.LastNotificationTime.IsZero() { - timeBytes, err := kv.GetValue(alerting2.KVLastNotificationTime, []byte(rule.ID)) + tm, err := readTimeFromKV(alerting2.KVLastNotificationTime, []byte(rule.ID)) if err != nil { return fmt.Errorf("get last notification time from kv error: %w", err) } - timeStr := string(timeBytes) - if timeStr != ""{ - rule.LastNotificationTime, err = time.ParseInLocation(time.RFC3339, string(timeBytes), time.UTC) - if err != nil { - return fmt.Errorf("parse last notification time from kv error: %w", err) - } + if !tm.IsZero(){ + rule.LastNotificationTime = tm } } period := time.Now().Sub(rule.LastNotificationTime.Local()) @@ -603,7 +601,6 @@ func (engine *Engine) Do(rule *alerting.Rule) error { strTime := time.Now().UTC().Format(time.RFC3339) kv.AddValue(alerting2.KVLastNotificationTime, []byte(rule.ID), []byte(strTime)) alertItem.IsNotified = true - //kv.AddValue(alerting2.KVLastNotificationTime, []byte(rule.ID), []byte(rule.LastNotificationTime.Format(time.RFC3339))) } isAck, err := hasAcknowledgedRule(rule.ID, rule.LastTermStartTime) if err != nil { @@ -616,13 +613,33 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return err } //todo init last term start time when create task (by last alert item of state normal) - if time.Now().Sub(rule.LastTermStartTime) > throttlePeriod { - if time.Now().Sub(rule.LastEscalationTime) > periodDuration { + if rule.LastTermStartTime.IsZero(){ + tm, err := readTimeFromKV(alerting2.KVLastTermStartTime, []byte(rule.ID)) + if err != nil { + return fmt.Errorf("get last term start time from kv error: %w", err) + } + if !tm.IsZero(){ + rule.LastTermStartTime = tm + } + } + if time.Now().Sub(rule.LastTermStartTime.Local()) > throttlePeriod { + if rule.LastEscalationTime.IsZero(){ + tm, err := readTimeFromKV(alerting2.KVLastEscalationTime, []byte(rule.ID)) + if err != nil { + return fmt.Errorf("get last escalation time from kv error: %w", err) + } + if !tm.IsZero(){ + rule.LastEscalationTime = tm + } + } + if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration { actionResults := performChannels(rule.Channels.Escalation, conditionResults) alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, actionResults...) //todo init last escalation time when create task (by last alert item is escalated) rule.LastEscalationTime = time.Now() alertItem.IsEscalated = true + strTime := rule.LastEscalationTime.UTC().Format(time.RFC3339) + kv.AddValue(alerting2.KVLastEscalationTime, []byte(rule.ID), []byte(strTime)) } } @@ -852,4 +869,17 @@ func hasAcknowledgedRule(ruleID string, startTime time.Time) (bool, error){ return false, nil } return true, nil +} + +func readTimeFromKV(bucketKey string, key []byte)(time.Time, error){ + timeBytes, err := kv.GetValue(bucketKey, key) + zeroTime := time.Time{} + if err != nil { + return zeroTime, err + } + timeStr := string(timeBytes) + if timeStr != ""{ + return time.ParseInLocation(time.RFC3339, string(timeBytes), time.UTC) + } + return zeroTime, nil } \ No newline at end of file