diff --git a/main.go b/main.go index 9f47c0e7..e1c7b700 100644 --- a/main.go +++ b/main.go @@ -133,6 +133,7 @@ func main() { orm.RegisterSchemaWithIndexName(alerting.Channel{}, "channel") orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") + orm.RegisterSchemaWithIndexName(insight.Widget{}, "widget") orm.RegisterSchemaWithIndexName(task1.Task{}, "task") orm.RegisterSchemaWithIndexName(model.Layout{}, "layout") orm.RegisterSchemaWithIndexName(model.Notification{}, "notification") diff --git a/model/alerting/alert.go b/model/alerting/alert.go index 6e21f572..fc8bf8b1 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -22,7 +22,9 @@ type Alert struct { Title string `json:"title" elastic_mapping:"title: { type: keyword }"` Message string `json:"message" elastic_mapping:"context: { type: keyword, copy_to:search_text }"` AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"` - ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"` + ActionExecutionResults []ActionExecutionResult `json:"action_execution_results,omitempty"` + RecoverActionResults []ActionExecutionResult `json:"recover_action_results,omitempty"` + EscalationActionResults []ActionExecutionResult `json:"escalation_action_results,omitempty"` Users []string `json:"users,omitempty"` State string `json:"state"` Error string `json:"error,omitempty"` @@ -40,6 +42,7 @@ type ActionExecutionResult struct { Message string `json:"message"` ChannelName string `json:"channel_name"` ChannelType string `json:"channel_type"` + ChannelID string `json:"channel_id"` } const ( @@ -70,6 +73,8 @@ type AlertMessage struct { IgnoredUser string `json:"ignored_user,omitempty" elastic_mapping:"ignored_user: { type: keyword,copy_to:search_text }"` Priority string `json:"priority" elastic_mapping:"priority: { type: keyword }"` SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` + Category string `json:"category,omitempty" elastic_mapping:"category: { type: keyword,copy_to:search_text }"` + Tags []string `json:"tags,omitempty" elastic_mapping:"tags: { type: keyword,copy_to:search_text }"` } /* diff --git a/model/alerting/rule.go b/model/alerting/rule.go index e930d64f..9ba01324 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -31,6 +31,8 @@ type Rule struct { Name string `json:"name" elastic_mapping:"name: { type: keyword }"` Id string `json:"id" elastic_mapping:"id: { type: keyword }"` } `json:"creator" elastic_mapping:"creator:{type:object}"` + Category string `json:"category,omitempty" elastic_mapping:"category: { type: keyword,copy_to:search_text }"` + Tags []string `json:"tags,omitempty" elastic_mapping:"tags: { type: keyword,copy_to:search_text }"` } func (rule *Rule) GetOrInitExpression() (string, error){ diff --git a/model/insight/widget.go b/model/insight/widget.go new file mode 100644 index 00000000..e004cc3f --- /dev/null +++ b/model/insight/widget.go @@ -0,0 +1,13 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package insight + +import "infini.sh/framework/core/orm" + +type Widget struct { + orm.ORMObjectBase + Title string `json:"title" elastic_mapping:"title: { type: text }"` + Config interface{}`json:"config" elastic_mapping:"config: { type: object,enabled:false }"` +} diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 995ccd00..87d80eda 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -19,6 +19,7 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.POST, "/alerting/rule", alert.RequirePermission(alert.createRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.POST, "/alerting/rule/test", alert.RequireLogin(alert.sendTestMessage)) api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.RequirePermission(alert.deleteRule, enum.PermissionAlertRuleWrite)) + api.HandleAPIMethod(api.DELETE, "/alerting/rule", alert.RequirePermission(alert.batchDeleteRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.RequirePermission(alert.updateRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.RequirePermission(alert.searchRule, enum.PermissionAlertRuleRead)) api.HandleAPIMethod(api.GET, "/alerting/stats", alert.RequirePermission(alert.getAlertStats, enum.PermissionAlertHistoryRead)) @@ -27,6 +28,9 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.POST, "/alerting/rule/:rule_id/_enable", alert.RequirePermission(alert.enableRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/metric", alert.RequirePermission(alert.getMetricData, enum.PermissionAlertRuleRead)) api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/info", alert.RequirePermission(alert.getRuleDetail, enum.PermissionAlertRuleRead, enum.PermissionAlertMessageRead)) + api.HandleAPIMethod(api.POST, "/alerting/rule/_enable", alert.RequirePermission(alert.batchEnableRule, enum.PermissionAlertRuleWrite)) + api.HandleAPIMethod(api.POST, "/alerting/rule/_disable", alert.RequirePermission(alert.batchDisableRule, enum.PermissionAlertRuleWrite)) + api.HandleAPIMethod(api.GET, "/alerting/rule/_search_values", alert.RequirePermission(alert.searchFieldValues, enum.PermissionAlertRuleRead)) api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.RequirePermission(alert.getChannel, enum.PermissionAlertChannelRead)) api.HandleAPIMethod(api.POST, "/alerting/channel", alert.RequirePermission(alert.createChannel, enum.PermissionAlertChannelWrite)) @@ -34,6 +38,8 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.PUT, "/alerting/channel/:channel_id", alert.RequirePermission(alert.updateChannel, enum.PermissionAlertChannelWrite)) api.HandleAPIMethod(api.GET, "/alerting/channel/_search", alert.RequirePermission(alert.searchChannel, enum.PermissionAlertChannelRead)) api.HandleAPIMethod(api.POST, "/alerting/channel/test", alert.RequirePermission(alert.testChannel, enum.PermissionAlertChannelWrite)) + api.HandleAPIMethod(api.POST, "/alerting/channel/_enable", alert.RequirePermission(alert.batchEnableChannel, enum.PermissionAlertChannelWrite)) + api.HandleAPIMethod(api.POST, "/alerting/channel/_disable", alert.RequirePermission(alert.batchDisableChannel, enum.PermissionAlertChannelWrite)) api.HandleAPIMethod(api.GET, "/alerting/alert/_search", alert.RequirePermission(alert.searchAlert, enum.PermissionAlertHistoryRead)) api.HandleAPIMethod(api.GET, "/alerting/alert/:alert_id", alert.RequirePermission(alert.getAlert, enum.PermissionAlertHistoryRead)) @@ -43,6 +49,7 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.POST, "/alerting/message/_ignore", alert.RequirePermission(alert.ignoreAlertMessage, enum.PermissionAlertMessageWrite)) api.HandleAPIMethod(api.GET, "/alerting/message/_stats", alert.RequirePermission(alert.getAlertMessageStats, enum.PermissionAlertMessageRead)) api.HandleAPIMethod(api.GET, "/alerting/message/:message_id", alert.RequirePermission(alert.getAlertMessage, enum.PermissionAlertMessageRead)) + api.HandleAPIMethod(api.GET, "/alerting/message/:message_id/notification", alert.getMessageNotificationInfo) //just for test diff --git a/plugin/api/alerting/channel.go b/plugin/api/alerting/channel.go index 6999098b..fb1de867 100644 --- a/plugin/api/alerting/channel.go +++ b/plugin/api/alerting/channel.go @@ -315,4 +315,65 @@ func (h *AlertAPI) testChannel(w http.ResponseWriter, req *http.Request, ps http return } h.WriteAckOKJSON(w) +} + +func (alertAPI *AlertAPI) batchEnableChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var channelIDs = []string{} + err := alertAPI.DecodeJSON(req, &channelIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(channelIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + if len(channelIDs) > 0 { + err = setChannelEnabled(true, channelIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + alertAPI.WriteAckOKJSON(w) +} + +func (alertAPI *AlertAPI) batchDisableChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var channelIDs = []string{} + err := alertAPI.DecodeJSON(req, &channelIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(channelIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + if len(channelIDs) > 0 { + err = setChannelEnabled(false, channelIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + alertAPI.WriteAckOKJSON(w) +} + +func setChannelEnabled(enabled bool, channelIDs []string) error { + q := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "id": channelIDs, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['enabled'] = %v", enabled), + }, + } + err := orm.UpdateBy(alerting.Channel{}, util.MustToJSONBytes(q)) + return err } \ No newline at end of file diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index 59b23644..3a8d790a 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -6,6 +6,7 @@ package alerting import ( "fmt" + "github.com/buger/jsonparser" log "github.com/cihub/seelog" "infini.sh/console/model/alerting" alerting2 "infini.sh/console/service/alerting" @@ -25,6 +26,7 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, body := struct { Messages []alerting.AlertMessage `json:"messages"` IgnoredReason string `json:"ignored_reason"` + IsReset bool `json:"is_reset"` }{} err := h.DecodeJSON(req, &body) if err != nil { @@ -41,27 +43,41 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, messageIDs = append(messageIDs, msg.ID) } currentUser := h.GetCurrentUser(req) + must := []util.MapStr{ + { + "terms": util.MapStr{ + "_id": messageIDs, + }, + }, + } + var source string + if body.IsReset { + must = append(must, util.MapStr{ + "term": util.MapStr{ + "status": util.MapStr{ + "value": alerting.MessageStateIgnored, + }, + }, + }) + source = fmt.Sprintf("ctx._source['status'] = '%s'", alerting.MessageStateAlerting) + }else { + must = append(must, util.MapStr{ + "term": util.MapStr{ + "status": util.MapStr{ + "value": alerting.MessageStateAlerting, + }, + }, + }) + source = fmt.Sprintf("ctx._source['status'] = '%s';ctx._source['ignored_time']='%s';ctx._source['ignored_reason']='%s';ctx._source['ignored_user']='%s'", alerting.MessageStateIgnored, time.Now().Format(time.RFC3339Nano), body.IgnoredReason, currentUser) + } queryDsl := util.MapStr{ "query": util.MapStr{ "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "_id": messageIDs, - }, - }, - { - "term": util.MapStr{ - "status": util.MapStr{ - "value": alerting.MessageStateAlerting, - }, - }, - }, - }, + "must": must, }, }, "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s';ctx._source['ignored_time']='%s';ctx._source['ignored_reason']='%s';ctx._source['ignored_user']='%s'", alerting.MessageStateIgnored, time.Now().Format(time.RFC3339Nano), body.IgnoredReason, currentUser), + "source": source, }, } err = orm.UpdateBy(alerting.AlertMessage{}, util.MustToJSONBytes(queryDsl)) @@ -160,10 +176,53 @@ func (h *AlertAPI) getAlertMessageStats(w http.ResponseWriter, req *http.Request return } statusCounts[alerting.MessageStateIgnored] = countRes.Count + + queryDsl = util.MapStr{ + "size": 0, + "aggs": util.MapStr{ + "terms_by_category": util.MapStr{ + "terms": util.MapStr{ + "field": "category", + "size": 100, + }, + }, + "terms_by_tags": util.MapStr{ + "terms": util.MapStr{ + "field": "tags", + "size": 100, + }, + }, + }, + } + searchRes, err = esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(queryDsl) ) + if err != nil { + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + categories := []string{} + if termsAgg, ok := searchRes.Aggregations["terms_by_category"]; ok { + for _, bk := range termsAgg.Buckets { + if cate, ok := bk["key"].(string); ok { + categories = append(categories, cate) + } + } + } + tags := []string{} + if termsAgg, ok := searchRes.Aggregations["terms_by_tags"]; ok { + for _, bk := range termsAgg.Buckets { + if tag, ok := bk["key"].(string); ok { + tags = append(tags, tag) + } + } + } h.WriteJSON(w, util.MapStr{ "alert": util.MapStr{ "current": statusCounts, }, + "categories": categories, + "tags": tags, }, http.StatusOK) } @@ -182,6 +241,8 @@ func (h *AlertAPI) searchAlertMessage(w http.ResponseWriter, req *http.Request, max = h.GetParameterOrDefault(req, "max", "now") mustBuilder = &strings.Builder{} sortBuilder = strings.Builder{} + category = h.GetParameterOrDefault(req, "category", "") + tags = h.GetParameterOrDefault(req, "tags", "") ) mustBuilder.WriteString(fmt.Sprintf(`{"range":{"created":{"gte":"%s", "lte": "%s"}}}`, min, max)) if ruleID != "" { @@ -213,6 +274,14 @@ func (h *AlertAPI) searchAlertMessage(w http.ResponseWriter, req *http.Request, mustBuilder.WriteString(",") mustBuilder.WriteString(fmt.Sprintf(`{"term":{"priority":{"value":"%s"}}}`, priority)) } + if category != "" { + mustBuilder.WriteString(",") + mustBuilder.WriteString(fmt.Sprintf(`{"term":{"category":{"value":"%s"}}}`, category)) + } + if tags != "" { + mustBuilder.WriteString(",") + mustBuilder.WriteString(fmt.Sprintf(`{"term":{"tags":{"value":"%s"}}}`, tags)) + } size, _ := strconv.Atoi(strSize) if size <= 0 { size = 20 @@ -286,8 +355,12 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps return } metricExpression, _ := rule.Metrics.GenerateExpression() + var hitCondition string for i, cond := range rule.Conditions.Items { expression, _ := cond.GenerateConditionExpression() + if cond.Priority == message.Priority { + hitCondition = strings.ReplaceAll(expression, "result", "") + } rule.Conditions.Items[i].Expression = strings.ReplaceAll(expression, "result", metricExpression) } var duration time.Duration @@ -315,6 +388,202 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps "ignored_reason": message.IgnoredReason, "ignored_user": message.IgnoredUser, "status": message.Status, + "expression": rule.Metrics.Expression, + "hit_condition": hitCondition, } h.WriteJSON(w, detailObj, http.StatusOK) +} + +func (h *AlertAPI) getMessageNotificationInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + message := &alerting.AlertMessage{ + ID: ps.ByName("message_id"), + } + exists, err := orm.Get(message) + if !exists || err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{ + "_id": message.ID, + "found": false, + }, http.StatusNotFound) + return + } + rule := &alerting.Rule{ + ID: message.RuleID, + } + exists, err = orm.Get(rule) + if !exists || err != nil { + log.Error(err) + h.WriteError(w, fmt.Sprintf("rule [%s] not found", rule.ID), http.StatusInternalServerError) + return + } + notificationInfo := util.MapStr{} + if rule.NotificationConfig == nil && rule.RecoveryNotificationConfig == nil { + notificationInfo["is_empty"] = true + h.WriteJSON(w, notificationInfo, http.StatusOK) + return + } + stats, err := getMessageNotificationStats(message) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if rule.NotificationConfig != nil { + notificationInfo["alerting"] = util.MapStr{ + "accept_time_range": rule.NotificationConfig.AcceptTimeRange, + "throttle_period": rule.NotificationConfig.ThrottlePeriod, + "escalation_enabled": rule.NotificationConfig.EscalationEnabled, + "escalation_throttle_period": rule.NotificationConfig.EscalationThrottlePeriod, + "normal_stats": stats["normal"], + "escalation_stats": stats["escalation"], + } + } + if rule.RecoveryNotificationConfig != nil { + notificationInfo["recovery"] = util.MapStr{ + "stats": stats["recovery"], + } + } + h.WriteJSON(w, notificationInfo, http.StatusOK) +} + +func getMessageNotificationStats(msg *alerting.AlertMessage )(util.MapStr, error){ + rangeQ := util.MapStr{ + "gte": msg.Created.UnixMilli(), + } + if msg.Status == alerting.MessageStateRecovered { + rangeQ["lte"] = msg.Updated.UnixMilli() + } + aggs := util.MapStr{ + "grp_normal_channel": util.MapStr{ + "terms": util.MapStr{ + "field": "action_execution_results.channel_type", + "size": 20, + }, + "aggs": util.MapStr{ + "top": util.MapStr{ + "top_hits": util.MapStr{ + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "_source": util.MapStr{ + "includes": []string{"created", "action_execution_results.channel_name", "action_execution_results.channel_type"}, + }, + "size": 1, + }, + }, + }, + }, + "grp_escalation_channel": util.MapStr{ + "terms": util.MapStr{ + "field": "escalation_action_results.channel_type", + "size": 20, + }, + "aggs": util.MapStr{ + "top": util.MapStr{ + "top_hits": util.MapStr{ + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "_source": util.MapStr{ + "includes": []string{"created", "escalation_action_results.channel_name", "escalation_action_results.channel_type"}, + }, + "size": 1, + }, + }, + }, + }, + } + if msg.Status == alerting.MessageStateRecovered { + aggs["grp_recover_channel"] = util.MapStr{ + "terms": util.MapStr{ + "field": "recover_action_results.channel_type", + "size": 20, + }, + "aggs": util.MapStr{ + "top": util.MapStr{ + "top_hits": util.MapStr{ + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "_source": util.MapStr{ + "includes": []string{"created", "recover_action_results.channel_name", "recover_action_results.channel_type"}, + }, + "size": 1, + }, + }, + }, + } + } + query := util.MapStr{ + "size": 0, + "aggs": aggs, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "range": util.MapStr{ + "created": rangeQ, + }, + }, + { + "term": util.MapStr{ + "rule_id": util.MapStr{ + "value": msg.RuleID, + }, + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(alerting.Alert{}, &q) + if err != nil { + return nil, err + } + + var normalStats = extractStatsFromRaw(result.Raw, "grp_normal_channel", "action_execution_results") + var escalationStats = extractStatsFromRaw(result.Raw, "grp_escalation_channel", "escalation_action_results") + stats := util.MapStr{ + "normal": normalStats, + "escalation": escalationStats, + } + if msg.Status == alerting.MessageStateRecovered { + recoverStats := extractStatsFromRaw(result.Raw, "grp_recover_channel", "recover_action_results") + stats["recovery"] = recoverStats + } + + return stats, nil +} +func extractStatsFromRaw(searchRawRes []byte, grpKey string, actionKey string) []util.MapStr { + var stats []util.MapStr + jsonparser.ArrayEach(searchRawRes, func(value []byte, dataType jsonparser.ValueType, offset int, err error) { + statsItem := util.MapStr{} + statsItem["channel_type"], _ = jsonparser.GetString(value, "key") + statsItem["count"], _ = jsonparser.GetInt(value, "doc_count") + jsonparser.ArrayEach(value, func(v []byte, dataType jsonparser.ValueType, offset int, err error) { + ck, _ := jsonparser.GetString(v, "channel_type") + cn, _ := jsonparser.GetString(v, "channel_name") + if ck == statsItem["channel_type"] { + statsItem["channel_name"] = cn + } + }, "top", "hits","hits", "[0]", "_source",actionKey) + statsItem["last_time"], _ = jsonparser.GetString(value, "top", "hits","hits", "[0]", "_source","created") + stats = append(stats, statsItem) + }, "aggregations", grpKey, "buckets") + return stats } \ No newline at end of file diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index e9b18129..ba206702 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -190,7 +190,6 @@ func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request return } queryDSL := util.MapStr{ - "_source": "state", "size": 1, "sort": []util.MapStr{ { @@ -200,9 +199,22 @@ func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request }, }, "query": util.MapStr{ - "term": util.MapStr{ - "rule_id": util.MapStr{ - "value": obj.ID, + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "rule_id": util.MapStr{ + "value": obj.ID, + }, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": alerting.MessageStateAlerting, + }, + }, + }, }, }, }, @@ -211,18 +223,74 @@ func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request WildcardIndex: true, RawQuery: util.MustToJSONBytes(queryDSL), } - err, result := orm.Search(alerting.Alert{}, q) + err, result := orm.Search(alerting.AlertMessage{}, q) if err != nil { log.Error(err) - alertAPI.WriteJSON(w, util.MapStr{ - "error": err.Error(), - }, http.StatusInternalServerError) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) return } var state interface{} = "N/A" + var alertingMessageItem interface{} if len(result.Result) > 0 { + alertingMessageItem = result.Result[0] if resultM, ok := result.Result[0].(map[string]interface{}); ok { - state = resultM["state"] + state = resultM["status"] + } + } + var channelIDs []interface{} + if obj.NotificationConfig != nil { + for _, ch := range obj.NotificationConfig.Normal { + channelIDs = append(channelIDs, ch.ID) + } + for _, ch := range obj.NotificationConfig.Escalation { + channelIDs = append(channelIDs, ch.ID) + } + } + if obj.RecoveryNotificationConfig != nil { + for _, ch := range obj.RecoveryNotificationConfig.Normal { + channelIDs = append(channelIDs, ch.ID) + } + } + q = &orm.Query{ + Size: len(channelIDs), + } + q.Conds = append(q.Conds, orm.In("id", channelIDs)) + err, result = orm.Search(alerting.Channel{}, q) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + chm := map[string]alerting.Channel{} + for _, row := range result.Result { + buf := util.MustToJSONBytes(row) + ch := alerting.Channel{} + util.MustFromJSONBytes(buf, &ch) + chm[ch.ID] = ch + } + if obj.NotificationConfig != nil { + for i, ch := range obj.NotificationConfig.Normal { + if v, ok := chm[ch.ID]; ok { + obj.NotificationConfig.Normal[i].Enabled = v.Enabled && ch.Enabled + obj.NotificationConfig.Normal[i].Type = v.SubType + obj.NotificationConfig.Normal[i].Name = v.Name + } + } + for i, ch := range obj.NotificationConfig.Escalation { + if v, ok := chm[ch.ID]; ok { + obj.NotificationConfig.Escalation[i].Enabled = v.Enabled && ch.Enabled + obj.NotificationConfig.Escalation[i].Type = v.SubType + obj.NotificationConfig.Escalation[i].Name = v.Name + } + } + } + if obj.RecoveryNotificationConfig != nil { + for i, ch := range obj.RecoveryNotificationConfig.Normal { + if v, ok := chm[ch.ID]; ok { + obj.RecoveryNotificationConfig.Normal[i].Enabled = v.Enabled && ch.Enabled + obj.RecoveryNotificationConfig.Normal[i].Type = v.SubType + obj.RecoveryNotificationConfig.Normal[i].Name = v.Name + } } } @@ -231,12 +299,22 @@ func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request "resource_name": obj.Resource.Name, "resource_id": obj.Resource.ID, "resource_objects": obj.Resource.Objects, + "resource_time_field": obj.Resource.TimeField, + "resource_raw_filter": obj.Resource.RawFilter, + "metrics": obj.Metrics, "bucket_size": obj.Metrics.BucketSize, //统计周期 "updated": obj.Updated, "conditions": obj.Conditions, "message_count": alertNumbers[obj.ID], //所有关联告警消息数(包括已恢复的) "state": state, "enabled": obj.Enabled, + "created": obj.Created, + "creator": obj.Creator, + "tags": obj.Tags, + "alerting_message": alertingMessageItem, + "expression": obj.Metrics.Expression, + "notification_config": obj.NotificationConfig, + "recovery_notification_config": obj.RecoveryNotificationConfig, } alertAPI.WriteJSON(w, detailObj, 200) @@ -418,7 +496,9 @@ func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, p delDsl := util.MapStr{ "query": util.MapStr{ "term": util.MapStr{ - "rule_id": id, + "rule_id": util.MapStr{ + "value": id, + }, }, }, } @@ -437,6 +517,69 @@ func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, p }, 200) } +func (alertAPI *AlertAPI) batchDeleteRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ruleIDs = []string{} + err := alertAPI.DecodeJSON(req, &ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if len(ruleIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + rules, err := getRulesByID(ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var newIDs []string + for _, rule := range rules { + saveAlertActivity("alerting_rule_change", "delete", util.MapStr{ + "cluster_id": rule.Resource.ID, + "rule_id": rule.ID, + "cluster_name": rule.Resource.Name, + },nil, &rule) + task.DeleteTask(rule.ID) + clearKV(rule.ID) + newIDs = append(newIDs, rule.ID) + } + if len(newIDs) > 0 { + q := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "id": newIDs, + }, + }, + } + err = orm.DeleteBy(alerting.Rule{}, util.MustToJSONBytes(q)) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + delDsl := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "rule_id": newIDs, + }, + }, + } + err = orm.DeleteBy(alerting.AlertMessage{}, util.MustToJSONBytes(delDsl)) + if err != nil { + log.Error(err) + } + err = orm.DeleteBy(alerting.Alert{}, util.MustToJSONBytes(delDsl)) + if err != nil { + log.Error(err) + } + } + alertAPI.WriteAckOKJSON(w) +} + func (alertAPI *AlertAPI) searchRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { var ( keyword = alertAPI.GetParameterOrDefault(req, "keyword", "") @@ -592,9 +735,7 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) ) if err != nil { log.Error(err) - alertAPI.WriteJSON(w, util.MapStr{ - "error": err.Error(), - }, http.StatusInternalServerError) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) return } if len(searchRes.Hits.Hits) == 0 { @@ -609,6 +750,50 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque "status": hit.Source["state"], } } + } + queryDsl = util.MapStr{ + "_source": []string{"created", "rule_id"}, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "collapse": util.MapStr{ + "field": "rule_id", + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "rule_id": ruleIDs, + }, + }, + { + "term": util.MapStr{ + "state": util.MapStr{ + "value": alerting.AlertStateAlerting, + }, + }, + }, + }, + }, + }, + } + searchRes, err = esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) ) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + for _, hit := range searchRes.Hits.Hits { + if ruleID, ok := hit.Source["rule_id"].(string); ok { + if _, ok = latestAlertInfos[ruleID]; ok { + latestAlertInfos[ruleID]["last_notification_time"] = hit.Source["created"] + } + } } alertAPI.WriteJSON(w, latestAlertInfos, http.StatusOK) @@ -636,20 +821,9 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p return } if reqObj.Enabled { - eng := alerting2.GetEngine(obj.Resource.Type) - ruleTask := task.ScheduleTask{ - ID: obj.ID, - Interval: obj.Schedule.Interval, - Description: obj.Metrics.Expression, - Task: eng.GenerateTask(obj), - } - task.DeleteTask(ruleTask.ID) - clearKV(ruleTask.ID) - task.RegisterScheduleTask(ruleTask) - task.StartTask(ruleTask.ID) - }else{ - task.DeleteTask(id) - clearKV(id) + enableRule(&obj) + } else { + disableRule(&obj) } obj.Enabled = reqObj.Enabled err = orm.Save(nil, obj) @@ -660,10 +834,28 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p } alertAPI.WriteJSON(w, util.MapStr{ "result": "updated", - "_id": id, + "_id": id, }, http.StatusOK) } +func enableRule(obj *alerting.Rule) { + eng := alerting2.GetEngine(obj.Resource.Type) + ruleTask := task.ScheduleTask{ + ID: obj.ID, + Interval: obj.Schedule.Interval, + Description: obj.Metrics.Expression, + Task: eng.GenerateTask(*obj), + } + task.DeleteTask(ruleTask.ID) + clearKV(ruleTask.ID) + task.RegisterScheduleTask(ruleTask) + task.StartTask(ruleTask.ID) +} +func disableRule(obj *alerting.Rule) { + task.DeleteTask(obj.ID) + clearKV(obj.ID) +} + func (alertAPI *AlertAPI) sendTestMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { typ := alertAPI.GetParameterOrDefault(req, "type", "notification") rule := alerting.Rule{} @@ -917,4 +1109,183 @@ func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) }) } return &metricItem,queryResult, nil +} + +func (alertAPI *AlertAPI) batchEnableRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ruleIDs = []string{} + err := alertAPI.DecodeJSON(req, &ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if len(ruleIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + rules, err := getRulesByID(ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var newIDs []string + for _, rule := range rules { + if !rule.Enabled { + enableRule(&rule) + newIDs = append(newIDs, rule.ID) + } + } + if len(newIDs) > 0 { + q := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "id": newIDs, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['enabled'] = %v", true), + }, + } + err = orm.UpdateBy(alerting.Rule{}, util.MustToJSONBytes(q)) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + alertAPI.WriteAckOKJSON(w) +} + +func (alertAPI *AlertAPI) batchDisableRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ruleIDs = []string{} + err := alertAPI.DecodeJSON(req, &ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if len(ruleIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + rules, err := getRulesByID(ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var newIDs []string + for _, rule := range rules { + if rule.Enabled { + disableRule(&rule) + newIDs = append(newIDs, rule.ID) + } + } + if len(newIDs) > 0 { + q := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "id": newIDs, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['enabled'] = %v", false), + }, + } + log.Info(util.MustToJSON(q)) + err = orm.UpdateBy(alerting.Rule{}, util.MustToJSONBytes(q)) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + alertAPI.WriteAckOKJSON(w) +} + +func (alertAPI *AlertAPI) searchFieldValues(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var keyword = alertAPI.GetParameterOrDefault(req, "keyword", "") + var field = alertAPI.GetParameterOrDefault(req, "field", "category") + items , err := searchListItems(field, keyword, 20) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + alertAPI.WriteJSON(w, items, http.StatusOK) +} + +func searchListItems(field, keyword string, size int) ([]string, error){ + query := util.MapStr{ + "size": 0, + "aggs": util.MapStr{ + "items": util.MapStr{ + "terms": util.MapStr{ + "field": field, + "size": size, + }, + }, + }, + } + if v := strings.TrimSpace(keyword); v != ""{ + query["query"]= util.MapStr{ + "query_string": util.MapStr{ + "default_field": field, + "query": fmt.Sprintf("*%s*", v), + }, + } + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(alerting.Rule{}, &q) + if err != nil { + return nil, err + } + searchRes := elastic.SearchResponse{} + err = util.FromJSONBytes(result.Raw, &searchRes) + if err != nil { + return nil, err + } + items := []string{} + for _, bk := range searchRes.Aggregations["items"].Buckets { + if v, ok := bk["key"].(string); ok { + if strings.Contains(v, keyword){ + items = append(items, v) + } + } + } + return items, nil +} + +func getRulesByID(ruleIDs []string) ([]alerting.Rule, error){ + if len(ruleIDs) == 0 { + return nil, nil + } + query := util.MapStr{ + "size": len(ruleIDs), + "query": util.MapStr{ + "terms": util.MapStr{ + "id": ruleIDs, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(alerting.Rule{}, q) + if err != nil { + return nil, err + } + var rules []alerting.Rule + for _, row := range result.Result { + buf := util.MustToJSONBytes(row) + rule := alerting.Rule{} + util.MustFromJSONBytes(buf, &rule) + rules = append(rules, rule) + } + return rules, nil } \ No newline at end of file diff --git a/plugin/api/data/export.go b/plugin/api/data/export.go index dea5b5d0..36f415f8 100644 --- a/plugin/api/data/export.go +++ b/plugin/api/data/export.go @@ -25,7 +25,7 @@ func (h *DataAPI) exportData(w http.ResponseWriter, req *http.Request, ps httpro } var resBody []ExportData for _, meta := range reqBody.Metadatas { - result, err := getExportData(meta.Type) + result, err := getExportData(meta) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -86,9 +86,9 @@ func indexExportData(eds []ExportData) error { return nil } -func getExportData(typ string) (*orm.Result, error) { +func getExportData(meta ExportMetadata) (*orm.Result, error) { var obj interface{} - switch typ { + switch meta.Type { case DataTypeAlertChannel: obj = alerting.Channel{} case DataTypeAlertRule: @@ -96,11 +96,19 @@ func getExportData(typ string) (*orm.Result, error) { case DataTypeAlertEmailServer: obj = model.EmailServer{} default: - return nil, fmt.Errorf("unkonw data type: %s", typ) + return nil, fmt.Errorf("unkonw data type: %s", meta.Type) } - err, result := orm.Search(obj, &orm.Query{ + q := &orm.Query{ Size: 1000, - }) + } + if meta.Filter != nil { + query := util.MapStr{ + "size": 1000, + "query": meta.Filter, + } + q.RawQuery = util.MustToJSONBytes(query) + } + err, result := orm.Search(obj, q) if err != nil { return nil, err } diff --git a/plugin/api/data/model.go b/plugin/api/data/model.go index c87eae0f..1635285f 100644 --- a/plugin/api/data/model.go +++ b/plugin/api/data/model.go @@ -10,6 +10,7 @@ type ExportDataRequest struct { type ExportMetadata struct { Type string `json:"type"` + Filter interface{} `json:"filter,omitempty"` } type ExportData struct { diff --git a/plugin/api/insight/api.go b/plugin/api/insight/api.go index 8ae2a96e..27b232be 100644 --- a/plugin/api/insight/api.go +++ b/plugin/api/insight/api.go @@ -28,4 +28,6 @@ func InitAPI() { api.HandleAPIMethod(api.DELETE, "/insight/dashboard/:dashboard_id", insight.deleteDashboard) api.HandleAPIMethod(api.GET, "/insight/dashboard/_search", insight.searchDashboard) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/map_label/_render", insight.renderMapLabelTemplate) + api.HandleAPIMethod(api.GET, "/insight/widget/:widget_id", insight.getWidget) + api.HandleAPIMethod(api.POST, "/insight/widget", insight.RequireLogin(insight.createWidget)) } diff --git a/plugin/api/insight/widget.go b/plugin/api/insight/widget.go new file mode 100644 index 00000000..1bc60e70 --- /dev/null +++ b/plugin/api/insight/widget.go @@ -0,0 +1,51 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package insight + +import ( + log "github.com/cihub/seelog" + "infini.sh/console/model/insight" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" +) + +func (h *InsightAPI) createWidget(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var obj = &insight.Widget{} + err := h.DecodeJSON(req, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + err = orm.Create(nil, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteCreatedOKJSON(w, obj.ID) + +} + +func (h *InsightAPI) getWidget(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("widget_id") + + obj := insight.Widget{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + + h.WriteGetOKJSON(w, id, obj) +} diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index a981c7e9..a6cfc647 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -699,7 +699,10 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return err } actionResults, _ := performChannels(recoverCfg.Normal, paramsCtx, false) - alertItem.ActionExecutionResults = actionResults + alertItem.RecoverActionResults = actionResults + //clear history notification time + _ = kv.DeleteKey(alerting2.KVLastNotificationTime, []byte(rule.ID)) + _ = kv.DeleteKey(alerting2.KVLastEscalationTime, []byte(rule.ID)) } } return nil @@ -745,6 +748,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error { Priority: priority, Title: alertItem.Title, Message: alertItem.Message, + Tags: rule.Tags, + Category: rule.Category, } err = saveAlertMessage(msg) if err != nil { @@ -849,15 +854,13 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } } if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration { - actionResults, errCount := performChannels(notifyCfg.Escalation, paramsCtx, false) - alertItem.ActionExecutionResults = actionResults + actionResults, _ := performChannels(notifyCfg.Escalation, paramsCtx, false) + alertItem.EscalationActionResults = actionResults //todo init last escalation time when create task (by last alert item is escalated) - if errCount == 0 { - rule.LastEscalationTime = time.Now() - alertItem.IsEscalated = true - strTime := rule.LastEscalationTime.UTC().Format(time.RFC3339) - kv.AddValue(alerting2.KVLastEscalationTime, []byte(rule.ID), []byte(strTime)) - } + rule.LastEscalationTime = time.Now() + alertItem.IsEscalated = true + strTime := rule.LastEscalationTime.UTC().Format(time.RFC3339) + kv.AddValue(alerting2.KVLastEscalationTime, []byte(rule.ID), []byte(strTime)) } } @@ -1061,8 +1064,9 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}, ra Error: errStr, Message: string(messageBytes), ExecutionTime: int(time.Now().UnixNano()/1e6), - ChannelType: channel.Type, + ChannelType: channel.SubType, ChannelName: channel.Name, + ChannelID: channel.ID, }) } return actionResults, errCount