diff --git a/plugin/migration/api.go b/plugin/migration/api.go index a458e7f4..45ff1057 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -242,6 +242,10 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request return } + writeLog(&obj, &task2.TaskResult{ + Success: true, + }, "task status manually set to ready") + if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "index_migration" && len(obj.ParentId) > 0 { //update status of major task to running query := util.MapStr{ @@ -303,6 +307,9 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ h.WriteError(w, err.Error(), http.StatusInternalServerError) return } + writeLog(&obj, &task2.TaskResult{ + Success: true, + }, "task status manually set to pending stop") h.WriteJSON(w, util.MapStr{ "success": true, diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 2b12b8b9..c3e167b5 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -274,6 +274,13 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e "query": util.MapStr{ "bool": util.MapStr{ "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskItem.ID, + }, + }, + }, { "term": util.MapStr{ "metadata.labels.business_id": "index_migration", @@ -880,32 +887,36 @@ func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh log.Errorf("failed to update task, err: %v", err) } if message != "" { - event.SaveLog(event.Event{ - Metadata: event.EventMetadata{ - Category: "task", - Name: "logging", - Datatype: "event", - Labels: util.MapStr{ - "task_type": taskItem.Metadata.Type, - "task_id": taskItem.ID, - "parent_task_id": taskItem.ParentId, - "retry_no": taskItem.RetryTimes, - }, - }, - Fields: util.MapStr{ - "task": util.MapStr{ - "logging": util.MapStr{ - "config": util.MustToJSON(taskItem.Config), - "status": taskItem.Status, - "message": message, - "result": taskResult, - }, - }, - }, - }) + writeLog(taskItem, taskResult, message) } } +func writeLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string) { + event.SaveLog(event.Event{ + Metadata: event.EventMetadata{ + Category: "task", + Name: "logging", + Datatype: "event", + Labels: util.MapStr{ + "task_type": taskItem.Metadata.Type, + "task_id": taskItem.ID, + "parent_task_id": taskItem.ParentId, + "retry_no": taskItem.RetryTimes, + }, + }, + Fields: util.MapStr{ + "task": util.MapStr{ + "logging": util.MapStr{ + "config": util.MustToJSON(taskItem.Config), + "status": taskItem.Status, + "message": message, + "result": taskResult, + }, + }, + }, + }) +} + func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) error { if taskItem.Metadata.Labels == nil { return fmt.Errorf("empty metadata labels, unexpected cluster migration task: %s", util.MustToJSON(taskItem))