From be09c69212a4797493d2e99e85ce37ed904eb1d2 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Fri, 19 May 2023 15:25:32 +0800 Subject: [PATCH] [comparison] fill execution_instance_id for pipeline tasks --- plugin/migration/index_comparison/index_comparison.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugin/migration/index_comparison/index_comparison.go b/plugin/migration/index_comparison/index_comparison.go index 1e5cff57..3505a75a 100644 --- a/plugin/migration/index_comparison/index_comparison.go +++ b/plugin/migration/index_comparison/index_comparison.go @@ -273,18 +273,21 @@ func (p *processor) handleScheduleSubTask(taskItem *task.Task) error { p.cleanGatewayQueue(taskItem) sourceDumpTask.RetryTimes = taskItem.RetryTimes + sourceDumpTask.Metadata.Labels["execution_instance_id"] = instanceID sourceDumpTask.Status = task.StatusReady err = orm.Update(nil, sourceDumpTask) if err != nil { return fmt.Errorf("update source dump pipeline task error: %w", err) } targetDumpTask.RetryTimes = taskItem.RetryTimes + targetDumpTask.Metadata.Labels["execution_instance_id"] = instanceID targetDumpTask.Status = task.StatusReady err = orm.Update(nil, targetDumpTask) if err != nil { return fmt.Errorf("update target dump pipeline task error: %w", err) } diffTask.RetryTimes = taskItem.RetryTimes + diffTask.Metadata.Labels["execution_instance_id"] = instanceID diffTask.Status = task.StatusInit err = orm.Update(nil, diffTask) if err != nil {