From 7c375a8d7314dd780495434bb8d3a8c6d1a9bff3 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Fri, 28 Apr 2023 16:13:30 +0800 Subject: [PATCH] [migration] get es configs dynamically before creating pipeline --- plugin/migration/pipeline.go | 14 -------- .../migration/pipeline_task/pipeline_task.go | 36 +++++++++++++++++++ 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 0ad15909..a65c7e7c 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -473,8 +473,6 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error { } sourceClusterID := cfg.Source.ClusterId targetClusterID := cfg.Target.ClusterId - esConfig := elastic.GetConfig(sourceClusterID) - esTargetConfig := elastic.GetConfig(targetClusterID) docType := common.GetClusterDocType(targetClusterID) if len(taskItem.ParentId) == 0 { return fmt.Errorf("got wrong parent id of task [%v]", *taskItem) @@ -518,12 +516,6 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error { "batch_size": cfg.Source.BatchSize, "indices": indexName, "elasticsearch": sourceClusterID, - "elasticsearch_config": util.MapStr{ - "name": sourceClusterID, - "enabled": true, - "endpoint": esConfig.Endpoint, - "basic_auth": esConfig.BasicAuth, - }, "queue": util.MapStr{ "name": scrollID, "labels": util.MapStr{ @@ -583,12 +575,6 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error { "num_of_slices": cfg.Target.Bulk.SliceSize, "idle_timeout_in_seconds": cfg.Target.Bulk.IdleTimeoutInSeconds, "elasticsearch": targetClusterID, - "elasticsearch_config": util.MapStr{ - "name": targetClusterID, - "enabled": true, - "endpoint": esTargetConfig.Endpoint, - "basic_auth": esTargetConfig.BasicAuth, - }, "queues": util.MapStr{ "type": "scroll_docs", "migration_task_id": taskItem.ID, diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go index 3925473b..f776b945 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -68,6 +68,12 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error { return err } + err = p.fillDynamicESConfig(taskItem, &cfg) + if err != nil { + log.Errorf("failed to update task config, err: %v", err) + return err + } + cfg.Labels["retry_times"] = taskItem.RetryTimes // call instance api to create pipeline task @@ -434,3 +440,33 @@ func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.Ta migration_util.WriteLog(taskItem, taskResult, message) } } + +// TODO: remove after implementing dynamic register elasticsearch configs at gateway +func (p *processor) fillDynamicESConfig(taskItem *task.Task, pipelineTaskConfig *migration_model.PipelineTaskConfig) error { + for _, p := range pipelineTaskConfig.Processor { + for k, v := range p { + v, ok := v.(map[string]interface{}) + if !ok { + return errors.New("invalid processor config") + } + processorConfig := util.MapStr(v) + if k == "bulk_indexing" || k == "es_scroll" { + elasticsearchID := migration_util.GetMapStringValue(processorConfig, "elasticsearch") + if elasticsearchID == "" { + return fmt.Errorf("invalid task config found for task [%s]", taskItem.ID) + } + esConfig := elastic.GetConfigNoPanic(elasticsearchID) + if esConfig == nil { + return fmt.Errorf("can't load elasticsearch config of [%s] for task task [%s]", elasticsearchID, taskItem.ID) + } + processorConfig["elasticsearch_config"] = util.MapStr{ + "name": elasticsearchID, + "enabled": true, + "endpoint": esConfig.Endpoint, + "basic_auth": esConfig.BasicAuth, + } + } + } + } + return nil +}