Merge pull request '[migration] get es configs dynamically before creating pipeline' (#79) from feature/migration into master

This commit is contained in:
silenceqi 2023-04-28 16:20:08 +08:00
commit 62b5b94c56
2 changed files with 36 additions and 14 deletions

View File

@ -473,8 +473,6 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error {
}
sourceClusterID := cfg.Source.ClusterId
targetClusterID := cfg.Target.ClusterId
esConfig := elastic.GetConfig(sourceClusterID)
esTargetConfig := elastic.GetConfig(targetClusterID)
docType := common.GetClusterDocType(targetClusterID)
if len(taskItem.ParentId) == 0 {
return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
@ -518,12 +516,6 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error {
"batch_size": cfg.Source.BatchSize,
"indices": indexName,
"elasticsearch": sourceClusterID,
"elasticsearch_config": util.MapStr{
"name": sourceClusterID,
"enabled": true,
"endpoint": esConfig.Endpoint,
"basic_auth": esConfig.BasicAuth,
},
"queue": util.MapStr{
"name": scrollID,
"labels": util.MapStr{
@ -583,12 +575,6 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error {
"num_of_slices": cfg.Target.Bulk.SliceSize,
"idle_timeout_in_seconds": cfg.Target.Bulk.IdleTimeoutInSeconds,
"elasticsearch": targetClusterID,
"elasticsearch_config": util.MapStr{
"name": targetClusterID,
"enabled": true,
"endpoint": esTargetConfig.Endpoint,
"basic_auth": esTargetConfig.BasicAuth,
},
"queues": util.MapStr{
"type": "scroll_docs",
"migration_task_id": taskItem.ID,

View File

@ -68,6 +68,12 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error {
return err
}
err = p.fillDynamicESConfig(taskItem, &cfg)
if err != nil {
log.Errorf("failed to update task config, err: %v", err)
return err
}
cfg.Labels["retry_times"] = taskItem.RetryTimes
// call instance api to create pipeline task
@ -434,3 +440,33 @@ func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.Ta
migration_util.WriteLog(taskItem, taskResult, message)
}
}
// TODO: remove after implementing dynamic register elasticsearch configs at gateway
func (p *processor) fillDynamicESConfig(taskItem *task.Task, pipelineTaskConfig *migration_model.PipelineTaskConfig) error {
for _, p := range pipelineTaskConfig.Processor {
for k, v := range p {
v, ok := v.(map[string]interface{})
if !ok {
return errors.New("invalid processor config")
}
processorConfig := util.MapStr(v)
if k == "bulk_indexing" || k == "es_scroll" {
elasticsearchID := migration_util.GetMapStringValue(processorConfig, "elasticsearch")
if elasticsearchID == "" {
return fmt.Errorf("invalid task config found for task [%s]", taskItem.ID)
}
esConfig := elastic.GetConfigNoPanic(elasticsearchID)
if esConfig == nil {
return fmt.Errorf("can't load elasticsearch config of [%s] for task task [%s]", elasticsearchID, taskItem.ID)
}
processorConfig["elasticsearch_config"] = util.MapStr{
"name": elasticsearchID,
"enabled": true,
"endpoint": esConfig.Endpoint,
"basic_auth": esConfig.BasicAuth,
}
}
}
}
return nil
}