[migration] get es configs dynamically before creating pipeline
This commit is contained in:
parent
9b3005e005
commit
7c375a8d73
|
@ -473,8 +473,6 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error {
|
||||||
}
|
}
|
||||||
sourceClusterID := cfg.Source.ClusterId
|
sourceClusterID := cfg.Source.ClusterId
|
||||||
targetClusterID := cfg.Target.ClusterId
|
targetClusterID := cfg.Target.ClusterId
|
||||||
esConfig := elastic.GetConfig(sourceClusterID)
|
|
||||||
esTargetConfig := elastic.GetConfig(targetClusterID)
|
|
||||||
docType := common.GetClusterDocType(targetClusterID)
|
docType := common.GetClusterDocType(targetClusterID)
|
||||||
if len(taskItem.ParentId) == 0 {
|
if len(taskItem.ParentId) == 0 {
|
||||||
return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
|
return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
|
||||||
|
@ -518,12 +516,6 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error {
|
||||||
"batch_size": cfg.Source.BatchSize,
|
"batch_size": cfg.Source.BatchSize,
|
||||||
"indices": indexName,
|
"indices": indexName,
|
||||||
"elasticsearch": sourceClusterID,
|
"elasticsearch": sourceClusterID,
|
||||||
"elasticsearch_config": util.MapStr{
|
|
||||||
"name": sourceClusterID,
|
|
||||||
"enabled": true,
|
|
||||||
"endpoint": esConfig.Endpoint,
|
|
||||||
"basic_auth": esConfig.BasicAuth,
|
|
||||||
},
|
|
||||||
"queue": util.MapStr{
|
"queue": util.MapStr{
|
||||||
"name": scrollID,
|
"name": scrollID,
|
||||||
"labels": util.MapStr{
|
"labels": util.MapStr{
|
||||||
|
@ -583,12 +575,6 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error {
|
||||||
"num_of_slices": cfg.Target.Bulk.SliceSize,
|
"num_of_slices": cfg.Target.Bulk.SliceSize,
|
||||||
"idle_timeout_in_seconds": cfg.Target.Bulk.IdleTimeoutInSeconds,
|
"idle_timeout_in_seconds": cfg.Target.Bulk.IdleTimeoutInSeconds,
|
||||||
"elasticsearch": targetClusterID,
|
"elasticsearch": targetClusterID,
|
||||||
"elasticsearch_config": util.MapStr{
|
|
||||||
"name": targetClusterID,
|
|
||||||
"enabled": true,
|
|
||||||
"endpoint": esTargetConfig.Endpoint,
|
|
||||||
"basic_auth": esTargetConfig.BasicAuth,
|
|
||||||
},
|
|
||||||
"queues": util.MapStr{
|
"queues": util.MapStr{
|
||||||
"type": "scroll_docs",
|
"type": "scroll_docs",
|
||||||
"migration_task_id": taskItem.ID,
|
"migration_task_id": taskItem.ID,
|
||||||
|
|
|
@ -68,6 +68,12 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = p.fillDynamicESConfig(taskItem, &cfg)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to update task config, err: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
cfg.Labels["retry_times"] = taskItem.RetryTimes
|
cfg.Labels["retry_times"] = taskItem.RetryTimes
|
||||||
|
|
||||||
// call instance api to create pipeline task
|
// call instance api to create pipeline task
|
||||||
|
@ -434,3 +440,33 @@ func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.Ta
|
||||||
migration_util.WriteLog(taskItem, taskResult, message)
|
migration_util.WriteLog(taskItem, taskResult, message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: remove after implementing dynamic register elasticsearch configs at gateway
|
||||||
|
func (p *processor) fillDynamicESConfig(taskItem *task.Task, pipelineTaskConfig *migration_model.PipelineTaskConfig) error {
|
||||||
|
for _, p := range pipelineTaskConfig.Processor {
|
||||||
|
for k, v := range p {
|
||||||
|
v, ok := v.(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return errors.New("invalid processor config")
|
||||||
|
}
|
||||||
|
processorConfig := util.MapStr(v)
|
||||||
|
if k == "bulk_indexing" || k == "es_scroll" {
|
||||||
|
elasticsearchID := migration_util.GetMapStringValue(processorConfig, "elasticsearch")
|
||||||
|
if elasticsearchID == "" {
|
||||||
|
return fmt.Errorf("invalid task config found for task [%s]", taskItem.ID)
|
||||||
|
}
|
||||||
|
esConfig := elastic.GetConfigNoPanic(elasticsearchID)
|
||||||
|
if esConfig == nil {
|
||||||
|
return fmt.Errorf("can't load elasticsearch config of [%s] for task task [%s]", elasticsearchID, taskItem.ID)
|
||||||
|
}
|
||||||
|
processorConfig["elasticsearch_config"] = util.MapStr{
|
||||||
|
"name": elasticsearchID,
|
||||||
|
"enabled": true,
|
||||||
|
"endpoint": esConfig.Endpoint,
|
||||||
|
"basic_auth": esConfig.BasicAuth,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue