From 87d28c9cfc31ccce95ca7964b01136259faa0560 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Tue, 4 Apr 2023 16:02:31 +0800 Subject: [PATCH 1/2] [migration] fix pending_stop checking logic --- plugin/migration/pipeline.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 2b12b8b9..36ebc4bc 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -274,6 +274,13 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e "query": util.MapStr{ "bool": util.MapStr{ "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskItem.ID, + }, + }, + }, { "term": util.MapStr{ "metadata.labels.business_id": "index_migration", From b16ff4e888c0f855066cc883e46ded0dfaac4178 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Tue, 4 Apr 2023 16:17:49 +0800 Subject: [PATCH 2/2] [migration] track user actions in task logging --- plugin/migration/api.go | 7 +++++ plugin/migration/pipeline.go | 50 +++++++++++++++++++----------------- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/plugin/migration/api.go b/plugin/migration/api.go index a458e7f4..45ff1057 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -242,6 +242,10 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request return } + writeLog(&obj, &task2.TaskResult{ + Success: true, + }, "task status manually set to ready") + if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "index_migration" && len(obj.ParentId) > 0 { //update status of major task to running query := util.MapStr{ @@ -303,6 +307,9 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ h.WriteError(w, err.Error(), http.StatusInternalServerError) return } + writeLog(&obj, &task2.TaskResult{ + Success: true, + }, "task status manually set to pending stop") h.WriteJSON(w, util.MapStr{ "success": true, diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 36ebc4bc..c3e167b5 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -887,32 +887,36 @@ func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh log.Errorf("failed to update task, err: %v", err) } if message != "" { - event.SaveLog(event.Event{ - Metadata: event.EventMetadata{ - Category: "task", - Name: "logging", - Datatype: "event", - Labels: util.MapStr{ - "task_type": taskItem.Metadata.Type, - "task_id": taskItem.ID, - "parent_task_id": taskItem.ParentId, - "retry_no": taskItem.RetryTimes, - }, - }, - Fields: util.MapStr{ - "task": util.MapStr{ - "logging": util.MapStr{ - "config": util.MustToJSON(taskItem.Config), - "status": taskItem.Status, - "message": message, - "result": taskResult, - }, - }, - }, - }) + writeLog(taskItem, taskResult, message) } } +func writeLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string) { + event.SaveLog(event.Event{ + Metadata: event.EventMetadata{ + Category: "task", + Name: "logging", + Datatype: "event", + Labels: util.MapStr{ + "task_type": taskItem.Metadata.Type, + "task_id": taskItem.ID, + "parent_task_id": taskItem.ParentId, + "retry_no": taskItem.RetryTimes, + }, + }, + Fields: util.MapStr{ + "task": util.MapStr{ + "logging": util.MapStr{ + "config": util.MustToJSON(taskItem.Config), + "status": taskItem.Status, + "message": message, + "result": taskResult, + }, + }, + }, + }) +} + func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) error { if taskItem.Metadata.Labels == nil { return fmt.Errorf("empty metadata labels, unexpected cluster migration task: %s", util.MustToJSON(taskItem))