From f910203599c0b620b7513e0d801e11cbd5d75b65 Mon Sep 17 00:00:00 2001 From: silenceqi Date: Wed, 12 Feb 2025 17:32:27 +0800 Subject: [PATCH] feat: support alerts based on bucket diff state (#119) * feat: support alerts based on bucket diff status * fix: correct document count calculation * fix: empty param `event_id ` * fix: removing first and last time bucket * fix: removing first and last time bucket * chore: update bucket diff algorithm for improved accuracy * refactor: optimize bucket diff algorithm * feat: trigger bucket diff content change alert using expression * feat: include bucket diff type in alert message API response * chore: update release notes * feat: add alert rule template to detect cluster metrics collection anomaly * chore: update release notes --- config/setup/common/alerting.tpl | 154 ++++++++++++ docs/content.en/docs/release-notes/_index.md | 1 + docs/content.zh/docs/release-notes/_index.md | 2 +- model/alerting/condition.go | 26 ++ model/alerting/metric.go | 9 +- model/alerting/rule.go | 5 +- plugin/api/alerting/message.go | 58 +++-- service/alerting/elasticsearch/engine.go | 235 ++++++++++++++++--- 8 files changed, 429 insertions(+), 61 deletions(-) diff --git a/config/setup/common/alerting.tpl b/config/setup/common/alerting.tpl index 6a1248df..89f48f8b 100644 --- a/config/setup/common/alerting.tpl +++ b/config/setup/common/alerting.tpl @@ -1760,4 +1760,158 @@ POST $[[SETUP_INDEX_PREFIX]]alert-rule/$[[SETUP_DOC_TYPE]]/builtin-cal8n7p7h710d "name": "$[[SETUP_USERNAME]]", "id": "$[[SETUP_USER_ID]]" } +} +POST $[[SETUP_INDEX_PREFIX]]alert-rule/$[[SETUP_DOC_TYPE]]/builtin-cujivv5ath26drn6bcl0 +{ + "id": "builtin-cujivv5ath26drn6bcl0", + "created": "2025-02-08T18:20:44.273334+08:00", + "updated": "2025-02-12T16:31:05.672771+08:00", + "name": "Cluster Metrics Collection Anomaly", + "enabled": true, + "resource": { + "resource_id": "$[[SETUP_RESOURCE_ID]]", + "resource_name": "$[[SETUP_RESOURCE_NAME]]", + "type": "elasticsearch", + "objects": [ + 
".infini_metrics*" + ], + "filter": {}, + "raw_filter": { + "bool": { + "must": [ + { + "terms": { + "metadata.name": [ + "cluster_health", + "cluster_stats", + "index_stats", + "node_stats", + "shard_stats" + ] + } + } + ] + } + }, + "time_field": "timestamp", + "context": { + "fields": null + } + }, + "metrics": { + "bucket_size": "1m", + "groups": [ + { + "field": "metadata.labels.cluster_id", + "limit": 5 + }, + { + "field": "metadata.name", + "limit": 5 + } + ], + "formula": "a", + "items": [ + { + "name": "a", + "field": "agent.id", + "statistic": "count" + } + ], + "bucket_label": { + "enabled": false + }, + "expression": "count(agent.id)" + }, + "bucket_conditions": { + "operator": "any", + "items": [ + { + "minimum_period_match": 1, + "operator": "lt", + "values": [ + "0" + ], + "priority": "critical", + "type": "content", + "bucket_count": 10 + } + ] + }, + "notification_config": { + "enabled": true, + "title": "🔥 [{{.rule_name}}] Alerting", + "message": "{{range .results}}\n{{$cn := lookup \"category=metadata, object=cluster, property=name, default=N/A\" (index .group_values 0) }}\n{{$cu := printf \"%s/#/cluster/monitor/elasticsearch/%s\" $.env.INFINI_CONSOLE_ENDPOINT (index .group_values 0)}}\nCluster [[{{$cn}}]({{$cu}}?_g=%7B%22timeRange%22:%7B%22min%22:%22{{$.min}}%22%2C%22max%22:%22{{$.max}}%22%7D%7D)] ({{index .group_values 1}}) metrics has dropped at {{.issue_timestamp | datetime}};\n{{end}}", + "normal": [ + { + "id": "cgnb2nt3q95nmusjl65g", + "enabled": true + }, + { + "id": "cgiospt3q95q49k3u00g", + "enabled": true + }, + { + "id": "cj865st3q95rega919ig", + "enabled": true + }, + { + "id": "cgnb2r53q95nmusjl6vg", + "enabled": true + }, + { + "id": "ch1os6t3q95lk6lepkq0", + "enabled": true + }, + { + "id": "cgnb2kt3q95nmusjl64g", + "enabled": true + } + ], + "throttle_period": "6h", + "accept_time_range": { + "start": "00:00", + "end": "23:59" + } + }, + "category": "Platform", + "recovery_notification_config": { + "enabled": true, + "title": "🌈 
[{{.rule_name}}] Resolved", + "message": "EventID: {{.event_id}} \nTarget: {{.resource_name}}-{{.objects}} \nTriggerAt: {{.trigger_at | datetime}} \nResolveAt: {{.timestamp | datetime}} \nDuration: {{.duration}} ", + "normal": [ + { + "id": "cj8bq8d3q95ogankugqg", + "enabled": true + }, + { + "id": "cj8ctat3q95l9ebbntlg", + "enabled": true + }, + { + "id": "cj8atf53q95lhahebg8g", + "enabled": true + }, + { + "id": "cj8e9s53q95gsdbb054g", + "enabled": true + }, + { + "id": "cj8e9gt3q95gsdbb0170", + "enabled": true + }, + { + "id": "cj86l0l3q95rrpfea6ug", + "enabled": true + } + ], + "event_enabled": true + }, + "schedule": { + "interval": "1m" + }, + "creator": { + "name": "$[[SETUP_USERNAME]]", + "id": "$[[SETUP_USER_ID]]" + } } \ No newline at end of file diff --git a/docs/content.en/docs/release-notes/_index.md b/docs/content.en/docs/release-notes/_index.md index 125342e6..f341a52a 100644 --- a/docs/content.en/docs/release-notes/_index.md +++ b/docs/content.en/docs/release-notes/_index.md @@ -12,6 +12,7 @@ Information about release notes of INFINI Console is provided here. 
### Breaking changes ### Features +- Support alerts based on bucket diff state (#119) ### Bug fix diff --git a/docs/content.zh/docs/release-notes/_index.md b/docs/content.zh/docs/release-notes/_index.md index 8e892a6b..b1647f61 100644 --- a/docs/content.zh/docs/release-notes/_index.md +++ b/docs/content.zh/docs/release-notes/_index.md @@ -12,7 +12,7 @@ title: "版本历史" ### Breaking changes ### Features - +- 告警功能支持根据桶之间文档数差值和内容差异告警 (#119) ### Bug fix ### Improvements diff --git a/model/alerting/condition.go b/model/alerting/condition.go index 2e48e7ae..c54014b0 100644 --- a/model/alerting/condition.go +++ b/model/alerting/condition.go @@ -44,6 +44,30 @@ func (cond *Condition) GetMinimumPeriodMatch() int { return minPeriodMatch }
+// GetMaxBucketCount returns the largest BucketCount found among cond.Items,
+// or 0 when no item sets a positive BucketCount.
+func (cond *Condition) GetMaxBucketCount() int {
+	largest := 0
+	for i := range cond.Items {
+		if n := cond.Items[i].BucketCount; n > largest {
+			largest = n
+		}
+	}
+	return largest
+}
+
+// BucketDiffType represents the type of bucket difference
+type BucketDiffType string
+
+// Constants defining possible bucket difference types
+const (
+	// BucketDiffTypeSize indicates the difference in bucket size
+	BucketDiffTypeSize BucketDiffType = "size"
+
+	// BucketDiffTypeContent indicates the difference in bucket content
+	BucketDiffTypeContent BucketDiffType = "content"
+)
+
 type ConditionItem struct { //MetricName string `json:"metric"` MinimumPeriodMatch int `json:"minimum_period_match"` @@ -51,6 +73,10 @@ type ConditionItem struct { Values []string `json:"values"` Priority string `json:"priority"` Expression string `json:"expression,omitempty"` + //bucket condition type, e.g: size, content + Type BucketDiffType `json:"type,omitempty"` + // Represents the number of buckets in the bucket condition type.
+ BucketCount int `json:"bucket_count,omitempty"` } func (cond *ConditionItem) GenerateConditionExpression() (conditionExpression string, err error) { diff --git a/model/alerting/metric.go b/model/alerting/metric.go index ac3869eb..07d50f76 100644 --- a/model/alerting/metric.go +++ b/model/alerting/metric.go @@ -81,10 +81,15 @@ type QueryResult struct { type MetricData struct { GroupValues []string `json:"group_values"` - Data map[string][]TimeMetricData `json:"data"` + Data map[string][]MetricDataItem `json:"data"` } -type TimeMetricData []interface{}
+type MetricDataItem struct { // one data point of a metric series
+	Timestamp interface{} `json:"timestamp,omitempty"` // bucket key from the aggregation; CheckBucketCondition only accepts float64 here
+	Value     interface{} `json:"value"`               // metric value (or formula result) for the bucket
+	Groups    []string    `json:"groups,omitempty"`    // NOTE(review): not populated anywhere in this patch - confirm intended use
+	DocCount  int         `json:"doc_count,omitempty"` // doc_count of the bucket as read by collectMetricData
+}
 type AlertMetricItem struct { common.MetricItem diff --git a/model/alerting/rule.go b/model/alerting/rule.go index dc81ac69..c5c327ca 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -54,8 +54,9 @@ type Rule struct { Name string `json:"name" elastic_mapping:"name: { type: keyword }"` Id string `json:"id" elastic_mapping:"id: { type: keyword }"` } `json:"creator" elastic_mapping:"creator:{type:object}"` - Category string `json:"category,omitempty" elastic_mapping:"category: { type: keyword,copy_to:search_text }"` - Tags []string `json:"tags,omitempty" elastic_mapping:"tags: { type: keyword,copy_to:search_text }"` + Category string `json:"category,omitempty" elastic_mapping:"category: { type: keyword,copy_to:search_text }"` + Tags []string `json:"tags,omitempty" elastic_mapping:"tags: { type: keyword,copy_to:search_text }"` + BucketConditions *Condition `json:"bucket_conditions" elastic_mapping:"bucket_conditions:{type:object}"` } func (rule *Rule) GetOrInitExpression() (string, error) { diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index 024696b6..fe755304 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -404,13 +404,23 @@ func
(h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps return } metricExpression, _ := rule.Metrics.GenerateExpression() - var hitCondition string - for i, cond := range rule.Conditions.Items { + var ( + hitCondition string + bucketDiffType string + ) + conditions := rule.Conditions + if rule.BucketConditions != nil { + conditions = *rule.BucketConditions + } + for i, cond := range conditions.Items { expression, _ := cond.GenerateConditionExpression() if cond.Priority == message.Priority { hitCondition = strings.ReplaceAll(expression, "result", "") + if rule.BucketConditions != nil { + bucketDiffType = string(cond.Type) + } } - rule.Conditions.Items[i].Expression = strings.ReplaceAll(expression, "result", metricExpression) + conditions.Items[i].Expression = strings.ReplaceAll(expression, "result", metricExpression) } var duration time.Duration if message.Status == alerting.MessageStateRecovered { @@ -419,26 +429,28 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps duration = time.Now().Sub(message.Created) } detailObj := util.MapStr{ - "message_id": message.ID, - "rule_id": message.RuleID, - "rule_name": rule.Name, - "rule_enabled": rule.Enabled, - "title": message.Title, - "message": message.Message, - "priority": message.Priority, - "created": message.Created, - "updated": message.Updated, - "resource_name": rule.Resource.Name, - "resource_id": rule.Resource.ID, - "resource_objects": rule.Resource.Objects, - "conditions": rule.Conditions, - "duration": duration.Milliseconds(), - "ignored_time": message.IgnoredTime, - "ignored_reason": message.IgnoredReason, - "ignored_user": message.IgnoredUser, - "status": message.Status, - "expression": rule.Metrics.Expression, - "hit_condition": hitCondition, + "message_id": message.ID, + "rule_id": message.RuleID, + "rule_name": rule.Name, + "rule_enabled": rule.Enabled, + "title": message.Title, + "message": message.Message, + "priority": message.Priority, + "created": 
message.Created, + "updated": message.Updated, + "resource_name": rule.Resource.Name, + "resource_id": rule.Resource.ID, + "resource_objects": rule.Resource.Objects, + "conditions": rule.Conditions, + "bucket_conditions": rule.BucketConditions, + "bucket_diff_type": bucketDiffType, + "duration": duration.Milliseconds(), + "ignored_time": message.IgnoredTime, + "ignored_reason": message.IgnoredReason, + "ignored_user": message.IgnoredUser, + "status": message.Status, + "expression": rule.Metrics.Expression, + "hit_condition": hitCondition, } h.WriteJSON(w, detailObj, http.StatusOK) } diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 389e37c5..83df0ca1 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -329,9 +329,16 @@ func getQueryTimeRange(rule *alerting.Rule, filterParam *alerting.FilterParam) ( } else { return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.BucketSize) } - bucketCount := rule.Conditions.GetMinimumPeriodMatch() + 1 + var bucketCount int + if rule.BucketConditions != nil { + bucketCount = rule.BucketConditions.GetMaxBucketCount() + //for removing first and last time bucket + bucketCount += 2 + } else { + bucketCount = rule.Conditions.GetMinimumPeriodMatch() + 1 + } if bucketCount <= 0 { - bucketCount = 1 + bucketCount = 2 } duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value*bucketCount, units)) if err != nil { @@ -484,7 +491,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, } else { targetData = alerting.MetricData{ GroupValues: md.GroupValues, - Data: map[string][]alerting.TimeMetricData{}, + Data: map[string][]alerting.MetricDataItem{}, } expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula) if err != nil { @@ -508,14 +515,14 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, } //drop nil value bucket - if v == nil || 
len(v[i]) < 2 { + if v == nil { continue DataLoop } - if _, ok := v[i][1].(float64); !ok { + if _, ok := v[i].Value.(float64); !ok { continue DataLoop } - parameters[k] = v[i][1] - timestamp = v[i][0] + parameters[k] = v[i].Value + timestamp = v[i].Timestamp } if len(parameters) == 0 { continue @@ -528,13 +535,13 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, if r, ok := result.(float64); ok { if math.IsNaN(r) || math.IsInf(r, 0) { if !isFilterNaN { - targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()}) + targetData.Data["result"] = append(targetData.Data["result"], alerting.MetricDataItem{Timestamp: timestamp, Value: math.NaN()}) } continue } } - targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, result}) + targetData.Data["result"] = append(targetData.Data["result"], alerting.MetricDataItem{Timestamp: timestamp, Value: result}) } } targetMetricData = append(targetMetricData, targetData) @@ -554,6 +561,9 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule) (*alerting.ConditionRe if err != nil { return conditionResult, err } + if rule.BucketConditions != nil { + return engine.CheckBucketCondition(rule, targetMetricData, queryResult) + } for idx, targetData := range targetMetricData { if idx == 0 { sort.Slice(rule.Conditions.Items, func(i, j int) bool { @@ -579,16 +589,16 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule) (*alerting.ConditionRe triggerCount := 0 for i := 0; i < dataLength; i++ { //clear nil value - if targetData.Data[dataKey][i][1] == nil { + if targetData.Data[dataKey][i].Value == nil { continue } - if r, ok := targetData.Data[dataKey][i][1].(float64); ok { + if r, ok := targetData.Data[dataKey][i].Value.(float64); ok { if math.IsNaN(r) { continue } } evaluateResult, err := expression.Evaluate(map[string]interface{}{ - "result": targetData.Data[dataKey][i][1], + "result": targetData.Data[dataKey][i].Value, }) if 
err != nil { return conditionResult, fmt.Errorf("evaluate rule [%s] error: %w", rule.ID, err) @@ -603,12 +613,12 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule) (*alerting.ConditionRe resultItem := alerting.ConditionResultItem{ GroupValues: targetData.GroupValues, ConditionItem: &cond, - ResultValue: targetData.Data[dataKey][i][1], - IssueTimestamp: targetData.Data[dataKey][i][0], + ResultValue: targetData.Data[dataKey][i].Value, + IssueTimestamp: targetData.Data[dataKey][i].Timestamp, RelationValues: map[string]interface{}{}, } for _, metric := range rule.Metrics.Items { - resultItem.RelationValues[metric.Name] = queryResult.MetricData[idx].Data[metric.Name][i][1] + resultItem.RelationValues[metric.Name] = queryResult.MetricData[idx].Data[metric.Name][i].Value } resultItems = append(resultItems, resultItem) break LoopCondition @@ -621,6 +631,178 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule) (*alerting.ConditionRe conditionResult.ResultItems = resultItems return conditionResult, nil }
+
+// BucketDiffState describes how a group's time bucket differs from the bucket
+// that precedes it.
+type BucketDiffState struct {
+	// ContentChangeState is -1 when the group disappeared in this bucket, 1 when
+	// it appeared (it was absent from the previous bucket), and 0 when it is
+	// present in both buckets.
+	ContentChangeState int
+	// DocCount is the doc_count delta against the previous bucket; it is only set
+	// when ContentChangeState is 0.
+	DocCount int
+}
+
+// CheckBucketCondition evaluates rule.BucketConditions against the change
+// between consecutive time buckets of every group in targetMetricData.
+//
+// Conditions are tried in descending alerting.PriorityWeights order and at most
+// one result item is emitted per group. A condition fires once it has matched
+// cond.MinimumPeriodMatch buckets in a row, scanning buckets oldest first.
+func (engine *Engine) CheckBucketCondition(rule *alerting.Rule, targetMetricData []alerting.MetricData, queryResult *alerting.QueryResult) (*alerting.ConditionResult, error) {
+	var resultItems []alerting.ConditionResultItem
+	conditionResult := &alerting.ConditionResult{
+		QueryResult: queryResult,
+	}
+	//transform targetMetricData
+	var (
+		times   = map[int64]struct{}{}
+		buckets = map[string]map[int64]int{}
+		maxTime int64
+		minTime = time.Now().UnixMilli()
+	)
+	for _, targetData := range targetMetricData {
+		for _, v := range targetData.Data {
+			for _, item := range v {
+				if tv, ok := item.Timestamp.(float64); ok {
+					timestamp := int64(tv)
+					if timestamp < minTime {
+						minTime = timestamp
+					}
+					if timestamp > maxTime {
+						maxTime = timestamp
+					}
+					if _, ok = times[timestamp]; !ok {
+						times[timestamp] = struct{}{}
+					}
+					bucketKey := strings.Join(targetData.GroupValues, "*")
+					if _, ok = buckets[bucketKey]; !ok {
+						buckets[bucketKey] = map[int64]int{}
+					}
+					buckets[bucketKey][timestamp] = item.DocCount
+				} else {
+					log.Warnf("invalid timestamp type: %T", item.Timestamp)
+				}
+			}
+		}
+	}
+	var timesArr []int64
+	for t := range times {
+		timesArr = append(timesArr, t)
+	}
+	sort.Slice(timesArr, func(i, j int) bool {
+		return timesArr[i] < timesArr[j] // Ascending order
+	})
+
+	// Remove the first bucket if its timestamp equals minTime, and
+	// the last bucket if its timestamp equals maxTime
+	if len(timesArr) > 0 && timesArr[0] == minTime {
+		// Remove first bucket if timestamp matches minTime
+		timesArr = timesArr[1:]
+	}
+	if len(timesArr) > 0 && timesArr[len(timesArr)-1] == maxTime {
+		// Remove last bucket if timestamp matches maxTime
+		timesArr = timesArr[:len(timesArr)-1]
+	}
+
+	//check bucket diff
+	diffResult := map[string]map[int64]BucketDiffState{}
+	for grps, bk := range buckets {
+		hasPre := false
+		if _, ok := diffResult[grps]; !ok {
+			diffResult[grps] = map[int64]BucketDiffState{}
+		}
+		for i, t := range timesArr {
+			if v, ok := bk[t]; !ok {
+				if hasPre {
+					diffResult[grps][t] = BucketDiffState{
+						ContentChangeState: -1,
+					}
+				}
+				// reset hasPre to false
+				hasPre = false
+			} else {
+				if !hasPre {
+					if i > 0 {
+						diffResult[grps][t] = BucketDiffState{
+							ContentChangeState: 1,
+						}
+					}
+				} else {
+					diffResult[grps][t] = BucketDiffState{
+						ContentChangeState: 0,
+						DocCount:           v - bk[timesArr[i-1]],
+					}
+				}
+				hasPre = true
+			}
+		}
+	}
+
+	sort.Slice(rule.BucketConditions.Items, func(i, j int) bool {
+		return alerting.PriorityWeights[rule.BucketConditions.Items[i].Priority] > alerting.PriorityWeights[rule.BucketConditions.Items[j].Priority]
+	})
+
+	for grps, states := range diffResult {
+	LoopCondition:
+		for _, cond := range rule.BucketConditions.Items {
+			conditionExpression, err := cond.GenerateConditionExpression()
+			if err != nil {
+				return conditionResult, err
+			}
+			expression, err := govaluate.NewEvaluableExpression(conditionExpression)
+			if err != nil {
+				return conditionResult, err
+			}
+			triggerCount := 0
+			// Walk buckets oldest first. Ranging over the states map directly would
+			// visit them in random order, making the consecutive-match counter and
+			// the reported IssueTimestamp nondeterministic.
+			for _, t := range timesArr {
+				state, ok := states[t]
+				if !ok {
+					// no diff was recorded for this bucket
+					continue
+				}
+				resultValue := state.DocCount
+				if cond.Type == alerting.BucketDiffTypeContent {
+					resultValue = state.ContentChangeState
+				}
+				evaluateResult, err := expression.Evaluate(map[string]interface{}{
+					"result": resultValue,
+				})
+				if err != nil {
+					return conditionResult, fmt.Errorf("evaluate rule [%s] error: %w", rule.ID, err)
+				}
+				if evaluateResult != true {
+					triggerCount = 0
+					continue
+				}
+				triggerCount += 1
+				// Only a matching bucket may fire, even when MinimumPeriodMatch
+				// is zero or negative.
+				if triggerCount >= cond.MinimumPeriodMatch {
+					groupValues := strings.Split(grps, "*")
+					log.Debugf("triggered condition %v, groups: %v\n", cond, groupValues)
+					resultItem := alerting.ConditionResultItem{
+						GroupValues:    groupValues,
+						ConditionItem:  &cond,
+						ResultValue:    resultValue,
+						IssueTimestamp: t,
+						RelationValues: map[string]interface{}{},
+					}
+					resultItems = append(resultItems, resultItem)
+					break LoopCondition
+				}
+			}
+		}
+
+	}
+	conditionResult.QueryResult.MetricData = targetMetricData
+	conditionResult.ResultItems = resultItems
+	return conditionResult, nil
+}
 func (engine *Engine) Do(rule *alerting.Rule) error { var ( @@ -755,15 +914,9 @@ func (engine *Engine) Do(rule *alerting.Rule) error { }) alertItem.Priority = priority - title, message := rule.GetNotificationTitleAndMessage() - err = attachTitleMessageToCtx(title, message, paramsCtx) - if err != nil { - return err - } - alertItem.Message = paramsCtx[alerting2.ParamMessage].(string) - alertItem.Title = paramsCtx[alerting2.ParamTitle].(string) + var newAlertMessage *alerting.AlertMessage if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered { - msg := &alerting.AlertMessage{ + newAlertMessage = &alerting.AlertMessage{ RuleID: rule.ID, Created: alertItem.Created, Updated: time.Now(), @@ -772,13 +925,25 @@ func (engine *Engine) Do(rule *alerting.Rule) error { ResourceName: rule.Resource.Name, Status:
alerting.MessageStateAlerting, Priority: priority, - Title: alertItem.Title, - Message: alertItem.Message, Tags: rule.Tags, Category: rule.Category, } - alertMessage = msg - err = saveAlertMessage(msg) + paramsCtx[alerting2.ParamEventID] = newAlertMessage.ID + } else { + paramsCtx[alerting2.ParamEventID] = alertMessage.ID + } + title, message := rule.GetNotificationTitleAndMessage() + err = attachTitleMessageToCtx(title, message, paramsCtx) + if err != nil { + return err + } + alertItem.Message = paramsCtx[alerting2.ParamMessage].(string) + alertItem.Title = paramsCtx[alerting2.ParamTitle].(string) + if newAlertMessage != nil { + alertMessage = newAlertMessage + alertMessage.Title = alertItem.Title + alertMessage.Message = alertItem.Message + err = saveAlertMessage(newAlertMessage) if err != nil { return fmt.Errorf("save alert message error: %w", err) } @@ -813,10 +978,10 @@ func (engine *Engine) Do(rule *alerting.Rule) error { log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID) // if alert message status equals ignored , then skip sending message to channel - if alertMessage != nil && alertMessage.Status == alerting.MessageStateIgnored { + if alertMessage.Status == alerting.MessageStateIgnored { return nil } - if alertMessage != nil && paramsCtx != nil { + if paramsCtx != nil { paramsCtx[alerting2.ParamEventID] = alertMessage.ID } // if channel is not enabled return @@ -1135,12 +1300,16 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok { if bks, ok := timeBks["buckets"].([]interface{}); ok { md := alerting.MetricData{ - Data: map[string][]alerting.TimeMetricData{}, + Data: map[string][]alerting.MetricDataItem{}, GroupValues: strings.Split(groupValues, "*"), } for _, bk := range bks { if bkM, ok := bk.(map[string]interface{}); ok { + var docCount int + if v, ok := bkM["doc_count"]; ok { + docCount = int(v.(float64)) + } for k, v := 
range bkM { if k == "key" || k == "key_as_string" || k == "doc_count" { continue @@ -1150,20 +1319,20 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti } if vm, ok := v.(map[string]interface{}); ok { if metricVal, ok := vm["value"]; ok { - md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], metricVal}) + md.Data[k] = append(md.Data[k], alerting.MetricDataItem{Timestamp: bkM["key"], Value: metricVal, DocCount: docCount}) } else { //percentiles agg type switch vm["values"].(type) { case []interface{}: for _, val := range vm["values"].([]interface{}) { if valM, ok := val.(map[string]interface{}); ok { - md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], valM["value"]}) + md.Data[k] = append(md.Data[k], alerting.MetricDataItem{Timestamp: bkM["key"], Value: valM["value"], DocCount: docCount}) } break } case map[string]interface{}: for _, val := range vm["values"].(map[string]interface{}) { - md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], val}) + md.Data[k] = append(md.Data[k], alerting.MetricDataItem{Timestamp: bkM["key"], Value: val, DocCount: docCount}) break } }