add batch operation api

This commit is contained in:
liugq 2023-08-18 16:01:28 +08:00
parent 14f391b58b
commit 9e6a1019fe
7 changed files with 405 additions and 28 deletions

View File

@ -22,7 +22,9 @@ type Alert struct {
Title string `json:"title" elastic_mapping:"title: { type: keyword }"`
Message string `json:"message" elastic_mapping:"context: { type: keyword, copy_to:search_text }"`
AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"`
ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"`
ActionExecutionResults []ActionExecutionResult `json:"action_execution_results,omitempty"`
RecoverActionResults []ActionExecutionResult `json:"recover_action_results,omitempty"`
EscalationActionResults []ActionExecutionResult `json:"escalation_action_results,omitempty"`
Users []string `json:"users,omitempty"`
State string `json:"state"`
Error string `json:"error,omitempty"`

View File

@ -27,6 +27,8 @@ func (alert *AlertAPI) Init() {
api.HandleAPIMethod(api.POST, "/alerting/rule/:rule_id/_enable", alert.RequirePermission(alert.enableRule, enum.PermissionAlertRuleWrite))
api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/metric", alert.RequirePermission(alert.getMetricData, enum.PermissionAlertRuleRead))
api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/info", alert.RequirePermission(alert.getRuleDetail, enum.PermissionAlertRuleRead, enum.PermissionAlertMessageRead))
api.HandleAPIMethod(api.POST, "/alerting/rule/_enable", alert.RequirePermission(alert.batchEnableRule, enum.PermissionAlertRuleWrite))
api.HandleAPIMethod(api.POST, "/alerting/rule/_disable", alert.RequirePermission(alert.batchDisableRule, enum.PermissionAlertRuleWrite))
api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.RequirePermission(alert.getChannel, enum.PermissionAlertChannelRead))
api.HandleAPIMethod(api.POST, "/alerting/channel", alert.RequirePermission(alert.createChannel, enum.PermissionAlertChannelWrite))
@ -43,6 +45,7 @@ func (alert *AlertAPI) Init() {
api.HandleAPIMethod(api.POST, "/alerting/message/_ignore", alert.RequirePermission(alert.ignoreAlertMessage, enum.PermissionAlertMessageWrite))
api.HandleAPIMethod(api.GET, "/alerting/message/_stats", alert.RequirePermission(alert.getAlertMessageStats, enum.PermissionAlertMessageRead))
api.HandleAPIMethod(api.GET, "/alerting/message/:message_id", alert.RequirePermission(alert.getAlertMessage, enum.PermissionAlertMessageRead))
api.HandleAPIMethod(api.GET, "/alerting/message/:message_id/notification", alert.getMessageNotificationInfo)
//just for test

View File

@ -6,6 +6,7 @@ package alerting
import (
"fmt"
"github.com/buger/jsonparser"
log "github.com/cihub/seelog"
"infini.sh/console/model/alerting"
alerting2 "infini.sh/console/service/alerting"
@ -391,4 +392,192 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps
"hit_condition": hitCondition,
}
h.WriteJSON(w, detailObj, http.StatusOK)
}
func (h *AlertAPI) getMessageNotificationInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
message := &alerting.AlertMessage{
ID: ps.ByName("message_id"),
}
exists, err := orm.Get(message)
if !exists || err != nil {
log.Error(err)
h.WriteJSON(w, util.MapStr{
"_id": message.ID,
"found": false,
}, http.StatusNotFound)
return
}
rule := &alerting.Rule{
ID: message.RuleID,
}
exists, err = orm.Get(rule)
if !exists || err != nil {
log.Error(err)
h.WriteError(w, fmt.Sprintf("rule [%s] not found", rule.ID), http.StatusInternalServerError)
return
}
notificationInfo := util.MapStr{}
if rule.NotificationConfig == nil && rule.RecoveryNotificationConfig == nil {
notificationInfo["is_empty"] = true
h.WriteJSON(w, notificationInfo, http.StatusOK)
return
}
stats, err := getMessageNotificationStats(message)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if rule.NotificationConfig != nil {
notificationInfo["alerting"] = util.MapStr{
"accept_time_range": rule.NotificationConfig.AcceptTimeRange,
"throttle_period": rule.NotificationConfig.ThrottlePeriod,
"escalation_enabled": rule.NotificationConfig.EscalationEnabled,
"escalation_throttle_period": rule.NotificationConfig.EscalationThrottlePeriod,
"normal_stats": stats["normal"],
"escalation_stats": stats["escalation"],
}
}
if rule.RecoveryNotificationConfig != nil {
notificationInfo["recovery"] = util.MapStr{
"stats": stats["recovery"],
}
}
h.WriteJSON(w, notificationInfo, http.StatusOK)
}
func getMessageNotificationStats(msg *alerting.AlertMessage )(util.MapStr, error){
rangeQ := util.MapStr{
"gte": msg.Created.UnixMilli(),
}
if msg.Status == alerting.MessageStateRecovered {
rangeQ["lte"] = msg.Updated.UnixMilli()
}
aggs := util.MapStr{
"grp_normal_channel": util.MapStr{
"terms": util.MapStr{
"field": "action_execution_results.channel_type",
"size": 20,
},
"aggs": util.MapStr{
"top": util.MapStr{
"top_hits": util.MapStr{
"sort": []util.MapStr{
{
"created": util.MapStr{
"order": "desc",
},
},
},
"_source": util.MapStr{
"includes": []string{"created", "action_execution_results.channel_name"},
},
"size": 1,
},
},
},
},
"grp_escalation_channel": util.MapStr{
"terms": util.MapStr{
"field": "escalation_action_results.channel_type",
"size": 20,
},
"aggs": util.MapStr{
"top": util.MapStr{
"top_hits": util.MapStr{
"sort": []util.MapStr{
{
"created": util.MapStr{
"order": "desc",
},
},
},
"_source": util.MapStr{
"includes": []string{"created", "escalation_action_results.channel_name"},
},
"size": 1,
},
},
},
},
}
if msg.Status == alerting.MessageStateRecovered {
aggs["grp_recover_channel"] = util.MapStr{
"terms": util.MapStr{
"field": "recover_action_results.channel_type",
"size": 20,
},
"aggs": util.MapStr{
"top": util.MapStr{
"top_hits": util.MapStr{
"sort": []util.MapStr{
{
"created": util.MapStr{
"order": "desc",
},
},
},
"_source": util.MapStr{
"includes": []string{"created", "recover_action_results.channel_name"},
},
"size": 1,
},
},
},
}
}
query := util.MapStr{
"size": 0,
"aggs": aggs,
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"range": util.MapStr{
"created": rangeQ,
},
},
{
"term": util.MapStr{
"rule_id": util.MapStr{
"value": msg.RuleID,
},
},
},
},
},
},
}
q := orm.Query{
RawQuery: util.MustToJSONBytes(query),
}
err, result := orm.Search(alerting.Alert{}, &q)
if err != nil {
return nil, err
}
var normalStats = extractStatsFromRaw(result.Raw, "grp_normal_channel", "action_execution_results")
var escalationStats = extractStatsFromRaw(result.Raw, "grp_escalation_channel", "escalation_action_results")
stats := util.MapStr{
"normal": normalStats,
"escalation": escalationStats,
}
if msg.Status == alerting.MessageStateRecovered {
recoverStats := extractStatsFromRaw(result.Raw, "grp_recover_channel", "recover_action_results")
stats["recovery"] = recoverStats
}
return stats, nil
}
func extractStatsFromRaw(searchRawRes []byte, grpKey string, actionKey string) []util.MapStr {
var stats []util.MapStr
jsonparser.ArrayEach(searchRawRes, func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
statsItem := util.MapStr{}
statsItem["channel_type"], _ = jsonparser.GetString(value, "key")
statsItem["count"], _ = jsonparser.GetInt(value, "doc_count")
statsItem["channel_name"], _ = jsonparser.GetString(value, "top", "hits","hits", "[0]", "_source",actionKey, "[0]", "channel_name")
statsItem["last_time"], _ = jsonparser.GetString(value, "top", "hits","hits", "[0]", "_source","created")
stats = append(stats, statsItem)
}, "aggregations", grpKey, "buckets")
return stats
}

View File

@ -592,9 +592,7 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque
searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) )
if err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if len(searchRes.Hits.Hits) == 0 {
@ -609,6 +607,50 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque
"status": hit.Source["state"],
}
}
}
queryDsl = util.MapStr{
"_source": []string{"created", "rule_id"},
"sort": []util.MapStr{
{
"created": util.MapStr{
"order": "desc",
},
},
},
"collapse": util.MapStr{
"field": "rule_id",
},
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"terms": util.MapStr{
"rule_id": ruleIDs,
},
},
{
"term": util.MapStr{
"state": util.MapStr{
"value": alerting.AlertStateAlerting,
},
},
},
},
},
},
}
searchRes, err = esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) )
if err != nil {
log.Error(err)
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
for _, hit := range searchRes.Hits.Hits {
if ruleID, ok := hit.Source["rule_id"].(string); ok {
if _, ok = latestAlertInfos[ruleID]; ok {
latestAlertInfos[ruleID]["last_notification_time"] = hit.Source["created"]
}
}
}
alertAPI.WriteJSON(w, latestAlertInfos, http.StatusOK)
@ -636,20 +678,9 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p
return
}
if reqObj.Enabled {
eng := alerting2.GetEngine(obj.Resource.Type)
ruleTask := task.ScheduleTask{
ID: obj.ID,
Interval: obj.Schedule.Interval,
Description: obj.Metrics.Expression,
Task: eng.GenerateTask(obj),
}
task.DeleteTask(ruleTask.ID)
clearKV(ruleTask.ID)
task.RegisterScheduleTask(ruleTask)
task.StartTask(ruleTask.ID)
}else{
task.DeleteTask(id)
clearKV(id)
enableRule(&obj)
} else {
disableRule(&obj)
}
obj.Enabled = reqObj.Enabled
err = orm.Save(nil, obj)
@ -660,10 +691,28 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p
}
alertAPI.WriteJSON(w, util.MapStr{
"result": "updated",
"_id": id,
"_id": id,
}, http.StatusOK)
}
func enableRule(obj *alerting.Rule) {
eng := alerting2.GetEngine(obj.Resource.Type)
ruleTask := task.ScheduleTask{
ID: obj.ID,
Interval: obj.Schedule.Interval,
Description: obj.Metrics.Expression,
Task: eng.GenerateTask(*obj),
}
task.DeleteTask(ruleTask.ID)
clearKV(ruleTask.ID)
task.RegisterScheduleTask(ruleTask)
task.StartTask(ruleTask.ID)
}
func disableRule(obj *alerting.Rule) {
task.DeleteTask(obj.ID)
clearKV(obj.ID)
}
func (alertAPI *AlertAPI) sendTestMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
typ := alertAPI.GetParameterOrDefault(req, "type", "notification")
rule := alerting.Rule{}
@ -917,4 +966,129 @@ func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam)
})
}
return &metricItem,queryResult, nil
}
func (alertAPI *AlertAPI) batchEnableRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var ruleIDs = []string{}
err := alertAPI.DecodeJSON(req, &ruleIDs)
if err != nil {
log.Error(err)
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if len(ruleIDs) == 0 {
alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK)
return
}
rules, err := getRulesByID(ruleIDs)
if err != nil {
log.Error(err)
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
var newIDs []string
for _, rule := range rules {
if !rule.Enabled {
enableRule(&rule)
newIDs = append(newIDs, rule.ID)
}
}
if len(newIDs) > 0 {
q := util.MapStr{
"query": util.MapStr{
"terms": util.MapStr{
"id": newIDs,
},
},
"script": util.MapStr{
"source": fmt.Sprintf("ctx._source['enabled'] = %v", true),
},
}
err = orm.UpdateBy(alerting.Rule{}, util.MustToJSONBytes(q))
if err != nil {
log.Error(err)
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
}
alertAPI.WriteAckOKJSON(w)
}
func (alertAPI *AlertAPI) batchDisableRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var ruleIDs = []string{}
err := alertAPI.DecodeJSON(req, &ruleIDs)
if err != nil {
log.Error(err)
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if len(ruleIDs) == 0 {
alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK)
return
}
rules, err := getRulesByID(ruleIDs)
if err != nil {
log.Error(err)
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
log.Info(rules)
var newIDs []string
for _, rule := range rules {
if rule.Enabled {
disableRule(&rule)
newIDs = append(newIDs, rule.ID)
}
}
if len(newIDs) > 0 {
q := util.MapStr{
"query": util.MapStr{
"terms": util.MapStr{
"id": newIDs,
},
},
"script": util.MapStr{
"source": fmt.Sprintf("ctx._source['enabled'] = %v", false),
},
}
log.Info(util.MustToJSON(q))
err = orm.UpdateBy(alerting.Rule{}, util.MustToJSONBytes(q))
if err != nil {
log.Error(err)
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
}
alertAPI.WriteAckOKJSON(w)
}
func getRulesByID(ruleIDs []string) ([]alerting.Rule, error){
if len(ruleIDs) == 0 {
return nil, nil
}
query := util.MapStr{
"size": len(ruleIDs),
"query": util.MapStr{
"terms": util.MapStr{
"id": ruleIDs,
},
},
}
q := &orm.Query{
RawQuery: util.MustToJSONBytes(query),
}
err, result := orm.Search(alerting.Rule{}, q)
if err != nil {
return nil, err
}
var rules []alerting.Rule
for _, row := range result.Result {
buf := util.MustToJSONBytes(row)
rule := alerting.Rule{}
util.MustFromJSONBytes(buf, &rule)
rules = append(rules, rule)
}
return rules, nil
}

View File

@ -25,7 +25,7 @@ func (h *DataAPI) exportData(w http.ResponseWriter, req *http.Request, ps httpro
}
var resBody []ExportData
for _, meta := range reqBody.Metadatas {
result, err := getExportData(meta.Type)
result, err := getExportData(meta)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
@ -86,9 +86,9 @@ func indexExportData(eds []ExportData) error {
return nil
}
func getExportData(typ string) (*orm.Result, error) {
func getExportData(meta ExportMetadata) (*orm.Result, error) {
var obj interface{}
switch typ {
switch meta.Type {
case DataTypeAlertChannel:
obj = alerting.Channel{}
case DataTypeAlertRule:
@ -96,11 +96,19 @@ func getExportData(typ string) (*orm.Result, error) {
case DataTypeAlertEmailServer:
obj = model.EmailServer{}
default:
return nil, fmt.Errorf("unkonw data type: %s", typ)
return nil, fmt.Errorf("unkonw data type: %s", meta.Type)
}
err, result := orm.Search(obj, &orm.Query{
q := &orm.Query{
Size: 1000,
})
}
if meta.Filter != nil {
query := util.MapStr{
"size": 1000,
"query": meta.Filter,
}
q.RawQuery = util.MustToJSONBytes(query)
}
err, result := orm.Search(obj, q)
if err != nil {
return nil, err
}

View File

@ -10,6 +10,7 @@ type ExportDataRequest struct {
type ExportMetadata struct {
Type string `json:"type"`
Filter interface{} `json:"filter,omitempty"`
}
type ExportData struct {

View File

@ -699,7 +699,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
return err
}
actionResults, _ := performChannels(recoverCfg.Normal, paramsCtx, false)
alertItem.ActionExecutionResults = actionResults
alertItem.RecoverActionResults = actionResults
}
}
return nil
@ -852,7 +852,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
}
if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration {
actionResults, errCount := performChannels(notifyCfg.Escalation, paramsCtx, false)
alertItem.ActionExecutionResults = actionResults
alertItem.EscalationActionResults = actionResults
//todo init last escalation time when create task (by last alert item is escalated)
if errCount == 0 {
rule.LastEscalationTime = time.Now()
@ -1063,7 +1063,7 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}, ra
Error: errStr,
Message: string(messageBytes),
ExecutionTime: int(time.Now().UnixNano()/1e6),
ChannelType: channel.Type,
ChannelType: channel.SubType,
ChannelName: channel.Name,
})
}