From 4b12d372f1e434a3eb0f10e529d2e0b4b8ce6b6a Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Tue, 11 Apr 2023 08:30:59 +0800 Subject: [PATCH] [migration] task log copy task labels --- plugin/migration/pipeline.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index f3b84b69..f0623e19 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -675,7 +675,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { "batch_size_in_mb": getMapValue(cfgm, "target.bulk.batch_size_in_mb"), "batch_size_in_docs": getMapValue(cfgm, "target.bulk.batch_size_in_docs"), "invalid_queue": "bulk_indexing_400", - "compress": getMapValue(cfgm, "target.bulk.compress"), + "compress": getMapValue(cfgm, "target.bulk.compress"), //"retry_rules": util.MapStr{ // "default": false, // "retry_4xx": false, @@ -893,17 +893,18 @@ func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh } func writeLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string) { + labels := util.MapStr{} + labels.Update(util.MapStr(taskItem.Metadata.Labels)) + labels["task_type"] = taskItem.Metadata.Type + labels["task_id"] = taskItem.ID + labels["parent_task_id"] = taskItem.ParentId + labels["retry_no"] = taskItem.RetryTimes event.SaveLog(event.Event{ Metadata: event.EventMetadata{ Category: "task", Name: "logging", Datatype: "event", - Labels: util.MapStr{ - "task_type": taskItem.Metadata.Type, - "task_id": taskItem.ID, - "parent_task_id": taskItem.ParentId, - "retry_no": taskItem.RetryTimes, - }, + Labels: labels, }, Fields: util.MapStr{ "task": util.MapStr{ @@ -1015,7 +1016,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro "max_worker_size": clusterMigrationTask.Settings.Bulk.MaxWorkerSize, "idle_timeout_in_seconds": clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds, "slice_size": clusterMigrationTask.Settings.Bulk.SliceSize, - "compress": clusterMigrationTask.Settings.Bulk.Compress, + "compress": clusterMigrationTask.Settings.Bulk.Compress, }, } indexParameters := util.MapStr{