[comparison] always clean queue after task ended

This commit is contained in:
Kassian Sun 2023-05-19 18:34:25 +08:00 committed by Gitea
parent 6e98f7287b
commit 86c8615537
1 changed files with 4 additions and 0 deletions

View File

@ -344,6 +344,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
Success: false,
}, fmt.Sprintf("index comparison failed, source/target doc count unmatch: %d / %d", sourceDocs, targetDocs))
p.cleanGatewayQueue(taskItem)
p.scheduler.DecrInstanceJobs(instanceID)
return nil
}
@ -372,6 +373,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
Success: false,
Error: "data unmatch",
}, fmt.Sprintf("index comparison failed, only in source: %d, only in target: %d, diff in both: %d (sample doc ids: [%s], [%s] [%s])", onlyInSource, onlyInTarget, diffBoth, onlyInSourceKeys, onlyInTargetKeys, diffBothKeys))
p.cleanGatewayQueue(taskItem)
p.scheduler.DecrInstanceJobs(instanceID)
return nil
}
@ -385,6 +387,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
Success: false,
Error: "pipeline task failed",
}, "index comparison failed")
p.cleanGatewayQueue(taskItem)
p.scheduler.DecrInstanceJobs(instanceID)
return nil
}
@ -395,6 +398,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
Success: true,
}, "index comparison completed")
p.cleanGatewayQueue(taskItem)
p.scheduler.DecrInstanceJobs(instanceID)
return nil