From 86c86155379dca5a7d13ee523fb32f3d58872467 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Fri, 19 May 2023 18:34:25 +0800 Subject: [PATCH] [comparison] always clean queue after task ended --- plugin/migration/index_comparison/index_comparison.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugin/migration/index_comparison/index_comparison.go b/plugin/migration/index_comparison/index_comparison.go index 3505a75a..f31d6ffe 100644 --- a/plugin/migration/index_comparison/index_comparison.go +++ b/plugin/migration/index_comparison/index_comparison.go @@ -344,6 +344,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: false, }, fmt.Sprintf("index comparison failed, source/target doc count unmatch: %d / %d", sourceDocs, targetDocs)) + p.cleanGatewayQueue(taskItem) p.scheduler.DecrInstanceJobs(instanceID) return nil } @@ -372,6 +373,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { Success: false, Error: "data unmatch", }, fmt.Sprintf("index comparison failed, only in source: %d, only in target: %d, diff in both: %d (sample doc ids: [%s], [%s] [%s])", onlyInSource, onlyInTarget, diffBoth, onlyInSourceKeys, onlyInTargetKeys, diffBothKeys)) + p.cleanGatewayQueue(taskItem) p.scheduler.DecrInstanceJobs(instanceID) return nil } @@ -385,6 +387,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { Success: false, Error: "pipeline task failed", }, "index comparison failed") + p.cleanGatewayQueue(taskItem) p.scheduler.DecrInstanceJobs(instanceID) return nil } @@ -395,6 +398,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, "index comparison completed") + p.cleanGatewayQueue(taskItem) p.scheduler.DecrInstanceJobs(instanceID) return nil