[comparison] fill execution_instance_id for pipeline tasks

This commit is contained in:
Kassian Sun 2023-05-19 15:25:32 +08:00
parent 02243264e8
commit be09c69212
1 changed files with 3 additions and 0 deletions

View File

@ -273,18 +273,21 @@ func (p *processor) handleScheduleSubTask(taskItem *task.Task) error {
p.cleanGatewayQueue(taskItem) p.cleanGatewayQueue(taskItem)
sourceDumpTask.RetryTimes = taskItem.RetryTimes sourceDumpTask.RetryTimes = taskItem.RetryTimes
sourceDumpTask.Metadata.Labels["execution_instance_id"] = instanceID
sourceDumpTask.Status = task.StatusReady sourceDumpTask.Status = task.StatusReady
err = orm.Update(nil, sourceDumpTask) err = orm.Update(nil, sourceDumpTask)
if err != nil { if err != nil {
return fmt.Errorf("update source dump pipeline task error: %w", err) return fmt.Errorf("update source dump pipeline task error: %w", err)
} }
targetDumpTask.RetryTimes = taskItem.RetryTimes targetDumpTask.RetryTimes = taskItem.RetryTimes
targetDumpTask.Metadata.Labels["execution_instance_id"] = instanceID
targetDumpTask.Status = task.StatusReady targetDumpTask.Status = task.StatusReady
err = orm.Update(nil, targetDumpTask) err = orm.Update(nil, targetDumpTask)
if err != nil { if err != nil {
return fmt.Errorf("update target dump pipeline task error: %w", err) return fmt.Errorf("update target dump pipeline task error: %w", err)
} }
diffTask.RetryTimes = taskItem.RetryTimes diffTask.RetryTimes = taskItem.RetryTimes
diffTask.Metadata.Labels["execution_instance_id"] = instanceID
diffTask.Status = task.StatusInit diffTask.Status = task.StatusInit
err = orm.Update(nil, diffTask) err = orm.Update(nil, diffTask)
if err != nil { if err != nil {