From 8fddcdaf09d614ff3ba0637c09fc747a7f55f655 Mon Sep 17 00:00:00 2001
From: sunjiacheng
Date: Sun, 2 Apr 2023 15:02:30 +0800
Subject: [PATCH] [notification] add /notification/read, add message_type (#47)

[notification] add /notification/read, add message_type

Co-authored-by: Kassian Sun
---
 model/notification.go                   |  14 ++-
 plugin/api/notification/api.go          |   1 +
 plugin/api/notification/notification.go |  69 ++++++++++-
 plugin/migration/api.go                 | 154 ++++++++++++------------
 plugin/migration/pipeline.go            |   3 +-
 5 files changed, 155 insertions(+), 86 deletions(-)

diff --git a/model/notification.go b/model/notification.go
index e9bb141d..711477e9 100644
--- a/model/notification.go
+++ b/model/notification.go
@@ -7,9 +7,16 @@ import (
 type NotificationType string
 
 const (
-	NotificationTypeProductNews    NotificationType = "PRODUCT_NEWS"
-	NotificationTypeAlertTriggered NotificationType = "ALERT_TRIGGERED"
-	NotificationTypeDataMigration  NotificationType = "DATA_MIGRATION"
+	NotificationTypeNotification NotificationType = "NOTIFICATION"
+	NotificationTypeTodo         NotificationType = "TODO"
+)
+
+type MessageType string
+
+const (
+	MessageTypeNews      MessageType = "NEWS"
+	MessageTypeAlerting  MessageType = "ALERTING"
+	MessageTypeMigration MessageType = "MIGRATION"
 )
 
 type NotificationStatus string
@@ -24,6 +31,7 @@ type Notification struct {
 
 	UserId           string             `json:"user_id,omitempty" elastic_mapping:"user_id: { type: keyword }"`
 	NotificationType NotificationType   `json:"notification_type,omitempty" elastic_mapping:"notification_type:{type:keyword,fields:{text: {type: text}}}"`
+	MessageType      MessageType        `json:"message_type,omitempty" elastic_mapping:"message_type:{type:keyword,fields:{text: {type: text}}}"`
 	Status           NotificationStatus `json:"status,omitempty" elastic_mapping:"status: { type: keyword }"`
 	Title            string             `json:"title,omitempty" elastic_mapping:"title: { type: keyword }"`
 	Body             string             `json:"body,omitempty" elastic_mapping:"body: { type: keyword }"`
diff --git a/plugin/api/notification/api.go b/plugin/api/notification/api.go
index df9b63b2..dd1719cf 100644
--- a/plugin/api/notification/api.go
+++ b/plugin/api/notification/api.go
@@ -11,4 +11,5 @@ type NotificationAPI struct {
 func InitAPI() {
 	notification := NotificationAPI{}
 	api.HandleAPIMethod(api.GET, "/notification", notification.RequireLogin(notification.listNotifications))
+	api.HandleAPIMethod(api.POST, "/notification/read", notification.RequireLogin(notification.setNotificationsRead))
 }
diff --git a/plugin/api/notification/notification.go b/plugin/api/notification/notification.go
index 09ca3a1b..9dc3c1d4 100644
--- a/plugin/api/notification/notification.go
+++ b/plugin/api/notification/notification.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"net/http"
 	"strconv"
+	"time"
 
 	log "github.com/cihub/seelog"
 	"infini.sh/console/model"
@@ -17,7 +18,7 @@
 func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	user, err := rbac.FromUserContext(req.Context())
 	if err != nil {
-		log.Error(err)
+		log.Errorf("failed to get user from context, err: %v", err)
 		h.WriteError(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
@@ -35,7 +36,8 @@ func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Req
 	],
 	"query": { "bool": { "must": [
-		{ "term": {"user_id": { "value": "%s" } } }
+		{ "term": {"user_id": { "value": "%s" } } },
+		{ "term": {"status": { "value": "%s" } } }
 	]
 	}
 	},
 	"size": %d, "from": %d
@@ -53,7 +55,7 @@ func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Req
 	}
 
 	q := orm.Query{}
-	queryDSL = fmt.Sprintf(queryDSL, user.UserId, size, from)
+	queryDSL = fmt.Sprintf(queryDSL, user.UserId, model.NotificationStatusNew, size, from)
 	q.RawQuery = util.UnsafeStringToBytes(queryDSL)
 
 	err, res := orm.Search(&model.Notification{}, &q)
@@ -65,3 +67,64 @@ func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Req
 	h.WriteJSONHeader(w)
 	h.Write(w, res.Raw)
 }
+
+type SetNotificationsReadRequest struct {
+	Ids []string `json:"ids"`
+}
+
+func (h *NotificationAPI) setNotificationsRead(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+	user, err := rbac.FromUserContext(req.Context())
+	if err != nil {
+		log.Errorf("failed to get user from context, err: %v", err)
+		h.WriteError(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	if user == nil {
+		log.Error(errors.New("no user info"))
+		h.WriteError(w, "no user info", http.StatusInternalServerError)
+		return
+	}
+
+	var reqData = SetNotificationsReadRequest{}
+	err = h.DecodeJSON(req, &reqData)
+	if err != nil {
+		log.Error("failed to parse request: ", err)
+		h.WriteError(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+	now := time.Now().Format(time.RFC3339Nano)
+
+	queryDsl := util.MapStr{
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": []util.MapStr{
+					{
+						"terms": util.MapStr{
+							"_id": reqData.Ids,
+						},
+					},
+					{
+						"term": util.MapStr{
+							"status": util.MapStr{
+								"value": model.NotificationStatusNew,
+							},
+						},
+					},
+				},
+			},
+		},
+		"script": util.MapStr{
+			"source": fmt.Sprintf("ctx._source['status'] = '%s';ctx._source['updated'] = '%s'", model.NotificationStatusRead, now),
+		},
+	}
+
+	err = orm.UpdateBy(model.Notification{}, util.MustToJSONBytes(queryDsl))
+	if err != nil {
+		log.Errorf("failed to update notifications, err: %v", err)
+		h.WriteError(w, "update notifications failed", http.StatusInternalServerError)
+		return
+	}
+
+	h.WriteAckOKJSON(w)
+}
diff --git a/plugin/migration/api.go b/plugin/migration/api.go
index 5060c9bc..9e19c942 100644
--- a/plugin/migration/api.go
+++ b/plugin/migration/api.go
@@ -7,6 +7,11 @@ package migration
 import (
 	"context"
 	"fmt"
+	"net/http"
+	"strconv"
+	"strings"
+	"time"
+
 	log "github.com/cihub/seelog"
 	"infini.sh/console/model"
 	"infini.sh/framework/core/api"
@@ -17,28 +22,22 @@ import (
 	"infini.sh/framework/core/orm"
 	task2 "infini.sh/framework/core/task"
 	"infini.sh/framework/core/util"
-	"net/http"
-	"strconv"
-	"strings"
-	"time"
 )
 
-
 func InitAPI() {
-	handler := APIHandler{
-	}
-	api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequireLogin(handler.searchDataMigrationTask))
+	handler := APIHandler{}
+	api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequireLogin(handler.searchDataMigrationTask))
 	api.HandleAPIMethod(api.POST, "/migration/data", handler.RequireLogin(handler.createDataMigrationTask))
-	api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration))
+	api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration))
 	api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_partition", handler.getIndexPartitionInfo)
 	api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_count", handler.countDocuments)
-	api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequireLogin(handler.startDataMigration))
-	api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequireLogin(handler.stopDataMigrationTask))
+	api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequireLogin(handler.startDataMigration))
+	api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequireLogin(handler.stopDataMigrationTask))
 	//api.HandleAPIMethod(api.GET, "/migration/data/:task_id", handler.getMigrationTask)
-	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequireLogin(handler.getDataMigrationTaskInfo))
-	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequireLogin(handler.getDataMigrationTaskOfIndex))
-	api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequireLogin(handler.updateDataMigrationTaskStatus))
+	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequireLogin(handler.getDataMigrationTaskInfo))
+	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequireLogin(handler.getDataMigrationTaskOfIndex))
+	api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequireLogin(handler.updateDataMigrationTaskStatus))
 
 }
@@ -47,7 +46,7 @@ type APIHandler struct {
 	bulkResultIndexName string
 }
 
-func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	clusterTaskConfig := &ElasticDataConfig{}
 	err := h.DecodeJSON(req, clusterTaskConfig)
 	if err != nil {
@@ -59,7 +58,7 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re
 		h.WriteError(w, "indices must not be empty", http.StatusInternalServerError)
 		return
 	}
-	user, err := rbac.FromUserContext(req.Context())
+	user, err := rbac.FromUserContext(req.Context())
 	if err != nil {
 		log.Error(err)
 		h.WriteError(w, err.Error(), http.StatusInternalServerError)
@@ -83,16 +82,16 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re
 		Metadata: task2.Metadata{
 			Type: "pipeline",
 			Labels: util.MapStr{
-				"business_id": "cluster_migration",
+				"business_id":       "cluster_migration",
 				"source_cluster_id": clusterTaskConfig.Cluster.Source.Id,
 				"target_cluster_id": clusterTaskConfig.Cluster.Target.Id,
 				"source_total_docs": totalDocs,
 			},
 		},
 		Cancellable: true,
-		Runnable: false,
-		Status: task2.StatusInit,
-		Config: clusterTaskConfig,
+		Runnable:    false,
+		Status:      task2.StatusInit,
+		Config:      clusterTaskConfig,
 	}
 	t.ID = util.GetUUID()
 	err = orm.Create(nil, &t)
@@ -107,9 +106,9 @@
 func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	var (
 		keyword = h.GetParameterOrDefault(req, "keyword", "")
-		strSize = h.GetParameterOrDefault(req, "size", "20")
-		strFrom = h.GetParameterOrDefault(req, "from", "0")
-		mustQ []interface{}
+		strSize = h.GetParameterOrDefault(req, "size", "20")
+		strFrom = h.GetParameterOrDefault(req, "from", "0")
+		mustQ   []interface{}
 	)
 	mustQ = append(mustQ, util.MapStr{
 		"term": util.MapStr{
@@ -194,9 +193,9 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re
 	h.WriteJSON(w, searchRes, http.StatusOK)
 }
 
-func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	var (
-		index = ps.MustGetParameter("index")
+		index     = ps.MustGetParameter("index")
 		clusterID = ps.MustGetParameter("id")
 	)
 	client := elastic.GetClient(clusterID)
@@ -218,14 +217,14 @@ func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Requ
 	h.WriteJSON(w, partitions, http.StatusOK)
 }
 
-func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	taskID := ps.MustGetParameter("task_id")
 	obj := task2.Task{}
 	obj.ID = taskID
 
 	exists, err := orm.Get(&obj)
 	if !exists || err != nil {
-		h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError)
+		h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError)
 		return
 	}
 	obj.Status = task2.StatusReady
@@ -348,7 +347,7 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ
 	}, 200)
 }
 
-func getTaskConfig(task *task2.Task, config interface{}) error{
+func getTaskConfig(task *task2.Task, config interface{}) error {
 	configBytes, err := util.ToJSONBytes(task.Config)
 	if err != nil {
 		return err
@@ -357,16 +356,16 @@ func getTaskConfig(task *task2.Task, config interface{}) error{
 	return util.FromJSONBytes(configBytes, config)
 }
 
-func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API)(map[string]string, error){
+func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API) (map[string]string, error) {
 	const step = 50
 	var (
 		length = len(indexNames)
-		end int
+		end    int
 	)
 	refreshIntervals := map[string]string{}
 	for i := 0; i < length; i += step {
 		end = i + step
-		if end > length - 1 {
+		if end > length-1 {
 			end = length
 		}
 		tempNames := indexNames[i:end]
@@ -395,7 +394,7 @@ func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API)(ma
 
 }
 
-func (h *APIHandler) getIndexRefreshIntervals(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) getIndexRefreshIntervals(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	id := ps.MustGetParameter("task_id")
 
 	obj := task2.Task{}
@@ -433,7 +432,7 @@ func (h *APIHandler) getIndexRefreshIntervals(w http.ResponseWriter, req *http.R
 	h.WriteJSON(w, vals, http.StatusOK)
 }
 
-func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	id := ps.MustGetParameter("task_id")
 
 	obj := task2.Task{}
@@ -480,7 +479,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
 		taskConfig.Indices[i].Percent = util.ToFixed(percent, 2)
 		taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
 		if int64(count) == index.Source.Docs {
-			completedIndices ++
+			completedIndices++
 		}
 	}
 	cfg := global.MustLookup("cluster_migration_config")
@@ -497,14 +496,14 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
 	obj.Metadata.Labels["completed_indices"] = completedIndices
 	h.WriteJSON(w, obj, http.StatusOK)
 }
-func getMajorTaskInfoByIndex(taskID string) (map[string]IndexStateInfo, error){
+func getMajorTaskInfoByIndex(taskID string) (map[string]IndexStateInfo, error) {
 	query := util.MapStr{
 		"size": 0,
 		"aggs": util.MapStr{
 			"group_by_task": util.MapStr{
 				"terms": util.MapStr{
 					"field": "metadata.labels.unique_index_name",
-					"size": 100,
+					"size":  100,
 				},
 				"aggs": util.MapStr{
 					"group_by_status": util.MapStr{
@@ -591,7 +590,7 @@ func getMajorTaskInfoByIndex(taskID string) (map[string]IndexStateInfo, error){
 	return resBody, nil
 }
 
-func getIndexTaskDocCount(index *IndexConfig, targetESClient elastic.API) (int64, error) {
+func getIndexTaskDocCount(ctx context.Context, index *IndexConfig, targetESClient elastic.API) (int64, error) {
 	targetIndexName := index.Target.Name
 	if targetIndexName == "" {
 		if v, ok := index.IndexRename[index.Source.Name].(string); ok {
@@ -622,8 +621,6 @@ func getIndexTaskDocCount(index *IndexConfig, targetESClient elastic.API) (int64
 		body = util.MustToJSONBytes(query)
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
-	defer cancel()
 	countRes, err := targetESClient.Count(ctx, targetIndexName, body)
 	if err != nil {
 		return 0, err
@@ -634,7 +631,7 @@ func getIndexTaskDocCount(index *IndexConfig, targetESClient elastic.API) (int64
 	return countRes.Count, nil
 }
 
-func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	id := ps.MustGetParameter("task_id")
 	uniqueIndexName := ps.MustGetParameter("index")
 	majorTask := task2.Task{}
@@ -648,7 +645,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 
 	var completedTime int64
 	taskInfo := util.MapStr{
-		"task_id": id,
+		"task_id":    id,
 		"start_time": majorTask.StartTimeInMillis,
 	}
 	partitionTaskQuery := util.MapStr{
@@ -709,7 +706,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 			subTaskStatus[subTask.ID] = subTask.Status
 			continue
 		}
-		if subTask.Metadata.Labels["pipeline_id"] == "es_scroll" || subTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
+		if subTask.Metadata.Labels["pipeline_id"] == "es_scroll" || subTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
 			if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok {
 				pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
 				if pl := len(subTask.ParentId); pl > 0 {
@@ -722,9 +719,9 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 		}
 	}
 	taskInfo["data_partition"] = len(subTasks)
-	var taskStats = map[string]struct{
+	var taskStats = map[string]struct {
 		ScrolledDocs float64
-		IndexDocs float64
+		IndexDocs    float64
 	}{}
 	for instID, taskIDs := range pipelineTaskIDs {
 		inst := &model.Instance{}
@@ -761,9 +758,9 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 	}
 
 	var (
-		partitionTaskInfos []util.MapStr
+		partitionTaskInfos  []util.MapStr
 		completedPartitions int
-		startTime int64
+		startTime           int64
 	)
 	if len(subTasks) > 0 {
 		startTime = subTasks[0].StartTimeInMillis
@@ -771,7 +768,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 	for i, ptask := range subTasks {
 		var (
 			cfg map[string]interface{}
-			ok bool
+			ok  bool
 		)
 		if cfg, ok = ptask.Config.(map[string]interface{}); !ok {
 			continue
@@ -794,18 +791,18 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 		}
 		var (
 			scrollDocs float64
-			indexDocs float64
+			indexDocs  float64
 		)
 		if stat, ok := taskStats[ptask.ID]; ok {
 			scrollDocs = stat.ScrolledDocs
 			indexDocs = stat.IndexDocs
-		}else{
+		} else {
 			if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError {
 				if ptask.Metadata.Labels != nil {
-					if v, ok := ptask.Metadata.Labels["scrolled_docs"].(float64); ok{
+					if v, ok := ptask.Metadata.Labels["scrolled_docs"].(float64); ok {
 						scrollDocs = v
 					}
-					if v, ok := ptask.Metadata.Labels["index_docs"].(float64); ok{
+					if v, ok := ptask.Metadata.Labels["index_docs"].(float64); ok {
 						indexDocs = v
 					}
 				}
@@ -822,16 +819,16 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 		partitionTotalDocs, _ := util.MapStr(cfg).GetValue("source.doc_count")
 		partitionTaskInfos = append(partitionTaskInfos, util.MapStr{
-			"task_id": ptask.ID,
-			"status": ptask.Status,
-			"start_time": ptask.StartTimeInMillis,
+			"task_id":        ptask.ID,
+			"status":         ptask.Status,
+			"start_time":     ptask.StartTimeInMillis,
 			"completed_time": subCompletedTime,
-			"start": start,
-			"end": end,
-			"duration": durationInMS,
-			"scroll_docs": scrollDocs,
-			"index_docs": indexDocs,
-			"total_docs": partitionTotalDocs,
+			"start":          start,
+			"end":            end,
+			"duration":       durationInMS,
+			"scroll_docs":    scrollDocs,
+			"index_docs":     indexDocs,
+			"total_docs":     partitionTotalDocs,
 		})
 		if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError {
 			completedPartitions++
@@ -840,7 +837,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 	if len(subTasks) == completedPartitions {
 		taskInfo["completed_time"] = completedTime
 		taskInfo["duration"] = completedTime - startTime
-	}else{
+	} else {
 		taskInfo["duration"] = time.Now().UnixMilli() - startTime
 	}
 	taskInfo["start_time"] = startTime
@@ -849,7 +846,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 	h.WriteJSON(w, taskInfo, http.StatusOK)
 }
 
-func (h *APIHandler) getMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) getMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	id := ps.MustGetParameter("task_id")
 
 	obj := task2.Task{}
@@ -871,9 +868,9 @@ func (h *APIHandler) getMigrationTask(w http.ResponseWriter, req *http.Request,
 	}, 200)
 }
 
-func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	var (
-		index = ps.MustGetParameter("index")
+		index     = ps.MustGetParameter("index")
 		clusterID = ps.MustGetParameter("id")
 	)
 	client := elastic.GetClient(clusterID)
@@ -893,8 +890,8 @@ func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps
 		})
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
-	defer cancel()
+	ctx := req.Context()
+
 	countRes, err := client.Count(ctx, index, query)
 	if err != nil {
 		log.Error(err)
@@ -904,7 +901,7 @@ func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps
 	h.WriteJSON(w, countRes, http.StatusOK)
 }
 
-func (h *APIHandler) getMigrationTaskLog(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) getMigrationTaskLog(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	id := ps.MustGetParameter("task_id")
 	query := util.MapStr{
 		"sort": []util.MapStr{
@@ -924,7 +921,7 @@ func (h *APIHandler) getMigrationTaskLog(w http.ResponseWriter, req *http.Reques
 						"value": id,
 					},
 				},
-			},{
+			}, {
 				"term": util.MapStr{
 					"id": util.MapStr{
 						"value": id,
@@ -946,7 +943,7 @@ func (h *APIHandler) getMigrationTaskLog(w http.ResponseWriter, req *http.Reques
 	}
 }
 
-func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	id := ps.MustGetParameter("task_id")
 
 	obj := task2.Task{}
@@ -982,7 +979,7 @@ func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *h
 	}, 200)
 }
 
-func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	typ := h.GetParameter(req, "type")
 	switch typ {
 	case "multi_type":
@@ -992,9 +989,9 @@ func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Requ
 		h.WriteError(w, "unknown parameter type", http.StatusOK)
 }
 
-func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	var reqBody = struct {
-		Cluster struct{
+		Cluster struct {
 			SourceID string `json:"source_id"`
 			TargetID string `json:"target_id"`
 		} `json:"cluster"`
@@ -1018,7 +1015,7 @@ func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request,
 
 	h.WriteJSON(w, util.MapStr{
 		"result": typeInfo,
-	} , http.StatusOK)
+	}, http.StatusOK)
 }
 
 func stopPipelineTasks(parentID string) error {
@@ -1036,7 +1033,7 @@ func stopPipelineTasks(parentID string) error {
 				},
 				{
 					"terms": util.MapStr{
-						"metadata.labels.pipeline_id": []string{"es_scroll","bulk_indexing"},
+						"metadata.labels.pipeline_id": []string{"es_scroll", "bulk_indexing"},
 					},
 				},
 			},
@@ -1059,7 +1056,7 @@ func stopPipelineTasks(parentID string) error {
 		tk := task2.Task{}
 		err = util.FromJSONBytes(buf, &tk)
 		if err != nil {
-			return err
+			return err
 		}
 		if tk.Metadata.Labels != nil {
 			if instID, ok := tk.Metadata.Labels["execution_instance_id"].(string); ok {
@@ -1125,7 +1122,6 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats MajorTaskStat
 						},
 					},
 				},
-				},
 			},
 		},
 	},
@@ -1184,7 +1180,7 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats MajorTaskStat
 	return taskStats, nil
 }
 
-func getMajorTaskByIndexFromES(majorTaskID string)(map[string]IndexStateInfo, error){
+func getMajorTaskByIndexFromES(majorTaskID string) (map[string]IndexStateInfo, error) {
 	taskQuery := util.MapStr{
 		"size": 500,
 		"query": util.MapStr{
@@ -1252,7 +1248,7 @@ func getMajorTaskByIndexFromES(majorTaskID string)(map[string]IndexStateInfo, er
 	for pipelineID, status := range pipelines {
 		indexName := pipelineIndexNames[pipelineID]
 		if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil {
-			if vv, ok := v.(float64); ok && indexName != ""{
+			if vv, ok := v.(float64); ok && indexName != "" {
 				st := state[indexName]
 				st.IndexDocs += vv
 				state[indexName] = st
@@ -1261,4 +1257,4 @@ func getMajorTaskByIndexFromES(majorTaskID string)(map[string]IndexStateInfo, er
 		}
 	}
 	return state, nil
-}
\ No newline at end of file
+}
diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go
index c0aaea97..f7ccfccd 100644
--- a/plugin/migration/pipeline.go
+++ b/plugin/migration/pipeline.go
@@ -1503,7 +1503,8 @@ func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) {
 	}
 	notification := &model.Notification{
 		UserId:           util.ToString(creatorID),
-		NotificationType: model.NotificationTypeDataMigration,
+		NotificationType: model.NotificationTypeNotification,
+		MessageType:      model.MessageTypeMigration,
 		Status:           model.NotificationStatusNew,
 		Title:            title,
 		Body:             body,
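--
Reviewer note, not part of the patch: a minimal sketch for exercising the new endpoint by hand. The base URL, the notification document ID, and the session cookie below are assumptions — /notification/read is wrapped in RequireLogin, so the request must carry valid credentials. On success the handler runs an update-by-query that flips the matched NotificationStatusNew documents to NotificationStatusRead, refreshes their `updated` timestamp, and answers with an ack JSON (h.WriteAckOKJSON).

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Payload matches SetNotificationsReadRequest: {"ids": [...]}.
	body := bytes.NewReader([]byte(`{"ids": ["example-notification-id"]}`)) // hypothetical document ID
	req, err := http.NewRequest(http.MethodPost, "http://localhost:9000/notification/read", body) // assumed console address
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Cookie", "SESSION=<your-session>") // placeholder: RequireLogin expects an authenticated user
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	out, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(out)) // expect 200 and an ack body on success
}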