diff --git a/model/alerting/alert.go b/model/alerting/alert.go index 2b23e95f..8a989fed 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -22,7 +22,9 @@ type Alert struct { Title string `json:"title" elastic_mapping:"title: { type: keyword }"` Message string `json:"message" elastic_mapping:"context: { type: keyword, copy_to:search_text }"` AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"` - ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"` + ActionExecutionResults []ActionExecutionResult `json:"action_execution_results,omitempty"` + RecoverActionResults []ActionExecutionResult `json:"recover_action_results,omitempty"` + EscalationActionResults []ActionExecutionResult `json:"escalation_action_results,omitempty"` Users []string `json:"users,omitempty"` State string `json:"state"` Error string `json:"error,omitempty"` diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 995ccd00..eca3a292 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -27,6 +27,8 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.POST, "/alerting/rule/:rule_id/_enable", alert.RequirePermission(alert.enableRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/metric", alert.RequirePermission(alert.getMetricData, enum.PermissionAlertRuleRead)) api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/info", alert.RequirePermission(alert.getRuleDetail, enum.PermissionAlertRuleRead, enum.PermissionAlertMessageRead)) + api.HandleAPIMethod(api.POST, "/alerting/rule/_enable", alert.RequirePermission(alert.batchEnableRule, enum.PermissionAlertRuleWrite)) + api.HandleAPIMethod(api.POST, "/alerting/rule/_disable", alert.RequirePermission(alert.batchDisableRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.RequirePermission(alert.getChannel, enum.PermissionAlertChannelRead)) api.HandleAPIMethod(api.POST, "/alerting/channel", alert.RequirePermission(alert.createChannel, enum.PermissionAlertChannelWrite)) @@ -43,6 +45,7 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.POST, "/alerting/message/_ignore", alert.RequirePermission(alert.ignoreAlertMessage, enum.PermissionAlertMessageWrite)) api.HandleAPIMethod(api.GET, "/alerting/message/_stats", alert.RequirePermission(alert.getAlertMessageStats, enum.PermissionAlertMessageRead)) api.HandleAPIMethod(api.GET, "/alerting/message/:message_id", alert.RequirePermission(alert.getAlertMessage, enum.PermissionAlertMessageRead)) + api.HandleAPIMethod(api.GET, "/alerting/message/:message_id/notification", alert.getMessageNotificationInfo) //just for test diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index b30368bb..c583b645 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -6,6 +6,7 @@ package alerting import ( "fmt" + "github.com/buger/jsonparser" log "github.com/cihub/seelog" "infini.sh/console/model/alerting" alerting2 "infini.sh/console/service/alerting" @@ -391,4 +392,192 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps "hit_condition": hitCondition, } h.WriteJSON(w, detailObj, http.StatusOK) +} + +func (h *AlertAPI) getMessageNotificationInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + message := &alerting.AlertMessage{ + ID: ps.ByName("message_id"), + } + exists, err := orm.Get(message) + if !exists || err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{ + "_id": message.ID, + "found": false, + }, http.StatusNotFound) + return + } + rule := &alerting.Rule{ + ID: message.RuleID, + } + exists, err = orm.Get(rule) + if !exists || err != nil { + log.Error(err) + h.WriteError(w, fmt.Sprintf("rule [%s] not found", rule.ID), http.StatusInternalServerError) + return + } + notificationInfo := util.MapStr{} + if rule.NotificationConfig == nil && rule.RecoveryNotificationConfig == nil { + notificationInfo["is_empty"] = true + h.WriteJSON(w, notificationInfo, http.StatusOK) + return + } + stats, err := getMessageNotificationStats(message) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if rule.NotificationConfig != nil { + notificationInfo["alerting"] = util.MapStr{ + "accept_time_range": rule.NotificationConfig.AcceptTimeRange, + "throttle_period": rule.NotificationConfig.ThrottlePeriod, + "escalation_enabled": rule.NotificationConfig.EscalationEnabled, + "escalation_throttle_period": rule.NotificationConfig.EscalationThrottlePeriod, + "normal_stats": stats["normal"], + "escalation_stats": stats["escalation"], + } + } + if rule.RecoveryNotificationConfig != nil { + notificationInfo["recovery"] = util.MapStr{ + "stats": stats["recovery"], + } + } + h.WriteJSON(w, notificationInfo, http.StatusOK) +} + +func getMessageNotificationStats(msg *alerting.AlertMessage )(util.MapStr, error){ + rangeQ := util.MapStr{ + "gte": msg.Created.UnixMilli(), + } + if msg.Status == alerting.MessageStateRecovered { + rangeQ["lte"] = msg.Updated.UnixMilli() + } + aggs := util.MapStr{ + "grp_normal_channel": util.MapStr{ + "terms": util.MapStr{ + "field": "action_execution_results.channel_type", + "size": 20, + }, + "aggs": util.MapStr{ + "top": util.MapStr{ + "top_hits": util.MapStr{ + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "_source": util.MapStr{ + "includes": []string{"created", "action_execution_results.channel_name"}, + }, + "size": 1, + }, + }, + }, + }, + "grp_escalation_channel": util.MapStr{ + "terms": util.MapStr{ + "field": "escalation_action_results.channel_type", + "size": 20, + }, + "aggs": util.MapStr{ + "top": util.MapStr{ + "top_hits": util.MapStr{ + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "_source": util.MapStr{ + "includes": []string{"created", "escalation_action_results.channel_name"}, + }, + "size": 1, + }, + }, + }, + }, + } + if msg.Status == alerting.MessageStateRecovered { + aggs["grp_recover_channel"] = util.MapStr{ + "terms": util.MapStr{ + "field": "recover_action_results.channel_type", + "size": 20, + }, + "aggs": util.MapStr{ + "top": util.MapStr{ + "top_hits": util.MapStr{ + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "_source": util.MapStr{ + "includes": []string{"created", "recover_action_results.channel_name"}, + }, + "size": 1, + }, + }, + }, + } + } + query := util.MapStr{ + "size": 0, + "aggs": aggs, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "range": util.MapStr{ + "created": rangeQ, + }, + }, + { + "term": util.MapStr{ + "rule_id": util.MapStr{ + "value": msg.RuleID, + }, + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(alerting.Alert{}, &q) + if err != nil { + return nil, err + } + + var normalStats = extractStatsFromRaw(result.Raw, "grp_normal_channel", "action_execution_results") + var escalationStats = extractStatsFromRaw(result.Raw, "grp_escalation_channel", "escalation_action_results") + stats := util.MapStr{ + "normal": normalStats, + "escalation": escalationStats, + } + if msg.Status == alerting.MessageStateRecovered { + recoverStats := extractStatsFromRaw(result.Raw, "grp_recover_channel", "recover_action_results") + stats["recovery"] = recoverStats + } + + return stats, nil +} +func extractStatsFromRaw(searchRawRes []byte, grpKey string, actionKey string) []util.MapStr { + var stats []util.MapStr + jsonparser.ArrayEach(searchRawRes, func(value []byte, dataType jsonparser.ValueType, offset int, err error) { + statsItem := util.MapStr{} + statsItem["channel_type"], _ = jsonparser.GetString(value, "key") + statsItem["count"], _ = jsonparser.GetInt(value, "doc_count") + statsItem["channel_name"], _ = jsonparser.GetString(value, "top", "hits","hits", "[0]", "_source",actionKey, "[0]", "channel_name") + statsItem["last_time"], _ = jsonparser.GetString(value, "top", "hits","hits", "[0]", "_source","created") + stats = append(stats, statsItem) + }, "aggregations", grpKey, "buckets") + return stats } \ No newline at end of file diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index e9b18129..ea4745d0 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -592,9 +592,7 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) ) if err != nil { log.Error(err) - alertAPI.WriteJSON(w, util.MapStr{ - "error": err.Error(), - }, http.StatusInternalServerError) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) return } if len(searchRes.Hits.Hits) == 0 { @@ -609,6 +607,50 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque "status": hit.Source["state"], } } + } + queryDsl = util.MapStr{ + "_source": []string{"created", "rule_id"}, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "collapse": util.MapStr{ + "field": "rule_id", + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "rule_id": ruleIDs, + }, + }, + { + "term": util.MapStr{ + "state": util.MapStr{ + "value": alerting.AlertStateAlerting, + }, + }, + }, + }, + }, + }, + } + searchRes, err = esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) ) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + for _, hit := range searchRes.Hits.Hits { + if ruleID, ok := hit.Source["rule_id"].(string); ok { + if _, ok = latestAlertInfos[ruleID]; ok { + latestAlertInfos[ruleID]["last_notification_time"] = hit.Source["created"] + } + } } alertAPI.WriteJSON(w, latestAlertInfos, http.StatusOK) @@ -636,20 +678,9 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p return } if reqObj.Enabled { - eng := alerting2.GetEngine(obj.Resource.Type) - ruleTask := task.ScheduleTask{ - ID: obj.ID, - Interval: obj.Schedule.Interval, - Description: obj.Metrics.Expression, - Task: eng.GenerateTask(obj), - } - task.DeleteTask(ruleTask.ID) - clearKV(ruleTask.ID) - task.RegisterScheduleTask(ruleTask) - task.StartTask(ruleTask.ID) - }else{ - task.DeleteTask(id) - clearKV(id) + enableRule(&obj) + } else { + disableRule(&obj) } obj.Enabled = reqObj.Enabled err = orm.Save(nil, obj) @@ -660,10 +691,28 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p } alertAPI.WriteJSON(w, util.MapStr{ "result": "updated", - "_id": id, + "_id": id, }, http.StatusOK) } +func enableRule(obj *alerting.Rule) { + eng := alerting2.GetEngine(obj.Resource.Type) + ruleTask := task.ScheduleTask{ + ID: obj.ID, + Interval: obj.Schedule.Interval, + Description: obj.Metrics.Expression, + Task: eng.GenerateTask(*obj), + } + task.DeleteTask(ruleTask.ID) + clearKV(ruleTask.ID) + task.RegisterScheduleTask(ruleTask) + task.StartTask(ruleTask.ID) +} +func disableRule(obj *alerting.Rule) { + task.DeleteTask(obj.ID) + clearKV(obj.ID) +} + func (alertAPI *AlertAPI) sendTestMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { typ := alertAPI.GetParameterOrDefault(req, "type", "notification") rule := alerting.Rule{} @@ -917,4 +966,129 @@ func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) }) } return &metricItem,queryResult, nil +} + +func (alertAPI *AlertAPI) batchEnableRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ruleIDs = []string{} + err := alertAPI.DecodeJSON(req, &ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if len(ruleIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + rules, err := getRulesByID(ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var newIDs []string + for _, rule := range rules { + if !rule.Enabled { + enableRule(&rule) + newIDs = append(newIDs, rule.ID) + } + } + if len(newIDs) > 0 { + q := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "id": newIDs, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['enabled'] = %v", true), + }, + } + err = orm.UpdateBy(alerting.Rule{}, util.MustToJSONBytes(q)) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + alertAPI.WriteAckOKJSON(w) +} + +func (alertAPI *AlertAPI) batchDisableRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ruleIDs = []string{} + err := alertAPI.DecodeJSON(req, &ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if len(ruleIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + rules, err := getRulesByID(ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + log.Info(rules) + var newIDs []string + for _, rule := range rules { + if rule.Enabled { + disableRule(&rule) + newIDs = append(newIDs, rule.ID) + } + } + if len(newIDs) > 0 { + q := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "id": newIDs, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['enabled'] = %v", false), + }, + } + log.Info(util.MustToJSON(q)) + err = orm.UpdateBy(alerting.Rule{}, util.MustToJSONBytes(q)) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + alertAPI.WriteAckOKJSON(w) +} + +func getRulesByID(ruleIDs []string) ([]alerting.Rule, error){ + if len(ruleIDs) == 0 { + return nil, nil + } + query := util.MapStr{ + "size": len(ruleIDs), + "query": util.MapStr{ + "terms": util.MapStr{ + "id": ruleIDs, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(alerting.Rule{}, q) + if err != nil { + return nil, err + } + var rules []alerting.Rule + for _, row := range result.Result { + buf := util.MustToJSONBytes(row) + rule := alerting.Rule{} + util.MustFromJSONBytes(buf, &rule) + rules = append(rules, rule) + } + return rules, nil } \ No newline at end of file diff --git a/plugin/api/data/export.go b/plugin/api/data/export.go index dea5b5d0..36f415f8 100644 --- a/plugin/api/data/export.go +++ b/plugin/api/data/export.go @@ -25,7 +25,7 @@ func (h *DataAPI) exportData(w http.ResponseWriter, req *http.Request, ps httpro } var resBody []ExportData for _, meta := range reqBody.Metadatas { - result, err := getExportData(meta.Type) + result, err := getExportData(meta) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -86,9 +86,9 @@ func indexExportData(eds []ExportData) error { return nil } -func getExportData(typ string) (*orm.Result, error) { +func getExportData(meta ExportMetadata) (*orm.Result, error) { var obj interface{} - switch typ { + switch meta.Type { case DataTypeAlertChannel: obj = alerting.Channel{} case DataTypeAlertRule: @@ -96,11 +96,19 @@ func getExportData(typ string) (*orm.Result, error) { case DataTypeAlertEmailServer: obj = model.EmailServer{} default: - return nil, fmt.Errorf("unkonw data type: %s", typ) + return nil, fmt.Errorf("unkonw data type: %s", meta.Type) } - err, result := orm.Search(obj, &orm.Query{ + q := &orm.Query{ Size: 1000, - }) + } + if meta.Filter != nil { + query := util.MapStr{ + "size": 1000, + "query": meta.Filter, + } + q.RawQuery = util.MustToJSONBytes(query) + } + err, result := orm.Search(obj, q) if err != nil { return nil, err } diff --git a/plugin/api/data/model.go b/plugin/api/data/model.go index c87eae0f..1635285f 100644 --- a/plugin/api/data/model.go +++ b/plugin/api/data/model.go @@ -10,6 +10,7 @@ type ExportDataRequest struct { type ExportMetadata struct { Type string `json:"type"` + Filter interface{} `json:"filter,omitempty"` } type ExportData struct { diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 18348591..038baba0 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -699,7 +699,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return err } actionResults, _ := performChannels(recoverCfg.Normal, paramsCtx, false) - alertItem.ActionExecutionResults = actionResults + alertItem.RecoverActionResults = actionResults } } return nil @@ -852,7 +852,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration { actionResults, errCount := performChannels(notifyCfg.Escalation, paramsCtx, false) - alertItem.ActionExecutionResults = actionResults + alertItem.EscalationActionResults = actionResults //todo init last escalation time when create task (by last alert item is escalated) if errCount == 0 { rule.LastEscalationTime = time.Now() @@ -1063,7 +1063,7 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}, ra Error: errStr, Message: string(messageBytes), ExecutionTime: int(time.Now().UnixNano()/1e6), - ChannelType: channel.Type, + ChannelType: channel.SubType, ChannelName: channel.Name, }) }