[api] add list notification api (#35)

[api] add list notification api

Co-authored-by: Kassian Sun <kassiansun@outlook.com>
Co-authored-by: liugq <silenceqi@hotmail.com>
Co-authored-by: medcl <m@medcl.net>
Co-authored-by: silenceqi <silenceqi@infini.ltd>
This commit is contained in:
sunjiacheng 2023-04-01 18:52:02 +08:00 committed by medcl
parent 1a645a655a
commit 55ac534b02
6 changed files with 208 additions and 27 deletions

View File

@ -4,11 +4,14 @@ import (
"context"
"errors"
_ "expvar"
_ "time/tzdata"
log "github.com/cihub/seelog"
"infini.sh/console/config"
"infini.sh/console/model"
"infini.sh/console/model/alerting"
_ "infini.sh/console/plugin"
setup1 "infini.sh/console/plugin/setup"
alerting2 "infini.sh/console/service/alerting"
"infini.sh/framework"
"infini.sh/framework/core/elastic"
@ -31,11 +34,8 @@ import (
"infini.sh/framework/modules/task"
"infini.sh/framework/modules/ui"
_ "infini.sh/framework/plugins"
setup1 "infini.sh/console/plugin/setup"
_ "infini.sh/console/plugin"
api2 "infini.sh/gateway/api"
_ "infini.sh/gateway/proxy"
_ "time/tzdata"
)
var appConfig *config.AppConfig
@ -117,7 +117,6 @@ func main() {
var initFunc = func() {
elastic2.InitTemplate(false)
//orm.RegisterSchemaWithIndexName(model.Dict{}, "dict")
@ -134,6 +133,7 @@ func main() {
orm.RegisterSchemaWithIndexName(task1.Task{}, "task")
orm.RegisterSchemaWithIndexName(task1.Log{}, "task-log")
orm.RegisterSchemaWithIndexName(model.Layout{}, "layout")
orm.RegisterSchemaWithIndexName(model.Notification{}, "notification")
api.RegisterSchema()
if global.Env().SetupRequired() {

31
model/notification.go Normal file
View File

@ -0,0 +1,31 @@
package model
import (
"infini.sh/framework/core/orm"
)
type NotificationType string
const (
NotificationTypeProductNews NotificationType = "PRODUCT_NEWS"
NotificationTypeAlertTriggered NotificationType = "ALERT_TRIGGERED"
NotificationTypeDataMigration NotificationType = "DATA_MIGRATION"
)
type NotificationStatus string
const (
NotificationStatusNew NotificationStatus = "NEW"
NotificationStatusRead NotificationStatus = "READ"
)
type Notification struct {
orm.ORMObjectBase
UserId string `json:"user_id,omitempty" elastic_mapping:"user_id: { type: keyword }"`
NotificationType NotificationType `json:"notification_type,omitempty" elastic_mapping:"notification_type:{type:keyword,fields:{text: {type: text}}}"`
Status NotificationStatus `json:"status,omitempty" elastic_mapping:"status: { type: keyword }"`
Title string `json:"title,omitempty" elastic_mapping:"title: { type: keyword }"`
Body string `json:"body,omitempty" elastic_mapping:"body: { type: keyword }"`
Link string `json:"link,omitempty" elastic_mapping:"link: { type: keyword }"`
}

View File

@ -1,15 +1,17 @@
package api
import (
"path"
"infini.sh/console/config"
"infini.sh/console/plugin/api/alerting"
"infini.sh/console/plugin/api/gateway"
"infini.sh/console/plugin/api/index_management"
"infini.sh/console/plugin/api/insight"
"infini.sh/console/plugin/api/layout"
"infini.sh/console/plugin/api/notification"
"infini.sh/framework/core/api"
"infini.sh/framework/core/api/rbac/enum"
"path"
)
func Init(cfg *config.AppConfig) {
@ -61,12 +63,12 @@ func Init(cfg *config.AppConfig) {
// },
//})
alertAPI := alerting.AlertAPI{
}
alertAPI := alerting.AlertAPI{}
alertAPI.Init()
gateway.InitAPI()
insight.InitAPI()
layout.InitAPI()
notification.InitAPI()
}

View File

@ -0,0 +1,14 @@
package notification
import (
"infini.sh/framework/core/api"
)
type NotificationAPI struct {
api.Handler
}
func InitAPI() {
notification := NotificationAPI{}
api.HandleAPIMethod(api.GET, "/notification", notification.RequireLogin(notification.listNotifications))
}

View File

@ -0,0 +1,67 @@
package notification
import (
"errors"
"fmt"
"net/http"
"strconv"
log "github.com/cihub/seelog"
"infini.sh/console/model"
"infini.sh/framework/core/api/rbac"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
)
func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
user, err := rbac.FromUserContext(req.Context())
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if user == nil {
log.Error(errors.New("no user info"))
h.WriteError(w, "no user info", http.StatusInternalServerError)
return
}
var (
queryDSL = `{
"sort": [
{ "created": {"order": "desc"} }
],
"query": {
"bool": { "must": [
{ "term": {"user_id": { "value": "%s" } } }
] }
},
"size": %d, "from": %d
}`
strSize = h.GetParameterOrDefault(req, "size", "20")
strFrom = h.GetParameterOrDefault(req, "from", "0")
)
size, _ := strconv.Atoi(strSize)
if size <= 0 {
size = 20
}
from, _ := strconv.Atoi(strFrom)
if from < 0 {
from = 0
}
q := orm.Query{}
queryDSL = fmt.Sprintf(queryDSL, user.UserId, size, from)
q.RawQuery = util.UnsafeStringToBytes(queryDSL)
err, res := orm.Search(&model.Notification{}, &q)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
h.WriteJSONHeader(w)
h.Write(w, res.Raw)
}

View File

@ -13,6 +13,8 @@ import (
"time"
log "github.com/cihub/seelog"
"github.com/mitchellh/mapstructure"
"infini.sh/console/model"
"infini.sh/framework/core/config"
"infini.sh/framework/core/elastic"
@ -236,6 +238,7 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error {
Timestamp: time.Now().UTC(),
}
taskItem.Status = task2.StatusRunning
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, taskLog, "")
return nil
}
@ -268,6 +271,7 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e
// all subtask stopped or error or complete
if len(tasks) == 0 {
taskItem.Status = task2.StatusStopped
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, &task2.Log{
ID: util.GetUUID(),
TaskId: taskItem.ID,
@ -290,6 +294,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error
taskItem.Status = ts.Status
tn := time.Now()
taskItem.CompletedTime = &tn
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, &task2.Log{
ID: util.GetUUID(),
TaskId: taskItem.ID,
@ -1467,3 +1472,65 @@ func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState
}
return state, nil
}
func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) {
config, err := p.extractTaskConfig(taskItem)
if err != nil {
log.Errorf("failed to parse config info from major task, id: %s, err: %v", taskItem.ID, err)
return
}
creatorID := config.Creator.Id
var title, body string
body = fmt.Sprintf("From Cluster: [%s (%s)], To Cluster: [%s (%s)]", config.Cluster.Source.Id, config.Cluster.Source.Name, config.Cluster.Target.Id, config.Cluster.Target.Name)
link := fmt.Sprintf("/#/migration/data/%s/detail", taskItem.ID)
switch taskItem.Status {
case task2.StatusReady:
log.Debugf("skip sending notification for ready task, id: %s", taskItem.ID)
return
case task2.StatusStopped:
title = fmt.Sprintf("Data Migration Stopped")
case task2.StatusComplete:
title = fmt.Sprintf("Data Migration Completed")
case task2.StatusError:
title = fmt.Sprintf("Data Migration Failed")
case task2.StatusRunning:
title = fmt.Sprintf("Data Migration Started")
default:
log.Warnf("skip sending notification for invalid task status, id: %s", taskItem.ID)
return
}
notification := &model.Notification{
UserId: util.ToString(creatorID),
NotificationType: model.NotificationTypeDataMigration,
Status: model.NotificationStatusNew,
Title: title,
Body: body,
Link: link,
}
err = orm.Create(nil, notification)
if err != nil {
log.Errorf("failed to create notification, err: %v", err)
return
}
return
}
func (p *DispatcherProcessor) extractTaskConfig(taskItem *task2.Task) (*ElasticDataConfig, error) {
origConfig, ok := taskItem.Config.(ElasticDataConfig)
if ok {
return &origConfig, nil
}
rawConfig, ok := taskItem.Config.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("failed to extract configuration from major task, id: %s, type: %T", taskItem.ID, taskItem.Config)
}
config := &ElasticDataConfig{}
err := mapstructure.Decode(rawConfig, config)
if err != nil {
return nil, err
}
return config, nil
}