diff --git a/main.go b/main.go index eef232f8..d383d906 100644 --- a/main.go +++ b/main.go @@ -131,7 +131,6 @@ func main() { orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") orm.RegisterSchemaWithIndexName(task1.Task{}, "task") - orm.RegisterSchemaWithIndexName(task1.Log{}, "task-log") orm.RegisterSchemaWithIndexName(model.Layout{}, "layout") orm.RegisterSchemaWithIndexName(model.Notification{}, "notification") api.RegisterSchema() diff --git a/model/notification.go b/model/notification.go index 711477e9..49b14f28 100644 --- a/model/notification.go +++ b/model/notification.go @@ -7,33 +7,33 @@ import ( type NotificationType string const ( - NotificationTypeNotification NotificationType = "NOTIFICATION" - NotificationTypeTodo NotificationType = "TODO" + NotificationTypeNotification NotificationType = "notification" + NotificationTypeTodo NotificationType = "todo" ) type MessageType string const ( - MessageTypeNews MessageType = "NEWS" - MessageTypeAlerting MessageType = "ALERTING" - MessageTypeMigration MessageType = "MIGRATION" + MessageTypeNews MessageType = "news" + MessageTypeAlerting MessageType = "alerting" + MessageTypeMigration MessageType = "migration" ) type NotificationStatus string const ( - NotificationStatusNew NotificationStatus = "NEW" - NotificationStatusRead NotificationStatus = "READ" + NotificationStatusNew NotificationStatus = "new" + NotificationStatusRead NotificationStatus = "read" ) type Notification struct { orm.ORMObjectBase - UserId string `json:"user_id,omitempty" elastic_mapping:"user_id: { type: keyword }"` - NotificationType NotificationType `json:"notification_type,omitempty" elastic_mapping:"notification_type:{type:keyword,fields:{text: {type: text}}}"` - MessageType MessageType `json:"message_type,omitempty" elastic_mapping:"message_type:{type:keyword,fields:{text: {type: text}}}"` - Status NotificationStatus `json:"status,omitempty" 
elastic_mapping:"status: { type: keyword }"` - Title string `json:"title,omitempty" elastic_mapping:"title: { type: keyword }"` - Body string `json:"body,omitempty" elastic_mapping:"body: { type: keyword }"` - Link string `json:"link,omitempty" elastic_mapping:"link: { type: keyword }"` + UserId string `json:"user_id,omitempty" elastic_mapping:"user_id: { type: keyword }"` + Type NotificationType `json:"type,omitempty" elastic_mapping:"type:{type:keyword,fields:{text: {type: text}}}"` + MessageType MessageType `json:"message_type,omitempty" elastic_mapping:"message_type:{type:keyword,fields:{text: {type: text}}}"` + Status NotificationStatus `json:"status,omitempty" elastic_mapping:"status: { type: keyword }"` + Title string `json:"title,omitempty" elastic_mapping:"title: { type: keyword }"` + Body string `json:"body,omitempty" elastic_mapping:"body: { type: keyword }"` + Link string `json:"link,omitempty" elastic_mapping:"link: { type: keyword }"` } diff --git a/plugin/api/notification/api.go b/plugin/api/notification/api.go index 5ac6a5a3..176ba865 100644 --- a/plugin/api/notification/api.go +++ b/plugin/api/notification/api.go @@ -10,6 +10,6 @@ type NotificationAPI struct { func InitAPI() { notification := NotificationAPI{} - api.HandleAPIMethod(api.GET, "/notification/_search", notification.RequireLogin(notification.listNotifications)) + api.HandleAPIMethod(api.POST, "/notification/_search", notification.RequireLogin(notification.listNotifications)) api.HandleAPIMethod(api.POST, "/notification/read", notification.RequireLogin(notification.setNotificationsRead)) } diff --git a/plugin/api/notification/notification.go b/plugin/api/notification/notification.go index 9dc3c1d4..c0a0d1f4 100644 --- a/plugin/api/notification/notification.go +++ b/plugin/api/notification/notification.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "net/http" - "strconv" "time" log "github.com/cihub/seelog" @@ -15,6 +14,13 @@ import ( "infini.sh/framework/core/util" ) +type 
ListNotificationsRequest struct { + From int `json:"from"` + Size int `json:"size"` + Status []model.NotificationStatus `json:"status"` + Types []model.NotificationType `json:"types"` +} + func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { user, err := rbac.FromUserContext(req.Context()) if err != nil { @@ -29,34 +35,40 @@ func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Req return } - var ( - queryDSL = `{ - "sort": [ - { "created": {"order": "desc"} } - ], - "query": { - "bool": { "must": [ - { "term": {"user_id": { "value": "%s" } } }, - { "term": {"status": { "value": "%s" } } } - ] } - }, - "size": %d, "from": %d - }` - strSize = h.GetParameterOrDefault(req, "size", "20") - strFrom = h.GetParameterOrDefault(req, "from", "0") - ) - size, _ := strconv.Atoi(strSize) - if size <= 0 { - size = 20 + var reqData = ListNotificationsRequest{ + From: 0, + Size: 20, + Status: []model.NotificationStatus{model.NotificationStatusNew}, + Types: []model.NotificationType{model.NotificationTypeNotification}, } - from, _ := strconv.Atoi(strFrom) - if from < 0 { - from = 0 + err = h.DecodeJSON(req, &reqData) + if err != nil { + log.Error("failed to parse request: ", err) + h.WriteError(w, err.Error(), http.StatusBadRequest) + return } + var ( + queryDSL = util.MapStr{ + "sort": []util.MapStr{ + {"created": util.MapStr{"order": "desc"}}, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + {"term": util.MapStr{"user_id": util.MapStr{"value": user.UserId}}}, + {"terms": util.MapStr{"status": reqData.Status}}, + {"terms": util.MapStr{"type": reqData.Types}}, + }, + }, + }, + "size": reqData.Size, "from": reqData.From, + } + ) + q := orm.Query{} - queryDSL = fmt.Sprintf(queryDSL, user.UserId, model.NotificationStatusNew, size, from) - q.RawQuery = util.UnsafeStringToBytes(queryDSL) + log.Infof(util.MustToJSON(queryDSL)) + q.RawQuery = util.MustToJSONBytes(queryDSL) err, 
res := orm.Search(&model.Notification{}, &q) if err != nil { @@ -69,7 +81,8 @@ func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Req } type SetNotificationsReadRequest struct { - Ids []string `json:"ids"` + Ids []string `json:"ids"` + Types []model.NotificationType `json:"types"` } func (h *NotificationAPI) setNotificationsRead(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -98,20 +111,45 @@ func (h *NotificationAPI) setNotificationsRead(w http.ResponseWriter, req *http. queryDsl := util.MapStr{ "query": util.MapStr{ "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "_id": reqData.Ids, + "should": []util.MapStr{ + util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "_id": reqData.Ids, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": model.NotificationStatusNew, + }, + }, + }, + }, }, }, - { - "term": util.MapStr{ - "status": util.MapStr{ - "value": model.NotificationStatusNew, + util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "type": reqData.Types, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": model.NotificationStatusNew, + }, + }, + }, }, }, }, }, + "minimum_should_match": 1, }, }, "script": util.MapStr{ diff --git a/plugin/migration/api.go b/plugin/migration/api.go index e988ef76..a458e7f4 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -7,12 +7,13 @@ package migration import ( "context" "fmt" - "infini.sh/framework/core/api/rbac/enum" "net/http" "strconv" "strings" "time" + "infini.sh/framework/core/api/rbac/enum" + log "github.com/cihub/seelog" "infini.sh/console/model" "infini.sh/framework/core/api" @@ -81,7 +82,7 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re clusterTaskConfig.Cluster.Target.Distribution = dstClusterCfg.Distribution t := task2.Task{ Metadata: task2.Metadata{ - Type: 
"pipeline", + Type: "cluster_migration", Labels: util.MapStr{ "business_id": "cluster_migration", "source_cluster_id": clusterTaskConfig.Cluster.Source.Id, @@ -289,62 +290,12 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ }, http.StatusNotFound) return } - //if task2.IsEnded(obj.Status) { - // h.WriteJSON(w, util.MapStr{ - // "success": true, - // }, 200) - // return - //} - //query all pipeline task(scroll/bulk_indexing) and then stop it - err = stopPipelineTasks(id) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) + if task2.IsEnded(obj.Status) { + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) return } - if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "cluster_migration" { - //update status of subtask to pending stop - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": obj.ID, - }, - }, - }, - { - "term": util.MapStr{ - "status": util.MapStr{ - "value": task2.StatusRunning, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.business_id": util.MapStr{ - "value": "index_migration", - }, - }, - }, - }, - }, - } - queryDsl := util.MapStr{ - "query": query, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop), - }, - } - - err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl)) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - } obj.Status = task2.StatusPendingStop err = orm.Update(nil, &obj) if err != nil { @@ -912,48 +863,6 @@ func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps h.WriteJSON(w, countRes, http.StatusOK) } -func (h *APIHandler) getMigrationTaskLog(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("task_id") - query := util.MapStr{ - "sort": []util.MapStr{ - 
{ - "timestamp": util.MapStr{ - "order": "asc", - }, - }, - }, - "query": util.MapStr{ - "bool": util.MapStr{ - "minimum_should_match": 1, - "should": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": id, - }, - }, - }, { - "term": util.MapStr{ - "id": util.MapStr{ - "value": id, - }, - }, - }, - }, - }, - }, - } - - q := &orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - err, _ := orm.Search(task2.Log{}, q) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - } -} - func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("task_id") @@ -1029,65 +938,6 @@ func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request, }, http.StatusOK) } -func stopPipelineTasks(parentID string) error { - queryDsl := util.MapStr{ - "size": 1000, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": parentID, - }, - }, - }, - { - "terms": util.MapStr{ - "metadata.labels.pipeline_id": []string{"es_scroll", "bulk_indexing"}, - }, - }, - }, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(queryDsl), - } - err, result := orm.Search(task2.Task{}, &q) - if err != nil { - return err - } - - for _, hit := range result.Result { - buf, err := util.ToJSONBytes(hit) - if err != nil { - return err - } - tk := task2.Task{} - err = util.FromJSONBytes(buf, &tk) - if err != nil { - return err - } - if tk.Metadata.Labels != nil { - if instID, ok := tk.Metadata.Labels["execution_instance_id"].(string); ok { - inst := model.Instance{} - inst.ID = instID - _, err = orm.Get(&inst) - if err != nil { - return err - } - err = inst.StopPipeline(context.Background(), tk.ID) - if err != nil { - log.Error(err) - continue - } - } - } - } - return nil -} - func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats 
MajorTaskState, err error) { taskQuery := util.MapStr{ "size": 500, diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index f7ccfccd..2b12b8b9 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -154,19 +154,10 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { t.Status = task2.StatusError tn := time.Now() t.CompletedTime = &tn - p.saveTaskAndWriteLog(&t, &task2.Log{ - ID: util.GetUUID(), - TaskId: t.ID, - Status: task2.StatusError, - Type: t.Metadata.Type, - Config: t.Config, - Result: &task2.LogResult{ - Success: false, - Error: err.Error(), - }, - Message: fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err), - Timestamp: time.Now().UTC(), - }, "") + p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{ + Success: false, + Error: err.Error(), + }, fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err)) } } //es index refresh @@ -220,30 +211,63 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { }, } + // saved is_split if the following steps failed + defer func() { + p.sendMajorTaskNotification(taskItem) + p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ + Success: true, + }, fmt.Sprintf("success to start task [%s]", taskItem.ID)) + }() + esClient := elastic.GetClient(p.config.Elasticsearch) _, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl)) if err != nil { - return err - } - taskLog := &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: task2.StatusRunning, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, - Result: &task2.LogResult{ - Success: true, - }, - Message: fmt.Sprintf("success to start task [%s]", taskItem.ID), - Timestamp: time.Now().UTC(), + log.Errorf("failed to update sub task status, err: %v", err) + return nil } taskItem.Status = task2.StatusRunning - p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, taskLog, "") return nil } func (p 
*DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error { + //update status of subtask to pending stop + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskItem.ID, + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task2.StatusRunning, task2.StatusReady}, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + }, + }, + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop), + }, + } + + err := orm.UpdateBy(taskItem, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Errorf("failed to update sub task status, err: %v", err) + return nil + } //check whether all pipeline task is stopped or not, then update task status q := util.MapStr{ "size": 200, @@ -257,7 +281,7 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e }, { "terms": util.MapStr{ - "status": []string{task2.StatusRunning, task2.StatusPendingStop}, + "status": []string{task2.StatusRunning, task2.StatusPendingStop, task2.StatusReady}, }, }, }, @@ -266,21 +290,14 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e } tasks, err := p.getTasks(q) if err != nil { - return err + log.Errorf("failed to get sub tasks, err: %v", err) + return nil } // all subtask stopped or error or complete if len(tasks) == 0 { taskItem.Status = task2.StatusStopped p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: task2.StatusStopped, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, - Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID), - Timestamp: time.Now().UTC(), - }, "") + p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID)) } 
return nil } @@ -295,15 +312,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error tn := time.Now() taskItem.CompletedTime = &tn p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: taskItem.Status, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, - Message: fmt.Sprintf("task [%s] is complete", taskItem.ID), - Timestamp: time.Now().UTC(), - }, "") + p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is complete", taskItem.ID)) } return nil } @@ -355,24 +364,16 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { tn := time.Now() taskItem.CompletedTime = &tn - p.saveTaskAndWriteLog(taskItem, &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: taskItem.Status, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, - Result: &task2.LogResult{ - Success: state.Error == "", - Error: state.Error, - }, - Message: fmt.Sprintf("task [%s] is complete", taskItem.ID), - Timestamp: time.Now().UTC(), - }, "") + p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ + Success: state.Error == "", + Error: state.Error, + }, fmt.Sprintf("task [%s] is complete", taskItem.ID)) } else { if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) { ptasks, err := p.getPipelineTasks(taskItem.ID) if err != nil { - return err + log.Errorf("failed to get pipeline tasks, err: %v", err) + return nil } var bulkTask *task2.Task for i, t := range ptasks { @@ -391,16 +392,18 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { inst.ID = instID _, err = orm.Get(inst) if err != nil { + log.Errorf("failed to get instance, err: %v", err) return err } err = inst.CreatePipeline(util.MustToJSONBytes(bulkTask.Config)) if err != nil { + log.Errorf("failed to create bulk_indexing pipeline, err: %v", err) return err } taskItem.Metadata.Labels["running_phase"] = 2 
} } - p.saveTaskAndWriteLog(taskItem, nil, "wait_for") + p.saveTaskAndWriteLog(taskItem, "wait_for", nil, "") } } return nil @@ -410,6 +413,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err //check whether all pipeline task is stopped or not, then update task status ptasks, err := p.getPipelineTasks(taskItem.ID) if err != nil { + log.Errorf("failed to get pipeline tasks, err: %v", err) return err } var taskIDs []string @@ -437,24 +441,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err } searchRes, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(q)) if err != nil { - return err - } - if len(searchRes.Hits.Hits) == 0 { - //check instance available - if instID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok { - inst := model.Instance{} - inst.ID = instID - _, err = orm.Get(&inst) - if err != nil { - return err - } - err = inst.TryConnectWithTimeout(time.Second) - if err != nil { - if errors.Is(err, syscall.ECONNREFUSED) { - return fmt.Errorf("stoping task [%s] error: %w", taskItem.ID, err) - } - } - } + log.Errorf("failed to get latest pipeline status, err: %v", err) return nil } MainLoop: @@ -467,7 +454,8 @@ MainLoop: inst.ID = instID _, err = orm.Get(&inst) if err != nil { - return err + log.Errorf("failed to get instance, err: %v", err) + return nil } hasStopped := true for _, pipelineID := range taskIDs { @@ -519,15 +507,7 @@ MainLoop: p.state[instanceID] = st } } - p.saveTaskAndWriteLog(taskItem, &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: task2.StatusStopped, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, - Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID), - Timestamp: time.Now().UTC(), - }, "") + p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID)) return nil } @@ -544,7 +524,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error 
{ ptasks, err := p.getPipelineTasks(taskItem.ID) if err != nil { log.Errorf("getPipelineTasks failed, err: %+v", err) - return err + return nil } for i, t := range ptasks { if t.Metadata.Labels != nil { @@ -779,19 +759,9 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { taskItem.Status = task2.StatusRunning taskItem.StartTimeInMillis = time.Now().UnixMilli() - taskLog := &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: task2.StatusRunning, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, - Result: &task2.LogResult{ - Success: true, - }, - Message: fmt.Sprintf("dispatch task [%s] to instance ", taskItem.ID), - Timestamp: time.Now().UTC(), - } - p.saveTaskAndWriteLog(taskItem, taskLog, "wait_for") + p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + Success: true, + }, fmt.Sprintf("dispatch task [%s] to instance [%s]", taskItem.ID, instance.ID)) return nil } @@ -903,32 +873,32 @@ func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error) return p.getTasks(queryDsl) } -func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, logItem *task2.Log, refresh string) { +func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh string, taskResult *task2.TaskResult, message string) { esClient := elastic.GetClient(p.config.Elasticsearch) _, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh) if err != nil { log.Errorf("failed to update task, err: %v", err) } - if logItem != nil { + if message != "" { event.SaveLog(event.Event{ Metadata: event.EventMetadata{ - Category: "migration", + Category: "task", Name: "logging", Datatype: "event", Labels: util.MapStr{ - "task_id": logItem.TaskId, + "task_type": taskItem.Metadata.Type, + "task_id": taskItem.ID, "parent_task_id": taskItem.ParentId, "retry_no": taskItem.RetryTimes, }, }, Fields: util.MapStr{ - "migration": util.MapStr{ + "task": util.MapStr{ "logging": util.MapStr{ - 
"config": logItem.Config, - "context": logItem.Context, - "status": logItem.Status, - "message": logItem.Message, - "result": logItem.Result, + "config": util.MustToJSON(taskItem.Config), + "status": taskItem.Status, + "message": message, + "result": taskResult, }, }, }, @@ -1046,7 +1016,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro Status: task2.StatusReady, StartTimeInMillis: time.Now().UnixMilli(), Metadata: task2.Metadata{ - Type: "pipeline", + Type: "index_migration", Labels: util.MapStr{ "business_id": "index_migration", "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, @@ -1134,7 +1104,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro Runnable: true, Status: task2.StatusReady, Metadata: task2.Metadata{ - Type: "pipeline", + Type: "index_migration", Labels: util.MapStr{ "business_id": "index_migration", "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, @@ -1397,7 +1367,7 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query)) if err != nil { log.Errorf("search es failed, err: %v", err) - return taskState, err + return taskState, nil } if v, ok := res.Aggregations["total_docs"].Value.(float64); ok { taskState.IndexDocs = v @@ -1502,13 +1472,13 @@ func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) { return } notification := &model.Notification{ - UserId: util.ToString(creatorID), - NotificationType: model.NotificationTypeNotification, - MessageType: model.MessageTypeMigration, - Status: model.NotificationStatusNew, - Title: title, - Body: body, - Link: link, + UserId: util.ToString(creatorID), + Type: model.NotificationTypeNotification, + MessageType: model.MessageTypeMigration, + Status: model.NotificationStatusNew, + Title: title, + Body: body, + Link: link, } err = orm.Create(nil, notification) if err != nil {