diff --git a/plugin/migration/index_comparison/index_comparison.go b/plugin/migration/index_comparison/index_comparison.go index 3505a75a..f31d6ffe 100644 --- a/plugin/migration/index_comparison/index_comparison.go +++ b/plugin/migration/index_comparison/index_comparison.go @@ -344,6 +344,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: false, }, fmt.Sprintf("index comparison failed, source/target doc count unmatch: %d / %d", sourceDocs, targetDocs)) + p.cleanGatewayQueue(taskItem) p.scheduler.DecrInstanceJobs(instanceID) return nil } @@ -372,6 +373,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { Success: false, Error: "data unmatch", }, fmt.Sprintf("index comparison failed, only in source: %d, only in target: %d, diff in both: %d (sample doc ids: [%s], [%s] [%s])", onlyInSource, onlyInTarget, diffBoth, onlyInSourceKeys, onlyInTargetKeys, diffBothKeys)) + p.cleanGatewayQueue(taskItem) p.scheduler.DecrInstanceJobs(instanceID) return nil } @@ -385,6 +387,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { Success: false, Error: "pipeline task failed", }, "index comparison failed") + p.cleanGatewayQueue(taskItem) p.scheduler.DecrInstanceJobs(instanceID) return nil } @@ -395,6 +398,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, "index comparison completed") + p.cleanGatewayQueue(taskItem) p.scheduler.DecrInstanceJobs(instanceID) return nil