[migration] index_migration & pipeline define struct

This commit is contained in:
Kassian Sun 2023-04-13 11:25:40 +08:00
parent db6d33f22b
commit c73c44724d
3 changed files with 197 additions and 188 deletions

View File

@ -49,7 +49,7 @@ type APIHandler struct {
}
func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
clusterTaskConfig := &ElasticDataConfig{}
clusterTaskConfig := &ClusterMigrationTaskConfig{}
err := h.DecodeJSON(req, clusterTaskConfig)
if err != nil {
log.Error(err)
@ -90,10 +90,10 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re
"source_total_docs": totalDocs,
},
},
Cancellable: true,
Runnable: false,
Status: task2.StatusInit,
Config: clusterTaskConfig,
Cancellable: true,
Runnable: false,
Status: task2.StatusInit,
ConfigString: util.MustToJSON(clusterTaskConfig),
}
t.ID = util.GetUUID()
err = orm.Create(nil, &t)
@ -174,7 +174,7 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re
for _, hit := range searchRes.Hits.Hits {
sourceM := util.MapStr(hit.Source)
buf := util.MustToJSONBytes(sourceM["config"])
dataConfig := ElasticDataConfig{}
dataConfig := ClusterMigrationTaskConfig{}
err = util.FromJSONBytes(buf, &dataConfig)
if err != nil {
log.Error(err)
@ -317,12 +317,14 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ
}
func getTaskConfig(task *task2.Task, config interface{}) error {
configBytes, err := util.ToJSONBytes(task.Config)
if task.Config_ == nil {
return util.FromJSONBytes([]byte(task.ConfigString), config)
}
buf, err := util.ToJSONBytes(task.Config_)
if err != nil {
return err
}
return util.FromJSONBytes(configBytes, config)
return util.FromJSONBytes(buf, config)
}
func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API) (map[string]string, error) {
@ -377,7 +379,7 @@ func (h *APIHandler) getIndexRefreshIntervals(w http.ResponseWriter, req *http.R
}, http.StatusNotFound)
return
}
taskConfig := &ElasticDataConfig{}
taskConfig := &ClusterMigrationTaskConfig{}
err = getTaskConfig(&obj, taskConfig)
if err != nil {
log.Error(err)
@ -415,7 +417,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
}, http.StatusNotFound)
return
}
taskConfig := &ElasticDataConfig{}
taskConfig := &ClusterMigrationTaskConfig{}
err = getTaskConfig(&obj, taskConfig)
if err != nil {
log.Error(err)
@ -461,7 +463,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
"index_name": migrationConfig.LogIndexName,
}
}
obj.Config = taskConfig
obj.ConfigString = util.MustToJSON(taskConfig)
obj.Metadata.Labels["completed_indices"] = completedIndices
h.WriteJSON(w, obj, http.StatusOK)
}
@ -735,17 +737,16 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
startTime = subTasks[0].StartTimeInMillis
}
for i, ptask := range subTasks {
var (
cfg map[string]interface{}
ok bool
)
if cfg, ok = ptask.Config.(map[string]interface{}); !ok {
cfg := IndexMigrationTaskConfig{}
err := getTaskConfig(&ptask, &cfg)
if err != nil {
log.Errorf("failed to get task config, err: %v", err)
continue
}
start, _ := util.MapStr(cfg).GetValue("source.start")
end, _ := util.MapStr(cfg).GetValue("source.end")
start := cfg.Source.Start
end := cfg.Source.End
if i == 0 {
step, _ := util.MapStr(cfg).GetValue("source.step")
step := cfg.Source.Step
taskInfo["step"] = step
}
var durationInMS int64 = 0
@ -786,7 +787,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
}
}
partitionTotalDocs, _ := util.MapStr(cfg).GetValue("source.doc_count")
partitionTotalDocs := cfg.Source.DocCount
partitionTaskInfos = append(partitionTaskInfos, util.MapStr{
"task_id": ptask.ID,
"status": ptask.Status,

View File

@ -4,9 +4,13 @@
package migration
import "fmt"
import (
"fmt"
type ElasticDataConfig struct {
"infini.sh/framework/core/util"
)
type ClusterMigrationTaskConfig struct {
Cluster struct {
Source ClusterInfo `json:"source"`
Target ClusterInfo `json:"target"`
@ -20,15 +24,8 @@ type ElasticDataConfig struct {
Docs int `json:"docs"`
Timeout string `json:"timeout"`
} `json:"scroll"`
Bulk struct {
Docs int `json:"docs"`
StoreSizeInMB int `json:"store_size_in_mb"`
MaxWorkerSize int `json:"max_worker_size"`
IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"`
SliceSize int `json:"slice_size"`
Compress bool `json:"compress"`
} `json:"bulk"`
Execution ExecutionConfig `json:"execution"`
Bulk ClusterMigrationBulkConfig `json:"bulk"`
Execution ExecutionConfig `json:"execution"`
} `json:"settings"`
Creator struct {
Name string `json:"name"`
@ -36,6 +33,15 @@ type ElasticDataConfig struct {
} `json:"creator"`
}
// ClusterMigrationBulkConfig is the "bulk" section of a cluster migration
// task's settings. splitMajorMigrationTask copies these values into the
// IndexMigrationBulkConfig of every index migration sub task.
type ClusterMigrationBulkConfig struct {
	// Docs becomes the bulk batch size in documents of each sub task.
	Docs int `json:"docs"`
	// StoreSizeInMB becomes the bulk batch size in megabytes of each sub task.
	StoreSizeInMB int `json:"store_size_in_mb"`
	// MaxWorkerSize is forwarded unchanged to each sub task.
	MaxWorkerSize int `json:"max_worker_size"`
	// IdleTimeoutInSeconds is forwarded unchanged to each sub task.
	IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"`
	// SliceSize is forwarded unchanged to each sub task.
	SliceSize int `json:"slice_size"`
	// Compress is forwarded unchanged to each sub task.
	Compress bool `json:"compress"`
}
type ExecutionConfig struct {
TimeWindow []TimeWindowItem `json:"time_window"`
Nodes struct {
@ -65,6 +71,7 @@ type IndexConfig struct {
Percent float64 `json:"percent,omitempty"`
ErrorPartitions int `json:"error_partitions,omitempty"`
}
type IndexPartition struct {
FieldType string `json:"field_type"`
FieldName string `json:"field_name"`
@ -108,3 +115,57 @@ type IndexStateInfo struct {
ErrorPartitions int
IndexDocs float64
}
// IndexMigrationTaskConfig is the configuration of an index (or index
// partition) migration task. It is stored as JSON in the task's ConfigString
// and decoded again with getTaskConfig.
type IndexMigrationTaskConfig struct {
	// Source describes where and how documents are read.
	Source IndexMigrationSourceConfig `json:"source"`
	// Target describes where and how documents are written.
	Target IndexMigrationTargetConfig `json:"target"`
	// Execution carries the execution settings of the owning cluster
	// migration task.
	Execution ExecutionConfig `json:"execution"`
}
// IndexMigrationSourceConfig describes the source side of an index migration
// task: which cluster and indices to scroll, how to scroll them, optional
// rename/filter rules, and (for partitioned tasks) the partition boundaries.
type IndexMigrationSourceConfig struct {
	// ClusterId is the ID of the source Elasticsearch cluster.
	ClusterId string `json:"cluster_id"`
	// Indices is the source index name handed to the es_scroll processor.
	Indices string `json:"indices"`
	// SliceSize, BatchSize and ScrollTime are passed to the es_scroll
	// processor as slice_size, batch_size and scroll_time.
	SliceSize  int    `json:"slice_size"`
	BatchSize  int    `json:"batch_size"`
	ScrollTime string `json:"scroll_time"`

	// IndexRename and TypeRename map source index/type names to target names.
	IndexRename util.MapStr `json:"index_rename,omitempty"`
	TypeRename  util.MapStr `json:"type_rename,omitempty"`
	// QueryString is set when the index filter is given as a plain string.
	// The tag previously ended in a single quote instead of a double quote,
	// which made the whole struct tag unparsable: the field was not mapped to
	// "query_string" and omitempty was ignored.
	QueryString string `json:"query_string,omitempty"`
	// QueryDSL is the structured filter wrapped into the scroll query.
	QueryDSL util.MapStr `json:"query_dsl,omitempty"`

	// Partition configs
	// Start and End are the bounds of the partition handled by this task.
	Start float64 `json:"start"`
	End   float64 `json:"end"`
	// Docs is not populated by the code visible here.
	// NOTE(review): confirm whether it is still needed alongside DocCount.
	Docs int64 `json:"docs"`
	// DocCount is the number of source documents the task has to migrate.
	DocCount int64 `json:"doc_count"`
	// Step is the partition step copied from the index partition settings.
	Step interface{} `json:"step"`
	// PartitionId is the sequence number of the partition within its index.
	PartitionId int `json:"partition_id"`
}
// IndexMigrationBulkConfig holds the bulk indexing settings of an index
// migration task. handleReadySubTask forwards these values to the
// bulk_indexing pipeline processor.
type IndexMigrationBulkConfig struct {
	// BatchSizeInDocs is sent as bulk.batch_size_in_docs.
	BatchSizeInDocs int `json:"batch_size_in_docs"`
	// BatchSizeInMB is sent as bulk.batch_size_in_mb.
	BatchSizeInMB int `json:"batch_size_in_mb"`
	// MaxWorkerSize is sent as max_worker_size.
	MaxWorkerSize int `json:"max_worker_size"`
	// IdleTimeoutInSeconds is sent as idle_timeout_in_seconds.
	IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"`
	// SliceSize is sent as num_of_slices.
	SliceSize int `json:"slice_size"`
	// Compress is sent as bulk.compress.
	Compress bool `json:"compress"`
}
// IndexMigrationTargetConfig describes the target side of an index migration
// task.
type IndexMigrationTargetConfig struct {
	// ClusterId is the ID of the target Elasticsearch cluster.
	ClusterId string `json:"cluster_id"`
	// Bulk holds the bulk indexing settings used when writing to the target.
	Bulk IndexMigrationBulkConfig `json:"bulk"`
	// QueryDSL is set per partition by splitMajorMigrationTask when the
	// partition has filter clauses; it is omitted from JSON when empty.
	QueryDSL util.MapStr `json:"query_dsl,omitempty"`
}
// PipelineTaskLoggingConfig is the "logging" section of a pipeline task
// configuration.
type PipelineTaskLoggingConfig struct {
	// Enabled switches logging on for the pipeline.
	Enabled bool `json:"enabled"`
}
// PipelineTaskConfig is the pipeline definition built for the es_scroll and
// bulk_indexing sub tasks. It is serialized to JSON, stored in the task's
// ConfigString and sent to the instance through CreatePipeline.
type PipelineTaskConfig struct {
	// Name is the pipeline name; the dispatcher uses the sub task ID.
	Name string `json:"name"`
	// Logging controls pipeline logging.
	Logging PipelineTaskLoggingConfig `json:"logging"`
	// Labels carries metadata such as parent_task_id, unique_index_name and
	// retry_no.
	Labels util.MapStr `json:"labels"`
	// AutoStart and KeepRunning are forwarded as auto_start and keep_running.
	AutoStart   bool `json:"auto_start"`
	KeepRunning bool `json:"keep_running"`
	// Processor lists the processor definitions, each keyed by processor name.
	Processor []util.MapStr `json:"processor"`
}

View File

@ -13,7 +13,6 @@ import (
"time"
log "github.com/cihub/seelog"
"github.com/mitchellh/mapstructure"
"infini.sh/console/model"
"infini.sh/framework/core/config"
@ -345,7 +344,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
Success: state.Error == "",
Error: state.Error,
}, fmt.Sprintf("task [%s] completed", taskItem.ID))
}, fmt.Sprintf("task [%s] finished with status [%s]", taskItem.ID, taskItem.Status))
p.cleanGatewayPipelines(taskItem, state.PipelineIds)
} else {
if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) {
@ -374,7 +373,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
log.Errorf("failed to get instance, err: %v", err)
return err
}
err = inst.CreatePipeline(util.MustToJSONBytes(bulkTask.Config))
err = inst.CreatePipeline([]byte(bulkTask.ConfigString))
if err != nil {
log.Errorf("failed to create bulk_indexing pipeline, err: %v", err)
return err
@ -516,10 +515,8 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
return nil
}
for i, t := range ptasks {
ptasks[i].RetryTimes = taskItem.RetryTimes + 1
if t.Metadata.Labels != nil {
if cfg, ok := ptasks[i].Config.(map[string]interface{}); ok {
util.MapStr(cfg).Put("labels.retry_no", taskItem.RetryTimes+1)
}
if t.Metadata.Labels["pipeline_id"] == "es_scroll" {
scrollTask = &ptasks[i]
} else if t.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
@ -537,35 +534,24 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
pids = append(pids, taskItem.ParentId...)
pids = append(pids, taskItem.ID)
scrollID := util.GetUUID()
var (
cfg map[string]interface{}
ok bool
)
if cfg, ok = taskItem.Config.(map[string]interface{}); !ok {
return fmt.Errorf("got wrong config [%v] with task [%s]", taskItem.Config, taskItem.ID)
}
cfgm := util.MapStr(cfg)
var (
sourceClusterID string
targetClusterID string
)
if sourceClusterID, ok = getMapValue(cfgm, "source.cluster_id").(string); !ok {
return fmt.Errorf("got wrong source cluster id of task [%v]", *taskItem)
}
if targetClusterID, ok = getMapValue(cfgm, "target.cluster_id").(string); !ok {
return fmt.Errorf("got wrong target cluster id of task [%v]", *taskItem)
cfg := IndexMigrationTaskConfig{}
err := getTaskConfig(taskItem, &cfg)
if err != nil {
return fmt.Errorf("got wrong config [%v] with task [%s], err: %v", taskItem.ConfigString, taskItem.ID, err)
}
sourceClusterID := cfg.Source.ClusterId
targetClusterID := cfg.Target.ClusterId
esConfig := elastic.GetConfig(sourceClusterID)
esTargetConfig := elastic.GetConfig(targetClusterID)
docType := common.GetClusterDocType(targetClusterID)
if len(taskItem.ParentId) == 0 {
return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
}
queryDsl := getMapValue(cfgm, "source.query_dsl")
queryDsl := cfg.Source.QueryDSL
scrollQueryDsl := util.MustToJSON(util.MapStr{
"query": queryDsl,
})
indexName := getMapValue(cfgm, "source.indices")
indexName := cfg.Source.Indices
scrollTask = &task2.Task{
ParentId: pids,
Runnable: true,
@ -579,24 +565,24 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
"unique_index_name": taskItem.Metadata.Labels["unique_index_name"],
},
},
Config: util.MapStr{
"name": scrollID,
"logging": util.MapStr{
"enabled": true,
RetryTimes: taskItem.RetryTimes,
ConfigString: util.MustToJSON(PipelineTaskConfig{
Name: scrollID,
Logging: PipelineTaskLoggingConfig{
Enabled: true,
},
"labels": util.MapStr{
Labels: util.MapStr{
"parent_task_id": pids,
"unique_index_name": taskItem.Metadata.Labels["unique_index_name"],
"retry_no": taskItem.RetryTimes,
},
"auto_start": true,
"keep_running": false,
"processor": []util.MapStr{
AutoStart: true,
KeepRunning: false,
Processor: []util.MapStr{
{
"es_scroll": util.MapStr{
"remove_type": docType == "",
"slice_size": getMapValue(cfgm, "source.slice_size"),
"batch_size": getMapValue(cfgm, "source.batch_size"),
"slice_size": cfg.Source.SliceSize,
"batch_size": cfg.Source.BatchSize,
"indices": indexName,
"elasticsearch": sourceClusterID,
"elasticsearch_config": util.MapStr{
@ -612,14 +598,14 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
},
},
"partition_size": 1,
"scroll_time": getMapValue(cfgm, "source.scroll_time"),
"scroll_time": cfg.Source.ScrollTime,
"query_dsl": scrollQueryDsl,
"index_rename": getMapValue(cfgm, "source.index_rename"),
"type_rename": getMapValue(cfgm, "source.type_rename"),
"index_rename": cfg.Source.IndexRename,
"type_rename": cfg.Source.TypeRename,
},
},
},
},
}),
}
scrollTask.ID = scrollID
@ -637,36 +623,31 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
"unique_index_name": taskItem.Metadata.Labels["unique_index_name"],
},
},
Config: util.MapStr{
"name": bulkID,
"logging": util.MapStr{
"enabled": true,
RetryTimes: taskItem.RetryTimes,
ConfigString: util.MustToJSON(PipelineTaskConfig{
Name: bulkID,
Logging: PipelineTaskLoggingConfig{
Enabled: true,
},
"labels": util.MapStr{
Labels: util.MapStr{
"parent_task_id": pids,
"unique_index_name": taskItem.Metadata.Labels["unique_index_name"],
"retry_no": taskItem.RetryTimes,
},
"auto_start": true,
"keep_running": false,
"processor": []util.MapStr{
AutoStart: true,
KeepRunning: false,
Processor: []util.MapStr{
{
"bulk_indexing": util.MapStr{
"detect_active_queue": false,
"bulk": util.MapStr{
"batch_size_in_mb": getMapValue(cfgm, "target.bulk.batch_size_in_mb"),
"batch_size_in_docs": getMapValue(cfgm, "target.bulk.batch_size_in_docs"),
"batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB,
"batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs,
"invalid_queue": "bulk_indexing_400",
"compress": getMapValue(cfgm, "target.bulk.compress"),
//"retry_rules": util.MapStr{
// "default": false,
// "retry_4xx": false,
// "retry_429": true,
//},
"compress": cfg.Target.Bulk.Compress,
},
"max_worker_size": getMapValue(cfgm, "target.bulk.max_worker_size"),
"num_of_slices": getMapValue(cfgm, "target.bulk.slice_size"),
"idle_timeout_in_seconds": getMapValue(cfgm, "target.bulk.idle_timeout_in_seconds"),
"max_worker_size": cfg.Target.Bulk.MaxWorkerSize,
"num_of_slices": cfg.Target.Bulk.SliceSize,
"idle_timeout_in_seconds": cfg.Target.Bulk.IdleTimeoutInSeconds,
"elasticsearch": targetClusterID,
"elasticsearch_config": util.MapStr{
"name": targetClusterID,
@ -681,7 +662,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
},
},
},
},
}),
}
bulkTask.ID = bulkID
}
@ -707,7 +688,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
}
//call instance api to create pipeline task
err = instance.CreatePipeline(util.MustToJSONBytes(scrollTask.Config))
err = instance.CreatePipeline([]byte(scrollTask.ConfigString))
if err != nil {
log.Errorf("create scroll pipeline failed, err: %+v", err)
return err
@ -765,15 +746,13 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc
majorTask.ID = majorTaskID
_, err = orm.Get(&majorTask)
if err != nil {
log.Errorf("failed to get major task, err: %v", err)
return
}
cfg := ElasticDataConfig{}
buf, err := util.ToJSONBytes(majorTask.Config)
if err != nil {
return
}
err = util.FromJSONBytes(buf, &cfg)
cfg := ClusterMigrationTaskConfig{}
err = getTaskConfig(&majorTask, &cfg)
if err != nil {
log.Errorf("failed to get task config, err: %v", err)
return
}
var (
@ -891,7 +870,7 @@ func writeLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string
Fields: util.MapStr{
"task": util.MapStr{
"logging": util.MapStr{
"config": util.MustToJSON(taskItem.Config),
"config": taskItem.ConfigString,
"status": taskItem.Status,
"message": message,
"result": taskResult,
@ -913,40 +892,40 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
return nil
}
buf := util.MustToJSONBytes(taskItem.Config)
clusterMigrationTask := ElasticDataConfig{}
err := util.FromJSONBytes(buf, &clusterMigrationTask)
clusterMigrationTask := ClusterMigrationTaskConfig{}
err := getTaskConfig(taskItem, &clusterMigrationTask)
if err != nil {
log.Errorf("failed to get task config, err: %v", err)
return err
}
defer func() {
taskItem.Config = clusterMigrationTask
taskItem.ConfigString = util.MustToJSON(clusterMigrationTask)
}()
esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id)
targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id)
for _, index := range clusterMigrationTask.Indices {
source := util.MapStr{
"cluster_id": clusterMigrationTask.Cluster.Source.Id,
"indices": index.Source.Name,
"slice_size": clusterMigrationTask.Settings.Scroll.SliceSize,
"batch_size": clusterMigrationTask.Settings.Scroll.Docs,
"scroll_time": clusterMigrationTask.Settings.Scroll.Timeout,
source := IndexMigrationSourceConfig{
ClusterId: clusterMigrationTask.Cluster.Source.Id,
Indices: index.Source.Name,
SliceSize: clusterMigrationTask.Settings.Scroll.SliceSize,
BatchSize: clusterMigrationTask.Settings.Scroll.Docs,
ScrollTime: clusterMigrationTask.Settings.Scroll.Timeout,
}
if index.IndexRename != nil {
source["index_rename"] = index.IndexRename
source.IndexRename = index.IndexRename
}
if index.Target.Name != "" {
source["index_rename"] = util.MapStr{
source.IndexRename = util.MapStr{
index.Source.Name: index.Target.Name,
}
}
if index.TypeRename != nil {
source["type_rename"] = index.TypeRename
source.TypeRename = index.TypeRename
}
if v, ok := index.RawFilter.(string); ok {
source["query_string"] = v
source.QueryString = v
} else {
var must []interface{}
if index.RawFilter != nil {
@ -954,7 +933,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
}
if index.Source.DocType != "" {
if index.Target.DocType != "" {
source["type_rename"] = util.MapStr{
source.TypeRename = util.MapStr{
index.Source.DocType: index.Target.DocType,
}
}
@ -965,13 +944,13 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
})
} else {
if targetType != "" {
source["type_rename"] = util.MapStr{
source.TypeRename = util.MapStr{
"*": index.Target.DocType,
}
}
}
if len(must) > 0 {
source["query_dsl"] = util.MapStr{
source.QueryDSL = util.MapStr{
"bool": util.MapStr{
"must": must,
},
@ -990,20 +969,20 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
})
}
target := util.MapStr{
"cluster_id": clusterMigrationTask.Cluster.Target.Id,
"bulk": util.MapStr{
"batch_size_in_mb": clusterMigrationTask.Settings.Bulk.StoreSizeInMB,
"batch_size_in_docs": clusterMigrationTask.Settings.Bulk.Docs,
"max_worker_size": clusterMigrationTask.Settings.Bulk.MaxWorkerSize,
"idle_timeout_in_seconds": clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds,
"slice_size": clusterMigrationTask.Settings.Bulk.SliceSize,
"compress": clusterMigrationTask.Settings.Bulk.Compress,
target := IndexMigrationTargetConfig{
ClusterId: clusterMigrationTask.Cluster.Target.Id,
Bulk: IndexMigrationBulkConfig{
BatchSizeInMB: clusterMigrationTask.Settings.Bulk.StoreSizeInMB,
BatchSizeInDocs: clusterMigrationTask.Settings.Bulk.Docs,
MaxWorkerSize: clusterMigrationTask.Settings.Bulk.MaxWorkerSize,
IdleTimeoutInSeconds: clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds,
SliceSize: clusterMigrationTask.Settings.Bulk.SliceSize,
Compress: clusterMigrationTask.Settings.Bulk.Compress,
},
}
indexParameters := util.MapStr{
"source": source,
"target": target,
indexParameters := IndexMigrationTaskConfig{
Source: source,
Target: target,
}
indexMigrationTask := task2.Task{
ParentId: []string{taskItem.ID},
@ -1022,7 +1001,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
"unique_index_name": index.Source.GetUniqueIndexName(),
},
},
Config: indexParameters,
ConfigString: util.MustToJSON(indexParameters),
}
indexMigrationTask.ID = util.GetUUID()
@ -1034,7 +1013,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
FieldType: index.Partition.FieldType,
Step: index.Partition.Step,
//Filter: index.RawFilter,
Filter: source["query_dsl"],
Filter: source.QueryDSL,
}
partitions, err := elastic.GetPartitions(partitionQ, esSourceClient)
if err != nil {
@ -1052,20 +1031,14 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
continue
}
partitionID++
partitionSource := util.MapStr{
"start": partition.Start,
"end": partition.End,
"doc_count": partition.Docs,
"step": index.Partition.Step,
"partition_id": partitionID,
}
for k, v := range source {
if k == "query_string" {
continue
}
partitionSource[k] = v
}
partitionSource["query_dsl"] = partition.Filter
partitionSource := source
partitionSource.Start = partition.Start
partitionSource.End = partition.End
partitionSource.DocCount = partition.Docs
partitionSource.Step = index.Partition.Step
partitionSource.PartitionId = partitionID
partitionSource.QueryDSL = partition.Filter
partitionSource.QueryString = ""
var must []interface{}
if partition.Other {
@ -1084,8 +1057,9 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
if targetMust != nil {
must = append(must, targetMust...)
}
partitionTarget := target
if len(must) > 0 {
target["query_dsl"] = util.MapStr{
partitionTarget.QueryDSL = util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": must,
@ -1109,22 +1083,22 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
"unique_index_name": index.Source.GetUniqueIndexName(),
},
},
Config: util.MapStr{
"source": partitionSource,
"target": target,
"execution": clusterMigrationTask.Settings.Execution,
},
ConfigString: util.MustToJSON(IndexMigrationTaskConfig{
Source: partitionSource,
Target: partitionTarget,
Execution: clusterMigrationTask.Settings.Execution,
}),
}
partitionMigrationTask.ID = util.GetUUID()
err = orm.Create(nil, &partitionMigrationTask)
delete(target, "query_dsl")
target.QueryDSL = nil
if err != nil {
return fmt.Errorf("store index migration task(partition) error: %w", err)
}
}
} else {
source["doc_count"] = index.Source.Docs
source.DocCount = index.Source.Docs
err = orm.Create(nil, &indexMigrationTask)
if err != nil {
return fmt.Errorf("store index migration task error: %w", err)
@ -1237,23 +1211,13 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo
log.Errorf("search task log from es failed, err: %v", err)
return nil, err
}
var (
cfg map[string]interface{}
ok bool
)
if cfg, ok = subTask.Config.(map[string]interface{}); !ok {
cfg := IndexMigrationTaskConfig{}
err = getTaskConfig(subTask, &cfg)
if err != nil {
log.Errorf("failed to get task config, err: %v", err)
return nil, fmt.Errorf("got wrong config of task %v", *subTask)
}
totalDocsVal, err := util.MapStr(cfg).GetValue("source.doc_count")
if err != nil {
log.Errorf("failed to get source.doc_count, err: %v", err)
return nil, err
}
totalDocs, err := util.ExtractInt(totalDocsVal)
if err != nil {
log.Errorf("failed to extract source.doc_count, err: %v", err)
return nil, err
}
totalDocs := cfg.Source.DocCount
var (
indexDocs int64
@ -1457,7 +1421,8 @@ func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState
}
func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) {
config, err := p.extractTaskConfig(taskItem)
config := ClusterMigrationTaskConfig{}
err := getTaskConfig(taskItem, &config)
if err != nil {
log.Errorf("failed to parse config info from major task, id: %s, err: %v", taskItem.ID, err)
return
@ -1500,21 +1465,3 @@ func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) {
}
return
}
func (p *DispatcherProcessor) extractTaskConfig(taskItem *task2.Task) (*ElasticDataConfig, error) {
origConfig, ok := taskItem.Config.(ElasticDataConfig)
if ok {
return &origConfig, nil
}
rawConfig, ok := taskItem.Config.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("failed to extract configuration from major task, id: %s, type: %T", taskItem.ID, taskItem.Config)
}
config := &ElasticDataConfig{}
err := mapstructure.Decode(rawConfig, config)
if err != nil {
return nil, err
}
return config, nil
}