diff --git a/main.go b/main.go index 50b2be8a..eef232f8 100644 --- a/main.go +++ b/main.go @@ -4,11 +4,14 @@ import ( "context" "errors" _ "expvar" + _ "time/tzdata" + log "github.com/cihub/seelog" "infini.sh/console/config" "infini.sh/console/model" "infini.sh/console/model/alerting" _ "infini.sh/console/plugin" + setup1 "infini.sh/console/plugin/setup" alerting2 "infini.sh/console/service/alerting" "infini.sh/framework" "infini.sh/framework/core/elastic" @@ -31,11 +34,8 @@ import ( "infini.sh/framework/modules/task" "infini.sh/framework/modules/ui" _ "infini.sh/framework/plugins" - setup1 "infini.sh/console/plugin/setup" - _ "infini.sh/console/plugin" api2 "infini.sh/gateway/api" _ "infini.sh/gateway/proxy" - _ "time/tzdata" ) var appConfig *config.AppConfig @@ -60,18 +60,18 @@ func main() { api := api2.GatewayAPI{} - modules:=[]module.Module{} - modules=append(modules,&stats.SimpleStatsModule{}) - modules=append(modules,&elastic2.ElasticModule{}) - modules=append(modules,&queue2.DiskQueue{}) - modules=append(modules,&redis.RedisModule{}) - modules=append(modules,&pipeline.PipeModule{}) - modules=append(modules,&task.TaskModule{}) - modules=append(modules,&agent.AgentModule{}) - modules=append(modules,&metrics.MetricsModule{}) - modules=append(modules,&security.Module{}) + modules := []module.Module{} + modules = append(modules, &stats.SimpleStatsModule{}) + modules = append(modules, &elastic2.ElasticModule{}) + modules = append(modules, &queue2.DiskQueue{}) + modules = append(modules, &redis.RedisModule{}) + modules = append(modules, &pipeline.PipeModule{}) + modules = append(modules, &task.TaskModule{}) + modules = append(modules, &agent.AgentModule{}) + modules = append(modules, &metrics.MetricsModule{}) + modules = append(modules, &security.Module{}) - uiModule:=&ui.UIModule{} + uiModule := &ui.UIModule{} if app.Setup(func() { @@ -79,11 +79,11 @@ func main() { module.RegisterSystemModule(&setup1.Module{}) module.RegisterSystemModule(uiModule) - if !global.Env().SetupRequired(){ + if !global.Env().SetupRequired() { for _, v := range modules { module.RegisterSystemModule(v) } - }else{ + } else { for _, v := range modules { v.Setup() } @@ -115,8 +115,7 @@ func main() { module.Start() - var initFunc= func() { - + var initFunc = func() { elastic2.InitTemplate(false) @@ -134,6 +133,7 @@ func main() { orm.RegisterSchemaWithIndexName(task1.Task{}, "task") orm.RegisterSchemaWithIndexName(task1.Log{}, "task-log") orm.RegisterSchemaWithIndexName(model.Layout{}, "layout") + orm.RegisterSchemaWithIndexName(model.Notification{}, "notification") api.RegisterSchema() if global.Env().SetupRequired() { @@ -142,7 +142,7 @@ func main() { } } - task1.RunWithinGroup("initialize_alerting",func(ctx context.Context) error { + task1.RunWithinGroup("initialize_alerting", func(ctx context.Context) error { err := alerting2.InitTasks() if err != nil { log.Errorf("init alerting task error: %v", err) @@ -151,13 +151,13 @@ func main() { }) } - if !global.Env().SetupRequired(){ + if !global.Env().SetupRequired() { initFunc() - }else{ + } else { setup1.RegisterSetupCallback(initFunc) } - if !global.Env().SetupRequired(){ + if !global.Env().SetupRequired() { err := bootstrapRequirementCheck() if err != nil { panic(err) diff --git a/model/notification.go b/model/notification.go new file mode 100644 index 00000000..e9bb141d --- /dev/null +++ b/model/notification.go @@ -0,0 +1,31 @@ +package model + +import ( + "infini.sh/framework/core/orm" +) + +type NotificationType string + +const ( + NotificationTypeProductNews NotificationType = "PRODUCT_NEWS" + NotificationTypeAlertTriggered NotificationType = "ALERT_TRIGGERED" + NotificationTypeDataMigration NotificationType = "DATA_MIGRATION" +) + +type NotificationStatus string + +const ( + NotificationStatusNew NotificationStatus = "NEW" + NotificationStatusRead NotificationStatus = "READ" +) + +type Notification struct { + orm.ORMObjectBase + + UserId string `json:"user_id,omitempty" elastic_mapping:"user_id: { type: keyword }"` + NotificationType NotificationType `json:"notification_type,omitempty" elastic_mapping:"notification_type:{type:keyword,fields:{text: {type: text}}}"` + Status NotificationStatus `json:"status,omitempty" elastic_mapping:"status: { type: keyword }"` + Title string `json:"title,omitempty" elastic_mapping:"title: { type: keyword }"` + Body string `json:"body,omitempty" elastic_mapping:"body: { type: keyword }"` + Link string `json:"link,omitempty" elastic_mapping:"link: { type: keyword }"` +} diff --git a/plugin/api/init.go b/plugin/api/init.go index a2f0b2e4..0125013b 100644 --- a/plugin/api/init.go +++ b/plugin/api/init.go @@ -1,15 +1,17 @@ package api import ( + "path" + "infini.sh/console/config" "infini.sh/console/plugin/api/alerting" "infini.sh/console/plugin/api/gateway" "infini.sh/console/plugin/api/index_management" "infini.sh/console/plugin/api/insight" "infini.sh/console/plugin/api/layout" + "infini.sh/console/plugin/api/notification" "infini.sh/framework/core/api" "infini.sh/framework/core/api/rbac/enum" - "path" ) func Init(cfg *config.AppConfig) { @@ -48,8 +50,8 @@ func Init(cfg *config.AppConfig) { api.HandleAPIMethod(api.POST, path.Join(pathPrefix, "elasticsearch/command"), handler.RequirePermission(handler.HandleAddCommonCommandAction, enum.PermissionCommandWrite)) api.HandleAPIMethod(api.PUT, path.Join(pathPrefix, "elasticsearch/command/:cid"), handler.RequirePermission(handler.HandleSaveCommonCommandAction, enum.PermissionCommandWrite)) api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "elasticsearch/command"), handler.RequirePermission(handler.HandleQueryCommonCommandAction, enum.PermissionCommandRead)) - api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "elasticsearch/command/:cid"), handler.RequirePermission(handler.HandleDeleteCommonCommandAction,enum.PermissionCommandWrite)) - api.HandleAPIMethod(api.GET, "/elasticsearch/overview/status", handler.RequireLogin(handler.ElasticsearchStatusSummaryAction)) + api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "elasticsearch/command/:cid"), handler.RequirePermission(handler.HandleDeleteCommonCommandAction, enum.PermissionCommandWrite)) + api.HandleAPIMethod(api.GET, "/elasticsearch/overview/status", handler.RequireLogin(handler.ElasticsearchStatusSummaryAction)) //task.RegisterScheduleTask(task.ScheduleTask{ // Description: "sync reindex task result", @@ -61,12 +63,12 @@ func Init(cfg *config.AppConfig) { // }, //}) - alertAPI := alerting.AlertAPI{ - } + alertAPI := alerting.AlertAPI{} alertAPI.Init() gateway.InitAPI() insight.InitAPI() layout.InitAPI() + notification.InitAPI() } diff --git a/plugin/api/notification/api.go b/plugin/api/notification/api.go new file mode 100644 index 00000000..df9b63b2 --- /dev/null +++ b/plugin/api/notification/api.go @@ -0,0 +1,14 @@ +package notification + +import ( + "infini.sh/framework/core/api" +) + +type NotificationAPI struct { + api.Handler +} + +func InitAPI() { + notification := NotificationAPI{} + api.HandleAPIMethod(api.GET, "/notification", notification.RequireLogin(notification.listNotifications)) +} diff --git a/plugin/api/notification/notification.go b/plugin/api/notification/notification.go new file mode 100644 index 00000000..09ca3a1b --- /dev/null +++ b/plugin/api/notification/notification.go @@ -0,0 +1,67 @@ +package notification + +import ( + "errors" + "fmt" + "net/http" + "strconv" + + log "github.com/cihub/seelog" + "infini.sh/console/model" + "infini.sh/framework/core/api/rbac" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" +) + +func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + user, err := rbac.FromUserContext(req.Context()) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if user == nil { + log.Error(errors.New("no user info")) + h.WriteError(w, "no user info", http.StatusInternalServerError) + return + } + + var ( + queryDSL = `{ + "sort": [ + { "created": {"order": "desc"} } + ], + "query": { + "bool": { "must": [ + { "term": {"user_id": { "value": "%s" } } } + ] } + }, + "size": %d, "from": %d + }` + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + ) + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + q := orm.Query{} + queryDSL = fmt.Sprintf(queryDSL, user.UserId, size, from) + q.RawQuery = util.UnsafeStringToBytes(queryDSL) + + err, res := orm.Search(&model.Notification{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteJSONHeader(w) + h.Write(w, res.Raw) +} diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index d2ad25ce..c0aaea97 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -13,6 +13,8 @@ import ( "time" log "github.com/cihub/seelog" + "github.com/mitchellh/mapstructure" + "infini.sh/console/model" "infini.sh/framework/core/config" "infini.sh/framework/core/elastic" @@ -236,6 +238,7 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { Timestamp: time.Now().UTC(), } taskItem.Status = task2.StatusRunning + p.sendMajorTaskNotification(taskItem) p.saveTaskAndWriteLog(taskItem, taskLog, "") return nil } @@ -268,6 +271,7 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e // all subtask stopped or error or complete if len(tasks) == 0 { taskItem.Status = task2.StatusStopped + p.sendMajorTaskNotification(taskItem) p.saveTaskAndWriteLog(taskItem, &task2.Log{ ID: util.GetUUID(), TaskId: taskItem.ID, @@ -290,6 +294,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error taskItem.Status = ts.Status tn := time.Now() taskItem.CompletedTime = &tn + p.sendMajorTaskNotification(taskItem) p.saveTaskAndWriteLog(taskItem, &task2.Log{ ID: util.GetUUID(), TaskId: taskItem.ID, @@ -1467,3 +1472,65 @@ func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState } return state, nil } + +func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) { + config, err := p.extractTaskConfig(taskItem) + if err != nil { + log.Errorf("failed to parse config info from major task, id: %s, err: %v", taskItem.ID, err) + return + } + + creatorID := config.Creator.Id + + var title, body string + body = fmt.Sprintf("From Cluster: [%s (%s)], To Cluster: [%s (%s)]", config.Cluster.Source.Id, config.Cluster.Source.Name, config.Cluster.Target.Id, config.Cluster.Target.Name) + link := fmt.Sprintf("/#/migration/data/%s/detail", taskItem.ID) + switch taskItem.Status { + case task2.StatusReady: + log.Debugf("skip sending notification for ready task, id: %s", taskItem.ID) + return + case task2.StatusStopped: + title = fmt.Sprintf("Data Migration Stopped") + case task2.StatusComplete: + title = fmt.Sprintf("Data Migration Completed") + case task2.StatusError: + title = fmt.Sprintf("Data Migration Failed") + case task2.StatusRunning: + title = fmt.Sprintf("Data Migration Started") + default: + log.Warnf("skip sending notification for invalid task status, id: %s", taskItem.ID) + return + } + notification := &model.Notification{ + UserId: util.ToString(creatorID), + NotificationType: model.NotificationTypeDataMigration, + Status: model.NotificationStatusNew, + Title: title, + Body: body, + Link: link, + } + err = orm.Create(nil, notification) + if err != nil { + log.Errorf("failed to create notification, err: %v", err) + return + } + return +} + +func (p *DispatcherProcessor) extractTaskConfig(taskItem *task2.Task) (*ElasticDataConfig, error) { + origConfig, ok := taskItem.Config.(ElasticDataConfig) + if ok { + return &origConfig, nil + } + rawConfig, ok := taskItem.Config.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("failed to extract configuration from major task, id: %s, type: %T", taskItem.ID, taskItem.Config) + } + + config := &ElasticDataConfig{} + err := mapstructure.Decode(rawConfig, config) + if err != nil { + return nil, err + } + return config, nil +}