[migration] task log copy task labels

This commit is contained in:
Kassian Sun 2023-04-11 08:30:59 +08:00
parent 4eb742d47e
commit 4b12d372f1
1 changed files with 9 additions and 8 deletions

View File

@ -675,7 +675,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
"batch_size_in_mb": getMapValue(cfgm, "target.bulk.batch_size_in_mb"), "batch_size_in_mb": getMapValue(cfgm, "target.bulk.batch_size_in_mb"),
"batch_size_in_docs": getMapValue(cfgm, "target.bulk.batch_size_in_docs"), "batch_size_in_docs": getMapValue(cfgm, "target.bulk.batch_size_in_docs"),
"invalid_queue": "bulk_indexing_400", "invalid_queue": "bulk_indexing_400",
"compress": getMapValue(cfgm, "target.bulk.compress"), "compress": getMapValue(cfgm, "target.bulk.compress"),
//"retry_rules": util.MapStr{ //"retry_rules": util.MapStr{
// "default": false, // "default": false,
// "retry_4xx": false, // "retry_4xx": false,
@ -893,17 +893,18 @@ func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh
} }
func writeLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string) { func writeLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string) {
labels := util.MapStr{}
labels.Update(util.MapStr(taskItem.Metadata.Labels))
labels["task_type"] = taskItem.Metadata.Type
labels["task_id"] = taskItem.ID
labels["parent_task_id"] = taskItem.ParentId
labels["retry_no"] = taskItem.RetryTimes
event.SaveLog(event.Event{ event.SaveLog(event.Event{
Metadata: event.EventMetadata{ Metadata: event.EventMetadata{
Category: "task", Category: "task",
Name: "logging", Name: "logging",
Datatype: "event", Datatype: "event",
Labels: util.MapStr{ Labels: labels,
"task_type": taskItem.Metadata.Type,
"task_id": taskItem.ID,
"parent_task_id": taskItem.ParentId,
"retry_no": taskItem.RetryTimes,
},
}, },
Fields: util.MapStr{ Fields: util.MapStr{
"task": util.MapStr{ "task": util.MapStr{
@ -1015,7 +1016,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
"max_worker_size": clusterMigrationTask.Settings.Bulk.MaxWorkerSize, "max_worker_size": clusterMigrationTask.Settings.Bulk.MaxWorkerSize,
"idle_timeout_in_seconds": clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds, "idle_timeout_in_seconds": clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds,
"slice_size": clusterMigrationTask.Settings.Bulk.SliceSize, "slice_size": clusterMigrationTask.Settings.Bulk.SliceSize,
"compress": clusterMigrationTask.Settings.Bulk.Compress, "compress": clusterMigrationTask.Settings.Bulk.Compress,
}, },
} }
indexParameters := util.MapStr{ indexParameters := util.MapStr{