From cd85d21ef05df606e049d7c66541ade362e84539 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Tue, 18 Apr 2023 15:28:34 +0800 Subject: [PATCH] [migration] split es_scroll/bulk_indexing pipeline task --- plugin/migration/api.go | 67 +- plugin/migration/{ => model}/model.go | 17 +- plugin/migration/model/processor.go | 7 + plugin/migration/pipeline.go | 1132 +++++++---------- .../migration/pipeline_task/pipeline_task.go | 402 ++++++ plugin/migration/util/util.go | 66 + 6 files changed, 955 insertions(+), 736 deletions(-) rename plugin/migration/{ => model}/model.go (93%) create mode 100644 plugin/migration/model/processor.go create mode 100644 plugin/migration/pipeline_task/pipeline_task.go create mode 100644 plugin/migration/util/util.go diff --git a/plugin/migration/api.go b/plugin/migration/api.go index 7a30d8af..5fce6440 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -8,24 +8,27 @@ import ( "context" "errors" "fmt" - elastic2 "infini.sh/framework/modules/elastic" "net/http" "strconv" "strings" "time" - "infini.sh/framework/core/api/rbac/enum" - log "github.com/cihub/seelog" + "infini.sh/console/model" + migration_model "infini.sh/console/plugin/migration/model" + migration_util "infini.sh/console/plugin/migration/util" + "infini.sh/framework/core/api" "infini.sh/framework/core/api/rbac" + "infini.sh/framework/core/api/rbac/enum" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/global" "infini.sh/framework/core/orm" task2 "infini.sh/framework/core/task" "infini.sh/framework/core/util" + elastic2 "infini.sh/framework/modules/elastic" ) func InitAPI() { @@ -53,7 +56,7 @@ type APIHandler struct { } func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - clusterTaskConfig := &ClusterMigrationTaskConfig{} + clusterTaskConfig := &migration_model.ClusterMigrationTaskConfig{} err := h.DecodeJSON(req, clusterTaskConfig) if err != nil { log.Error(err) @@ -178,7 +181,7 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re for _, hit := range searchRes.Hits.Hits { sourceM := util.MapStr(hit.Source) buf := util.MustToJSONBytes(sourceM["config"]) - dataConfig := ClusterMigrationTaskConfig{} + dataConfig := migration_model.ClusterMigrationTaskConfig{} err = util.FromJSONBytes(buf, &dataConfig) if err != nil { log.Error(err) @@ -246,7 +249,7 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request return } - writeLog(&obj, &task2.TaskResult{ + migration_util.WriteLog(&obj, &task2.TaskResult{ Success: true, }, "task status manually set to ready") @@ -311,7 +314,7 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - writeLog(&obj, &task2.TaskResult{ + migration_util.WriteLog(&obj, &task2.TaskResult{ Success: true, }, "task status manually set to pending stop") @@ -320,17 +323,6 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ }, 200) } -func getTaskConfig(task *task2.Task, config interface{}) error { - if task.Config_ == nil { - return util.FromJSONBytes([]byte(task.ConfigString), config) - } - buf, err := util.ToJSONBytes(task.Config_) - if err != nil { - return err - } - return util.FromJSONBytes(buf, config) -} - func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API) (map[string]string, error) { const step = 50 var ( @@ -383,8 +375,8 @@ func (h 
*APIHandler) getIndexRefreshIntervals(w http.ResponseWriter, req *http.R }, http.StatusNotFound) return } - taskConfig := &ClusterMigrationTaskConfig{} - err = getTaskConfig(&obj, taskConfig) + taskConfig := &migration_model.ClusterMigrationTaskConfig{} + err = migration_util.GetTaskConfig(&obj, taskConfig) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -421,8 +413,8 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R }, http.StatusNotFound) return } - taskConfig := &ClusterMigrationTaskConfig{} - err = getTaskConfig(&obj, taskConfig) + taskConfig := &migration_model.ClusterMigrationTaskConfig{} + err = migration_util.GetTaskConfig(&obj, taskConfig) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -471,7 +463,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R obj.Metadata.Labels["completed_indices"] = completedIndices h.WriteJSON(w, obj, http.StatusOK) } -func getMajorTaskInfoByIndex(taskID string) (map[string]IndexStateInfo, error) { +func getMajorTaskInfoByIndex(taskID string) (map[string]migration_model.IndexStateInfo, error) { query := util.MapStr{ "size": 0, "aggs": util.MapStr{ @@ -529,13 +521,13 @@ func getMajorTaskInfoByIndex(taskID string) (map[string]IndexStateInfo, error) { if err != nil { return nil, err } - resBody := map[string]IndexStateInfo{} + resBody := map[string]migration_model.IndexStateInfo{} if taskAgg, ok := searchRes.Aggregations["group_by_task"]; ok { for _, bk := range taskAgg.Buckets { if key, ok := bk["key"].(string); ok { //resBody[key] = int(bk["doc_count"].(float64)) - resBody[key] = IndexStateInfo{} + resBody[key] = migration_model.IndexStateInfo{} if statusAgg, ok := bk["group_by_status"].(map[string]interface{}); ok { if sbks, ok := statusAgg["buckets"].([]interface{}); ok { for _, sbk := range sbks { @@ -565,7 +557,7 @@ func getMajorTaskInfoByIndex(taskID string) (map[string]IndexStateInfo, error) { return resBody, nil } -func getIndexTaskDocCount(ctx context.Context, index *IndexConfig, targetESClient elastic.API) (int64, error) { +func getIndexTaskDocCount(ctx context.Context, index *migration_model.IndexConfig, targetESClient elastic.API) (int64, error) { targetIndexName := index.Target.Name if targetIndexName == "" { if v, ok := index.IndexRename[index.Source.Name].(string); ok { @@ -741,8 +733,8 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt startTime = subTasks[0].StartTimeInMillis } for i, ptask := range subTasks { - cfg := IndexMigrationTaskConfig{} - err := getTaskConfig(&ptask, &cfg) + cfg := migration_model.IndexMigrationTaskConfig{} + err := migration_util.GetTaskConfig(&ptask, &cfg) if err != nil { log.Errorf("failed to get task config, err: %v", err) continue @@ -950,6 +942,11 @@ func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request, }, http.StatusOK) } +type InitIndexRequest struct { + Mappings map[string]interface{} `json:"mappings"` + Settings map[string]interface{} `json:"settings"` +} + func (h *APIHandler) initIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { targetClusterID := ps.MustGetParameter("id") indexName := ps.MustGetParameter("index") @@ -978,7 +975,7 @@ func (h *APIHandler) initIndex(w http.ResponseWriter, req *http.Request, ps http } if ml := len(reqBody.Mappings); ml > 0 { var ( - docType = "" + docType = "" mapping interface{} = reqBody.Mappings ) if ml == 1 { @@ -989,7 +986,7 @@ 
func (h *APIHandler) initIndex(w http.ResponseWriter, req *http.Request, ps http } } } - mappingBytes := util.MustToJSONBytes(mapping) + mappingBytes := util.MustToJSONBytes(mapping) _, err = client.UpdateMapping(indexName, docType, mappingBytes) if err != nil { log.Error(err) @@ -997,7 +994,7 @@ func (h *APIHandler) initIndex(w http.ResponseWriter, req *http.Request, ps http return } } - }else{ + } else { indexSettings := map[string]interface{}{} if len(reqBody.Settings) > 0 { indexSettings["settings"] = reqBody.Settings @@ -1063,7 +1060,7 @@ func (h *APIHandler) deleteDataMigrationTask(w http.ResponseWriter, req *http.Re }, }, } - err = orm.DeleteBy( &obj, util.MustToJSONBytes(q)) + err = orm.DeleteBy(&obj, util.MustToJSONBytes(q)) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -1076,7 +1073,7 @@ func (h *APIHandler) deleteDataMigrationTask(w http.ResponseWriter, req *http.Re }, 200) } -func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats MajorTaskState, err error) { +func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_model.MajorTaskState, err error) { taskQuery := util.MapStr{ "size": 500, "query": util.MapStr{ @@ -1179,7 +1176,7 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats MajorTaskStat return taskStats, nil } -func getMajorTaskByIndexFromES(majorTaskID string) (map[string]IndexStateInfo, error) { +func getMajorTaskByIndexFromES(majorTaskID string) (map[string]migration_model.IndexStateInfo, error) { taskQuery := util.MapStr{ "size": 500, "query": util.MapStr{ @@ -1229,7 +1226,7 @@ func getMajorTaskByIndexFromES(majorTaskID string) (map[string]IndexStateInfo, e } } } - state := map[string]IndexStateInfo{} + state := map[string]migration_model.IndexStateInfo{} for instID, taskIDs := range pipelineTaskIDs { inst := &model.Instance{} inst.ID = instID diff --git a/plugin/migration/model.go b/plugin/migration/model/model.go similarity index 93% rename from plugin/migration/model.go rename to plugin/migration/model/model.go index 1486e482..3d2c36c5 100644 --- a/plugin/migration/model.go +++ b/plugin/migration/model/model.go @@ -2,7 +2,7 @@ * Web: https://infinilabs.com * Email: hello#infini.ltd */ -package migration +package model import ( "fmt" @@ -95,16 +95,6 @@ type ClusterInfo struct { Distribution string `json:"distribution,omitempty"` } -type TaskCompleteState struct { - IsComplete bool - Error string - PipelineIds []string - RunningPhase int - TotalDocs int64 - SuccessDocs int64 - ScrolledDocs int64 -} - type MajorTaskState struct { ScrolledDocs float64 IndexDocs float64 @@ -116,11 +106,6 @@ type IndexStateInfo struct { IndexDocs float64 } -type InitIndexRequest struct { - Mappings map[string]interface{} `json:"mappings"` - Settings map[string]interface{} `json:"settings"` -} - type IndexMigrationTaskConfig struct { Source IndexMigrationSourceConfig `json:"source"` Target IndexMigrationTargetConfig `json:"target"` diff --git a/plugin/migration/model/processor.go b/plugin/migration/model/processor.go new file mode 100644 index 00000000..477abdb4 --- /dev/null +++ b/plugin/migration/model/processor.go @@ -0,0 +1,7 @@ +package model + +import "infini.sh/framework/core/task" + +type Processor interface { + Process(t *task.Task) (err error) +} diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 5b4e39ba..2077cf7f 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -8,17 +8,18 @@ import ( "errors" "fmt" "math" - "strings" - 
"syscall" "time" log "github.com/cihub/seelog" "infini.sh/console/model" + migration_model "infini.sh/console/plugin/migration/model" + "infini.sh/console/plugin/migration/pipeline_task" + migration_util "infini.sh/console/plugin/migration/util" + "infini.sh/framework/core/config" "infini.sh/framework/core/elastic" "infini.sh/framework/core/env" - "infini.sh/framework/core/event" "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "infini.sh/framework/core/pipeline" @@ -30,7 +31,10 @@ import ( type DispatcherProcessor struct { id string config *DispatcherConfig - state map[string]DispatcherState + + state map[string]DispatcherState + + pipelineTaskProcessor migration_model.Processor } type DispatcherConfig struct { @@ -93,6 +97,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro return nil, err } processor.state = state + processor.pipelineTaskProcessor = pipeline_task.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName) return &processor, nil } @@ -108,10 +113,10 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { } tasks, err := p.getMigrationTasks(p.config.TaskBatchSize) if err != nil { + log.Errorf("failed to get migration tasks, err: %v", err) return err } if len(tasks) == 0 { - log.Debug("got zero cluster migration task from es") return nil } for _, t := range tasks { @@ -119,11 +124,13 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { return nil } if t.Metadata.Labels == nil { - log.Errorf("got migration task with empty labels, skip handling: %v", t) + log.Errorf("got migration task [%s] with empty labels, skip handling", t.ID) continue } - if t.Metadata.Labels["business_id"] == "cluster_migration" { - //handle major task + log.Debugf("start handling task [%s] (type: %s, status: %s)", t.ID, t.Metadata.Type, t.Status) + switch t.Metadata.Type { + case "cluster_migration": + // handle major task switch t.Status { case task2.StatusReady: err = p.handleReadyMajorTask(&t) @@ -135,18 +142,30 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { if err != nil { log.Errorf("failed to handling major task [%s]: [%v]", t.ID, err) } - } else if t.Metadata.Labels["business_id"] == "index_migration" { - //handle sub migration task + case "index_migration": + // handle sub migration task switch t.Status { case task2.StatusReady: + // split sub task err = p.handleReadySubTask(&t) + case task2.StatusReady1: + // update pipeline tasks to ready + err = p.handleReady1SubTask(&t) case task2.StatusRunning: + // check pipeline tasks status err = p.handleRunningSubTask(&t) case task2.StatusPendingStop: + // mark pipeline tasks as pending_stop err = p.handlePendingStopSubTask(&t) - if err != nil { - log.Errorf("failed to handling sub task [%s]: [%v]", t.ID, err) - } + } + if err != nil { + log.Errorf("failed to handling sub task [%s]: [%v]", t.ID, err) + } + case "pipeline": + // handle pipeline task + err = p.pipelineTaskProcessor.Process(&t) + if err != nil { + log.Errorf("failed to handling pipeline task [%s]: [%v]", t.ID, err) } } if err != nil { @@ -162,12 +181,10 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { //es index refresh time.Sleep(time.Millisecond * 1200) } + return nil } func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { - if taskItem.Metadata.Labels == nil { - return fmt.Errorf("got migration task with empty labels, skip handling: %v", taskItem) - } if taskItem.Metadata.Labels["is_split"] != true { err := 
p.splitMajorMigrationTask(taskItem) if err != nil { @@ -229,76 +246,18 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { } func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error { - //update status of subtask to pending stop - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskItem.ID, - }, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task2.StatusRunning, task2.StatusReady}, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.business_id": util.MapStr{ - "value": "index_migration", - }, - }, - }, - }, - }, - } - queryDsl := util.MapStr{ - "query": query, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop), - }, - } - - err := orm.UpdateBy(taskItem, util.MustToJSONBytes(queryDsl)) + err := p.updatePendingChildTasksToPendingStop(taskItem, "index_migration") if err != nil { log.Errorf("failed to update sub task status, err: %v", err) return nil } - //check whether all pipeline task is stopped or not, then update task status - q := util.MapStr{ - "size": 200, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskItem.ID, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.business_id": "index_migration", - }, - }, - { - "terms": util.MapStr{ - "status": []string{task2.StatusRunning, task2.StatusPendingStop, task2.StatusReady}, - }, - }, - }, - }, - }, - } - tasks, err := p.getTasks(q) + + tasks, err := p.getPendingChildTasks(taskItem, "index_migration") if err != nil { log.Errorf("failed to get sub tasks, err: %v", err) return nil } + // all subtask stopped or error or complete if len(tasks) == 0 { taskItem.Status = task2.StatusStopped @@ -324,348 +283,288 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error } func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { - state, err := p.getTaskCompleteState(taskItem) + cfg := migration_model.IndexMigrationTaskConfig{} + err := migration_util.GetTaskConfig(taskItem, &cfg) if err != nil { - return err + log.Errorf("failed to get task config, err: %v", err) + return fmt.Errorf("got wrong config of task [%s]", taskItem.ID) } - if state.IsComplete { - if taskItem.Metadata.Labels != nil { - taskItem.Metadata.Labels["index_docs"] = state.SuccessDocs - taskItem.Metadata.Labels["scrolled_docs"] = state.ScrolledDocs - } - if state.Error != "" && state.TotalDocs != state.SuccessDocs { - taskItem.Status = task2.StatusError - } else { - taskItem.Status = task2.StatusComplete - } + totalDocs := cfg.Source.DocCount + instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) - tn := time.Now() - taskItem.CompletedTime = &tn - p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ - Success: state.Error == "", - Error: state.Error, - }, fmt.Sprintf("task [%s] finished with status [%s]", taskItem.ID, taskItem.Status)) - p.cleanGatewayPipelines(taskItem, state.PipelineIds) - } else { - if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) { - ptasks, err := p.getPipelineTasks(taskItem.ID) - if err != nil { - log.Errorf("failed to get pipeline tasks, err: %v", err) - return nil - } - var bulkTask *task2.Task - for i, t := range ptasks { - if t.Metadata.Labels != nil { - if t.Metadata.Labels["pipeline_id"] == "bulk_indexing" { - bulkTask 
= &ptasks[i] - } - } - } - if bulkTask == nil { - return fmt.Errorf("can not found bulk_indexing pipeline of sub task [%s]", taskItem.ID) - } - if bulkTask.Metadata.Labels != nil { - if instID, ok := bulkTask.Metadata.Labels["execution_instance_id"].(string); ok { - inst := &model.Instance{} - inst.ID = instID - _, err = orm.Get(inst) - if err != nil { - log.Errorf("failed to get instance, err: %v", err) - return err - } - err = inst.CreatePipeline([]byte(bulkTask.ConfigString)) - if err != nil { - log.Errorf("failed to create bulk_indexing pipeline, err: %v", err) - return err - } - taskItem.Metadata.Labels["running_phase"] = 2 - p.saveTaskAndWriteLog(taskItem, "wait_for", nil, fmt.Sprintf("task [%s] started phase 2", taskItem.ID)) - } - } - } + if totalDocs == 0 { + taskItem.Status = task2.StatusComplete + taskItem.Metadata.Labels["index_docs"] = 0 + taskItem.Metadata.Labels["scrolled_docs"] = 0 + now := time.Now() + taskItem.CompletedTime = &now + + p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + Success: true, + }, "empty index migration completed") + p.decrInstanceJobs(instanceID) + return nil } + + scrollTask, bulkTask, err := p.getScrollBulkPipelineTasks(taskItem) + if err != nil { + log.Errorf("failed to get pipeline tasks, err: %v", err) + return nil + } + + if scrollTask == nil || bulkTask == nil { + return errors.New("scroll/bulk pipeline task missing") + } + // NOTE: old-version pipeline tasks has empty status + if scrollTask.Status == task2.StatusError || scrollTask.Status == "" { + return fmt.Errorf("scroll pipeline failed") + } + if bulkTask.Status == task2.StatusError || bulkTask.Status == "" { + return errors.New("bulk pipeline failed") + } + + // scroll not finished yet + if scrollTask.Status != task2.StatusComplete { + return nil + } + + scrolledDocs := migration_util.GetMapIntValue(util.MapStr(scrollTask.Metadata.Labels), "scrolled_docs") + if scrolledDocs != totalDocs { + return fmt.Errorf("scroll complete but docs count unmatch: %d / %d", scrolledDocs, totalDocs) + } + + // start bulk as needed + if bulkTask.Status == task2.StatusInit { + bulkTask.Status = task2.StatusReady + p.saveTaskAndWriteLog(bulkTask, "", &task2.TaskResult{ + Success: true, + }, fmt.Sprintf("scroll completed, bulk pipeline started")) + return nil + } + // bulk not finished yet + if bulkTask.Status != task2.StatusComplete { + return nil + } + + successDocs := migration_util.GetMapIntValue(util.MapStr(bulkTask.Metadata.Labels), "success_docs") + if successDocs != totalDocs { + return fmt.Errorf("bulk complete but docs count unmatch: %d / %d", successDocs, totalDocs) + } + + taskItem.Status = task2.StatusComplete + taskItem.Metadata.Labels["index_docs"] = successDocs + taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs + now := time.Now() + taskItem.CompletedTime = &now + + p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + Success: true, + }, "index migration completed") + p.decrInstanceJobs(instanceID) + return nil } func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) error { - //check whether all pipeline task is stopped or not, then update task status - ptasks, err := p.getPipelineTasks(taskItem.ID) + err := p.updatePendingChildTasksToPendingStop(taskItem, "pipeline") if err != nil { - log.Errorf("failed to get pipeline tasks, err: %v", err) - return err - } - var taskIDs []string - for _, t := range ptasks { - taskIDs = append(taskIDs, t.ID) - } - esClient := elastic.GetClient(p.config.Elasticsearch) - q := util.MapStr{ - "size": 
len(taskIDs), - "sort": []util.MapStr{ - { - "payload.pipeline.logging.steps": util.MapStr{ - "order": "desc", - }, - }, - }, - "collapse": util.MapStr{ - "field": "metadata.labels.task_id", - }, - "query": util.MapStr{ - "terms": util.MapStr{ - "metadata.labels.task_id": taskIDs, - }, - }, - } - searchRes, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(q)) - if err != nil { - log.Errorf("failed to get latest pipeline status, err: %v", err) + log.Errorf("failed to update sub task status, err: %v", err) return nil } -MainLoop: - for _, hit := range searchRes.Hits.Hits { - status, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.status") - if status != "STOPPED" { - //call instance api to stop scroll/bulk_indexing pipeline task - if instID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok { - inst := model.Instance{} - inst.ID = instID - _, err = orm.Get(&inst) - if err != nil { - log.Errorf("failed to get instance, err: %v", err) - return nil - } - hasStopped := true - for _, pipelineID := range taskIDs { - err = inst.StopPipelineWithTimeout(pipelineID, time.Second) - if err != nil { - if !errors.Is(err, syscall.ECONNREFUSED) && !strings.Contains(err.Error(), "task not found") { - hasStopped = false - break - } - log.Errorf("failed to stop pipeline, err: %v", err) - } - } - if hasStopped { - break MainLoop - } - } - return nil - } - } - taskItem.Status = task2.StatusStopped - p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) - p.cleanGatewayPipelines(taskItem, taskIDs) + tasks, err := p.getPendingChildTasks(taskItem, "pipeline") + if err != nil { + log.Errorf("failed to get sub tasks, err: %v", err) + return nil + } + + // all subtask stopped or error or complete + if len(tasks) == 0 { + taskItem.Status = task2.StatusStopped + p.sendMajorTaskNotification(taskItem) + p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) + } return nil } -func (p *DispatcherProcessor) cleanGatewayPipelines(taskItem *task2.Task, pipelineIDs []string) { - var err error - //delete pipeline and clear queue - instanceID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string) - if !ok { - log.Debugf("task %s not scheduled, skip cleaning gateway stuffs", taskItem.ID) - return - } - - inst := model.Instance{} - inst.ID = instanceID - _, err = orm.Get(&inst) - if err != nil { - log.Errorf("failed to get instance, err: %v", err) - return - } - - for _, pipelineID := range pipelineIDs { - err = inst.DeletePipeline(pipelineID) - if err != nil { - log.Errorf("delete pipeline failed, err: %v", err) - } - selector := util.MapStr{ - "labels": util.MapStr{ - "migration_task_id": taskItem.ID, - }, - } - //clear queue - err = inst.DeleteQueueBySelector(selector) - if err != nil { - log.Errorf("failed to delete queue, err: %v", err) - } - } - if st, ok := p.state[instanceID]; ok { - st.Total -= 1 - p.state[instanceID] = st - } -} - func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { - if taskItem.Metadata.Labels == nil { - return fmt.Errorf("empty labels") - } - var ( - scrollTask *task2.Task - bulkTask *task2.Task - ) if taskItem.Metadata.Labels["is_split"] == true { - //query split pipeline task - ptasks, err := p.getPipelineTasks(taskItem.ID) - if err != nil { - log.Errorf("getPipelineTasks failed, err: %+v", err) - return nil - } - for i, t := range ptasks { - ptasks[i].RetryTimes = taskItem.RetryTimes + 1 - if t.Metadata.Labels != nil { - if 
t.Metadata.Labels["pipeline_id"] == "es_scroll" { - scrollTask = &ptasks[i] - } else if t.Metadata.Labels["pipeline_id"] == "bulk_indexing" { - bulkTask = &ptasks[i] - } - } - } - if scrollTask == nil || bulkTask == nil { - return fmt.Errorf("es_scroll or bulk_indexing pipeline task not found") - } - taskItem.RetryTimes++ - } else { - //split task to scroll/bulk_indexing pipeline and then persistent - var pids []string - pids = append(pids, taskItem.ParentId...) - pids = append(pids, taskItem.ID) - scrollID := util.GetUUID() - cfg := IndexMigrationTaskConfig{} - err := getTaskConfig(taskItem, &cfg) - if err != nil { - return fmt.Errorf("got wrong config [%v] with task [%s], err: %v", taskItem.ConfigString, taskItem.ID, err) - } - sourceClusterID := cfg.Source.ClusterId - targetClusterID := cfg.Target.ClusterId - esConfig := elastic.GetConfig(sourceClusterID) - esTargetConfig := elastic.GetConfig(targetClusterID) - docType := common.GetClusterDocType(targetClusterID) - if len(taskItem.ParentId) == 0 { - return fmt.Errorf("got wrong parent id of task [%v]", *taskItem) - } - queryDsl := cfg.Source.QueryDSL - scrollQueryDsl := util.MustToJSON(util.MapStr{ - "query": queryDsl, - }) - indexName := cfg.Source.Indices - scrollTask = &task2.Task{ - ParentId: pids, - Runnable: true, - Cancellable: true, - Metadata: task2.Metadata{ - Type: "pipeline", - Labels: util.MapStr{ - "cluster_id": sourceClusterID, - "pipeline_id": "es_scroll", - "index_name": indexName, - "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], - }, - }, - RetryTimes: taskItem.RetryTimes, - ConfigString: util.MustToJSON(PipelineTaskConfig{ - Name: scrollID, - Logging: PipelineTaskLoggingConfig{ - Enabled: true, - }, - Labels: util.MapStr{ - "parent_task_id": pids, - "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], - }, - AutoStart: true, - KeepRunning: false, - Processor: []util.MapStr{ - { - "es_scroll": util.MapStr{ - "remove_type": docType == "", - "slice_size": cfg.Source.SliceSize, - "batch_size": cfg.Source.BatchSize, - "indices": indexName, - "elasticsearch": sourceClusterID, - "elasticsearch_config": util.MapStr{ - "name": sourceClusterID, - "enabled": true, - "endpoint": esConfig.Endpoint, - "basic_auth": esConfig.BasicAuth, - }, - "queue": util.MapStr{ - "name": scrollID, - "labels": util.MapStr{ - "migration_task_id": taskItem.ID, - }, - }, - "partition_size": 1, - "scroll_time": cfg.Source.ScrollTime, - "query_dsl": scrollQueryDsl, - "index_rename": cfg.Source.IndexRename, - "type_rename": cfg.Source.TypeRename, - }, - }, - }, - }), - } - scrollTask.ID = scrollID + taskItem.Status = task2.StatusReady1 + p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ + Success: true, + }, fmt.Sprintf("sub task [%s] splitted, skip to ready1", taskItem.ID)) + return nil + } - bulkID := util.GetUUID() - bulkTask = &task2.Task{ - ParentId: pids, - Runnable: true, - Cancellable: true, - Metadata: task2.Metadata{ - Type: "pipeline", - Labels: util.MapStr{ - "cluster_id": targetClusterID, - "pipeline_id": "bulk_indexing", - "index_name": indexName, - "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], - }, + //split task to scroll/bulk_indexing pipeline and then persistent + var pids []string + pids = append(pids, taskItem.ParentId...) 
+ pids = append(pids, taskItem.ID) + scrollID := util.GetUUID() + cfg := migration_model.IndexMigrationTaskConfig{} + err := migration_util.GetTaskConfig(taskItem, &cfg) + if err != nil { + return fmt.Errorf("got wrong config [%v] with task [%s], err: %v", taskItem.ConfigString, taskItem.ID, err) + } + sourceClusterID := cfg.Source.ClusterId + targetClusterID := cfg.Target.ClusterId + esConfig := elastic.GetConfig(sourceClusterID) + esTargetConfig := elastic.GetConfig(targetClusterID) + docType := common.GetClusterDocType(targetClusterID) + if len(taskItem.ParentId) == 0 { + return fmt.Errorf("got wrong parent id of task [%v]", *taskItem) + } + queryDsl := cfg.Source.QueryDSL + scrollQueryDsl := util.MustToJSON(util.MapStr{ + "query": queryDsl, + }) + indexName := cfg.Source.Indices + scrollTask := &task2.Task{ + ParentId: pids, + Runnable: true, + Cancellable: true, + Metadata: task2.Metadata{ + Type: "pipeline", + Labels: util.MapStr{ + "cluster_id": sourceClusterID, + "pipeline_id": "es_scroll", + "index_name": indexName, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], }, - RetryTimes: taskItem.RetryTimes, - ConfigString: util.MustToJSON(PipelineTaskConfig{ - Name: bulkID, - Logging: PipelineTaskLoggingConfig{ - Enabled: true, - }, - Labels: util.MapStr{ - "parent_task_id": pids, - "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], - }, - AutoStart: true, - KeepRunning: false, - Processor: []util.MapStr{ - { - "bulk_indexing": util.MapStr{ - "detect_active_queue": false, - "bulk": util.MapStr{ - "batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB, - "batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs, - "invalid_queue": "bulk_indexing_400", - "compress": cfg.Target.Bulk.Compress, - }, - "max_worker_size": cfg.Target.Bulk.MaxWorkerSize, - "num_of_slices": cfg.Target.Bulk.SliceSize, - "idle_timeout_in_seconds": cfg.Target.Bulk.IdleTimeoutInSeconds, - "elasticsearch": targetClusterID, - "elasticsearch_config": util.MapStr{ - "name": targetClusterID, - "enabled": true, - "endpoint": esTargetConfig.Endpoint, - "basic_auth": esTargetConfig.BasicAuth, - }, - "queues": util.MapStr{ - "type": "scroll_docs", + }, + Status: task2.StatusInit, + RetryTimes: taskItem.RetryTimes, + ConfigString: util.MustToJSON(migration_model.PipelineTaskConfig{ + Name: scrollID, + Logging: migration_model.PipelineTaskLoggingConfig{ + Enabled: true, + }, + Labels: util.MapStr{ + "parent_task_id": pids, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], + }, + AutoStart: true, + KeepRunning: false, + Processor: []util.MapStr{ + { + "es_scroll": util.MapStr{ + "remove_type": docType == "", + "slice_size": cfg.Source.SliceSize, + "batch_size": cfg.Source.BatchSize, + "indices": indexName, + "elasticsearch": sourceClusterID, + "elasticsearch_config": util.MapStr{ + "name": sourceClusterID, + "enabled": true, + "endpoint": esConfig.Endpoint, + "basic_auth": esConfig.BasicAuth, + }, + "queue": util.MapStr{ + "name": scrollID, + "labels": util.MapStr{ "migration_task_id": taskItem.ID, }, }, + "partition_size": 1, + "scroll_time": cfg.Source.ScrollTime, + "query_dsl": scrollQueryDsl, + "index_rename": cfg.Source.IndexRename, + "type_rename": cfg.Source.TypeRename, }, }, - }), - } - bulkTask.ID = bulkID + }, + }), } + scrollTask.ID = scrollID + + bulkID := util.GetUUID() + bulkTask := &task2.Task{ + ParentId: pids, + Runnable: true, + Cancellable: true, + Metadata: task2.Metadata{ + Type: "pipeline", + Labels: util.MapStr{ + "cluster_id": targetClusterID, + 
"pipeline_id": "bulk_indexing", + "index_name": indexName, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], + }, + }, + Status: task2.StatusInit, + RetryTimes: taskItem.RetryTimes, + ConfigString: util.MustToJSON(migration_model.PipelineTaskConfig{ + Name: bulkID, + Logging: migration_model.PipelineTaskLoggingConfig{ + Enabled: true, + }, + Labels: util.MapStr{ + "parent_task_id": pids, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], + }, + AutoStart: true, + KeepRunning: false, + Processor: []util.MapStr{ + { + "bulk_indexing": util.MapStr{ + "detect_active_queue": false, + "bulk": util.MapStr{ + "batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB, + "batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs, + "invalid_queue": "bulk_indexing_400", + "compress": cfg.Target.Bulk.Compress, + }, + "max_worker_size": cfg.Target.Bulk.MaxWorkerSize, + "num_of_slices": cfg.Target.Bulk.SliceSize, + "idle_timeout_in_seconds": cfg.Target.Bulk.IdleTimeoutInSeconds, + "elasticsearch": targetClusterID, + "elasticsearch_config": util.MapStr{ + "name": targetClusterID, + "enabled": true, + "endpoint": esTargetConfig.Endpoint, + "basic_auth": esTargetConfig.BasicAuth, + }, + "queues": util.MapStr{ + "type": "scroll_docs", + "migration_task_id": taskItem.ID, + }, + }, + }, + }, + }), + } + bulkTask.ID = bulkID + + err = orm.Create(nil, scrollTask) + if err != nil { + return fmt.Errorf("create scroll pipeline task error: %w", err) + } + err = orm.Create(nil, bulkTask) + if err != nil { + return fmt.Errorf("create bulk_indexing pipeline task error: %w", err) + } + + taskItem.Metadata.Labels["is_split"] = true + taskItem.Status = task2.StatusReady1 + + p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + Success: true, + }, fmt.Sprintf("task [%s] splitted", taskItem.ID)) + return nil +} + +func (p *DispatcherProcessor) handleReady1SubTask(taskItem *task2.Task) error { + scrollTask, bulkTask, err := p.getScrollBulkPipelineTasks(taskItem) + if err != nil { + log.Errorf("failed to get pipeline tasks, err: %v", err) + return nil + } + if scrollTask == nil || bulkTask == nil { + // ES might not synced yet + log.Warnf("task [%s] es_scroll or bulk_indexing pipeline task not found", taskItem.ID) + return nil + } + instance, err := p.getPreferenceInstance(taskItem.ParentId[0]) if err != nil { return fmt.Errorf("get preference intance error: %w", err) @@ -674,56 +573,25 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { log.Infof("hit max tasks per instance with %d, skip dispatch", p.config.MaxTasksPerInstance) return nil } + + // update scroll task to ready scrollTask.Metadata.Labels["execution_instance_id"] = instance.ID - bulkTask.Metadata.Labels["execution_instance_id"] = instance.ID - - //try to clear queue when tasks are retried - if taskItem.RetryTimes > 0 { - selector := util.MapStr{ - "labels": util.MapStr{ - "migration_task_id": taskItem.ID, - }, - } - _ = instance.DeleteQueueBySelector(selector) - } - - //call instance api to create pipeline task - err = instance.CreatePipeline([]byte(scrollTask.ConfigString)) + scrollTask.Status = task2.StatusReady + err = orm.Update(nil, scrollTask) if err != nil { - log.Errorf("create scroll pipeline failed, err: %+v", err) - return err + return fmt.Errorf("update scroll pipeline task error: %w", err) } - //err = instance.CreatePipeline(util.MustToJSONBytes(bulkTask.Config)) - //if err != nil { - // return err - //} - //save task info - if taskItem.Metadata.Labels["is_split"] != true { - err = 
orm.Create(nil, scrollTask) - if err != nil { - return fmt.Errorf("create scroll pipeline task error: %w", err) - } - err = orm.Create(nil, bulkTask) - if err != nil { - return fmt.Errorf("create bulk_indexing pipeline task error: %w", err) - } - } else { - err = orm.Update(nil, scrollTask) - if err != nil { - return fmt.Errorf("update scroll pipeline task error: %w", err) - } - err = orm.Update(nil, bulkTask) - if err != nil { - return fmt.Errorf("update bulk_indexing pipeline task error: %w", err) - } + + // update bulk task to init + bulkTask.Metadata.Labels["execution_instance_id"] = instance.ID + bulkTask.Status = task2.StatusInit + err = orm.Update(nil, bulkTask) + if err != nil { + return fmt.Errorf("update bulk_indexing pipeline task error: %w", err) } - taskItem.Metadata.Labels["is_split"] = true - taskItem.Metadata.Labels["running_phase"] = 1 - //update dispatcher state - instanceState := p.state[instance.ID] - instanceState.Total = instanceState.Total + 1 - p.state[instance.ID] = instanceState - //update sub migration task status to ready and save task log + + // update sub migration task status to running and save task log + taskItem.RetryTimes++ taskItem.Metadata.Labels["execution_instance_id"] = instance.ID taskItem.Metadata.Labels["index_docs"] = 0 taskItem.Metadata.Labels["scrolled_docs"] = 0 @@ -733,14 +601,11 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ Success: true, }, fmt.Sprintf("task [%s] started", taskItem.ID)) + // update dispatcher state + p.incrInstanceJobs(instance.ID) return nil } -func getMapValue(m util.MapStr, key string) interface{} { - v, _ := m.GetValue(key) - return v -} - func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instance model.Instance, err error) { majorTask := task2.Task{} majorTask.ID = majorTaskID @@ -749,8 +614,8 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc log.Errorf("failed to get major task, err: %v", err) return } - cfg := ClusterMigrationTaskConfig{} - err = getTaskConfig(&majorTask, &cfg) + cfg := migration_model.ClusterMigrationTaskConfig{} + err = migration_util.GetTaskConfig(&majorTask, &cfg) if err != nil { log.Errorf("failed to get task config, err: %v", err) return @@ -787,40 +652,8 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc _, err = orm.Get(&instance) return } -func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error) { - majorTaskQ := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.business_id": "cluster_migration", - }, - }, - { - "terms": util.MapStr{ - "status": []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, - }, - }, - }, - }, - } - subTaskQ := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.business_id": "index_migration", - }, - }, - { - "terms": util.MapStr{ - "status": []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, - }, - }, - }, - }, - } +func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error) { queryDsl := util.MapStr{ "size": size, "sort": []util.MapStr{ @@ -832,10 +665,13 @@ func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error) }, "query": util.MapStr{ "bool": util.MapStr{ - "should": []util.MapStr{ - majorTaskQ, subTaskQ, + "must": []util.MapStr{ + { + "terms": 
util.MapStr{ + "status": []string{task2.StatusReady, task2.StatusReady1, task2.StatusRunning, task2.StatusPendingStop}, + }, + }, }, - "minimum_should_match": 1, }, }, } @@ -849,41 +685,11 @@ func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh log.Errorf("failed to update task, err: %v", err) } if message != "" { - writeLog(taskItem, taskResult, message) + migration_util.WriteLog(taskItem, taskResult, message) } } -func writeLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string) { - labels := util.MapStr{} - labels.Update(util.MapStr(taskItem.Metadata.Labels)) - labels["task_type"] = taskItem.Metadata.Type - labels["task_id"] = taskItem.ID - labels["parent_task_id"] = taskItem.ParentId - labels["retry_no"] = taskItem.RetryTimes - event.SaveLog(event.Event{ - Metadata: event.EventMetadata{ - Category: "task", - Name: "logging", - Datatype: "event", - Labels: labels, - }, - Fields: util.MapStr{ - "task": util.MapStr{ - "logging": util.MapStr{ - "config": taskItem.ConfigString, - "status": taskItem.Status, - "message": message, - "result": taskResult, - }, - }, - }, - }) -} - func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) error { - if taskItem.Metadata.Labels == nil { - return fmt.Errorf("empty metadata labels, unexpected cluster migration task: %s", util.MustToJSON(taskItem)) - } if taskItem.Metadata.Labels["is_split"] == true { return nil } @@ -892,8 +698,8 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro return nil } - clusterMigrationTask := ClusterMigrationTaskConfig{} - err := getTaskConfig(taskItem, &clusterMigrationTask) + clusterMigrationTask := migration_model.ClusterMigrationTaskConfig{} + err := migration_util.GetTaskConfig(taskItem, &clusterMigrationTask) if err != nil { log.Errorf("failed to get task config, err: %v", err) return err @@ -905,7 +711,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id) for _, index := range clusterMigrationTask.Indices { - source := IndexMigrationSourceConfig{ + source := migration_model.IndexMigrationSourceConfig{ ClusterId: clusterMigrationTask.Cluster.Source.Id, Indices: index.Source.Name, SliceSize: clusterMigrationTask.Settings.Scroll.SliceSize, @@ -969,9 +775,9 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro }) } - target := IndexMigrationTargetConfig{ + target := migration_model.IndexMigrationTargetConfig{ ClusterId: clusterMigrationTask.Cluster.Target.Id, - Bulk: IndexMigrationBulkConfig{ + Bulk: migration_model.IndexMigrationBulkConfig{ BatchSizeInMB: clusterMigrationTask.Settings.Bulk.StoreSizeInMB, BatchSizeInDocs: clusterMigrationTask.Settings.Bulk.Docs, MaxWorkerSize: clusterMigrationTask.Settings.Bulk.MaxWorkerSize, @@ -1031,7 +837,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro "unique_index_name": index.Source.GetUniqueIndexName(), }, }, - ConfigString: util.MustToJSON(IndexMigrationTaskConfig{ + ConfigString: util.MustToJSON(migration_model.IndexMigrationTaskConfig{ Source: partitionSource, Target: target, Execution: clusterMigrationTask.Settings.Execution, @@ -1047,7 +853,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro } else { source.DocCount = index.Source.Docs - indexParameters := IndexMigrationTaskConfig{ + indexParameters := migration_model.IndexMigrationTaskConfig{ Source: source, Target: target, 
} @@ -1095,6 +901,13 @@ func (p *DispatcherProcessor) getPipelineTasks(subTaskID string) ([]task2.Task, }, }, }, + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "pipeline", + }, + }, + }, }, }, }, @@ -1130,155 +943,7 @@ func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error) return migrationTasks, nil } -func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCompleteState, error) { - ptasks, err := p.getPipelineTasks(subTask.ID) - if err != nil { - log.Errorf("failed to get pipeline tasks, err: %v", err) - return nil, err - } - var pids []string - for _, t := range ptasks { - pids = append(pids, t.ID) - } - - if len(pids) == 0 { - return nil, fmt.Errorf("pipeline task not found") - } - query := util.MapStr{ - "sort": []util.MapStr{ - { - "timestamp": util.MapStr{ - "order": "desc", - }, - }, - { - "payload.pipeline.logging.steps": util.MapStr{ - "order": "desc", - }, - }, - }, - "collapse": util.MapStr{ - "field": "metadata.labels.task_id", - }, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "metadata.labels.task_id": pids, - }, - }, - { - "range": util.MapStr{ - "timestamp": util.MapStr{ - "gt": subTask.StartTimeInMillis - 30*1000, - }, - }, - }, - }, - }, - }, - } - esClient := elastic.GetClient(p.config.Elasticsearch) - res, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(query)) - if err != nil { - log.Errorf("search task log from es failed, err: %v", err) - return nil, err - } - cfg := IndexMigrationTaskConfig{} - err = getTaskConfig(subTask, &cfg) - if err != nil { - log.Errorf("failed to get task config, err: %v", err) - return nil, fmt.Errorf("got wrong config of task %v", *subTask) - } - totalDocs := cfg.Source.DocCount - - var ( - indexDocs int64 - successDocs int64 - scrolledDocs int64 - state TaskCompleteState - ) - state.TotalDocs = totalDocs - state.PipelineIds = pids - var bulked, scrolled bool - for _, hit := range res.Hits.Hits { - if bulked && scrolled { - break - } - resultErr, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.result.error") - if errStr, ok := resultErr.(string); ok && errStr != "" { - state.Error = errStr - state.IsComplete = true - } - if !bulked { - for _, key := range []string{"payload.pipeline.logging.context.bulk_indexing.success.count", "payload.pipeline.logging.context.bulk_indexing.failure.count", "payload.pipeline.logging.context.bulk_indexing.invalid.count"} { - v, err := util.MapStr(hit.Source).GetValue(key) - if err == nil { - bulked = true - if fv, err := util.ExtractInt(v); err == nil { - indexDocs += fv - if key == "payload.pipeline.logging.context.bulk_indexing.success.count" { - successDocs = fv - state.SuccessDocs = successDocs - } - } else { - log.Errorf("got %s but failed to extract, err: %v", key, err) - } - } - } - } - - if !scrolled { - v, err := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.context.es_scroll.scrolled_docs") - if err == nil { - scrolled = true - if vv, err := util.ExtractInt(v); err == nil { - scrolledDocs = vv - state.ScrolledDocs = vv - } else { - log.Errorf("got payload.pipeline.logging.context.es_scroll.scrolled_docs but failed to extract, err: %v", err) - } - } - } - } - if totalDocs == scrolledDocs { - state.RunningPhase = 1 - } - if (totalDocs == indexDocs || successDocs == totalDocs) && totalDocs == scrolledDocs { - if successDocs != totalDocs { - if state.Error == "" { - if successDocs > 0 { - state.Error = "partial 
complete" - } else { - state.Error = "invalid request" - } - } - } - state.IsComplete = true - return &state, nil - } - //check instance is available - if subTask.Metadata.Labels != nil { - if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok { - inst := model.Instance{} - inst.ID = instID - _, err = orm.Get(&inst) - if err != nil { - log.Errorf("get instance failed, err: %v", err) - return nil, err - } - err = inst.TryConnectWithTimeout(time.Second * 3) - if err != nil && errors.Is(err, syscall.ECONNREFUSED) { - state.Error = fmt.Errorf("instance [%s] is unavailable: %w", instID, err).Error() - state.IsComplete = true - } - } - } - return &state, nil -} - -func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskState MajorTaskState, err error) { +func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskState migration_model.MajorTaskState, err error) { query := util.MapStr{ "size": 0, "aggs": util.MapStr{ @@ -1305,7 +970,7 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat }, { "term": util.MapStr{ - "metadata.labels.business_id": util.MapStr{ + "metadata.type": util.MapStr{ "value": "index_migration", }, }, @@ -1323,23 +988,18 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat if v, ok := res.Aggregations["total_docs"].Value.(float64); ok { taskState.IndexDocs = v } - var ( - hasError bool - ) for _, bk := range res.Aggregations["grp"].Buckets { - if bk["key"] == task2.StatusReady || bk["key"] == task2.StatusRunning { + status, _ := util.ExtractString(bk["key"]) + if migration_util.IsRunningState(status) { taskState.Status = task2.StatusRunning return taskState, nil } - if bk["key"] == task2.StatusError { - hasError = true + if status == task2.StatusError { + taskState.Status = task2.StatusError + return taskState, nil } } - if hasError { - taskState.Status = task2.StatusError - } else { - taskState.Status = task2.StatusComplete - } + taskState.Status = task2.StatusComplete return taskState, nil } @@ -1395,8 +1055,8 @@ func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState } func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) { - config := ClusterMigrationTaskConfig{} - err := getTaskConfig(taskItem, &config) + config := migration_model.ClusterMigrationTaskConfig{} + err := migration_util.GetTaskConfig(taskItem, &config) if err != nil { log.Errorf("failed to parse config info from major task, id: %s, err: %v", taskItem.ID, err) return @@ -1439,3 +1099,105 @@ func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) { } return } + +// update status of subtask to pending stop +func (p *DispatcherProcessor) updatePendingChildTasksToPendingStop(taskItem *task2.Task, taskType string) error { + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskItem.ID, + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task2.StatusRunning, task2.StatusReady, task2.StatusReady1}, + }, + }, + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": taskType, + }, + }, + }, + }, + }, + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop), + }, + } + + err := orm.UpdateBy(taskItem, util.MustToJSONBytes(queryDsl)) + if err != nil { + return err + } + return nil +} + +func (p *DispatcherProcessor) 
getPendingChildTasks(taskItem *task2.Task, taskType string) ([]task2.Task, error) { + + //check whether all pipeline task is stopped or not, then update task status + q := util.MapStr{ + "size": 200, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskItem.ID, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.type": taskType, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task2.StatusRunning, task2.StatusPendingStop, task2.StatusReady, task2.StatusReady1}, + }, + }, + }, + }, + }, + } + return p.getTasks(q) +} + +func (p *DispatcherProcessor) getScrollBulkPipelineTasks(taskItem *task2.Task) (scrollTask *task2.Task, bulkTask *task2.Task, err error) { + ptasks, err := p.getPipelineTasks(taskItem.ID) + if err != nil { + log.Errorf("failed to get pipeline tasks, err: %v", err) + return + } + for i, ptask := range ptasks { + if ptask.Metadata.Labels["pipeline_id"] == "bulk_indexing" { + bulkTask = &ptasks[i] + } else if ptask.Metadata.Labels["pipeline_id"] == "es_scroll" { + scrollTask = &ptasks[i] + } + } + return +} + +func (p *DispatcherProcessor) decrInstanceJobs(instanceID string) { + if st, ok := p.state[instanceID]; ok { + st.Total -= 1 + p.state[instanceID] = st + } +} + +func (p *DispatcherProcessor) incrInstanceJobs(instanceID string) { + instanceState := p.state[instanceID] + instanceState.Total = instanceState.Total + 1 + p.state[instanceID] = instanceState +} diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go new file mode 100644 index 00000000..a7143198 --- /dev/null +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -0,0 +1,402 @@ +package pipeline_task + +import ( + "errors" + "fmt" + "time" + + log "github.com/cihub/seelog" + + "infini.sh/console/model" + migration_model "infini.sh/console/plugin/migration/model" + migration_util "infini.sh/console/plugin/migration/util" + + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/task" + "infini.sh/framework/core/util" +) + +type processor struct { + Elasticsearch string + IndexName string + LogIndexName string +} + +func NewProcessor(elasticsearch, indexName, logIndexName string) migration_model.Processor { + return &processor{ + Elasticsearch: elasticsearch, + IndexName: indexName, + LogIndexName: logIndexName, + } +} + +func (p *processor) Process(t *task.Task) (err error) { + switch t.Status { + case task.StatusReady: + // schedule pipeline task & create pipeline + err = p.handleReadyPipelineTask(t) + case task.StatusRunning: + // check pipeline log + err = p.handleRunningPipelineTask(t) + case task.StatusPendingStop: + // stop pipeline + err = p.handlePendingStopPipelineTask(t) + } + return err +} + +func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error { + cleanPipeline, cleanQueue := true, false + + switch taskItem.Metadata.Labels["pipeline_id"] { + case "es_scroll": + // try to clear queue before running es_scroll + cleanQueue = true + case "bulk_indexing": + default: + return fmt.Errorf("task [%s] has unknown pipeline_id [%s]", taskItem.ID, taskItem.Metadata.Labels["pipeline_id"]) + } + + instance, err := p.cleanGatewayPipeline(taskItem, cleanPipeline, cleanQueue) + if err != nil { + log.Errorf("failed to prepare instance before running pipeline, err: %v", err) + return nil + } + + taskItem.RetryTimes++ + + cfg := migration_model.PipelineTaskConfig{} + err = 
migration_util.GetTaskConfig(taskItem, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + return err + } + + cfg.Labels["retry_times"] = taskItem.RetryTimes + + // call instance api to create pipeline task + err = instance.CreatePipeline(util.MustToJSONBytes(cfg)) + if err != nil { + log.Errorf("create pipeline task [%s] failed, err: %+v", taskItem.ID, err) + return err + } + + taskItem.Status = task.StatusRunning + taskItem.StartTimeInMillis = time.Now().UnixMilli() + p.saveTaskAndWriteLog(taskItem, "wait_for", &task.TaskResult{ + Success: true, + }, fmt.Sprintf("pipeline task [%s] started", taskItem.ID)) + + return nil +} + +func (p *processor) handleRunningPipelineTask(taskItem *task.Task) error { + switch taskItem.Metadata.Labels["pipeline_id"] { + case "es_scroll": + return p.handleRunningEsScrollPipelineTask(taskItem) + case "bulk_indexing": + return p.handleRunningBulkIndexingPipelineTask(taskItem) + default: + return fmt.Errorf("task [%s] has unknown pipeline_id [%s]", taskItem.ID, taskItem.Metadata.Labels["pipeline_id"]) + } + return nil +} + +func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error { + scrolledDocs, totalHits, scrolled, err := p.getEsScrollTaskState(taskItem) + + if !scrolled { + return nil + } + + var errMsg string + if err != nil { + errMsg = err.Error() + } + if errMsg == "" { + if scrolledDocs < totalHits { + errMsg = fmt.Sprintf("scrolled finished but docs count unmatch: %d / %d", scrolledDocs, totalHits) + } + } + + now := time.Now() + taskItem.CompletedTime = &now + taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs + if errMsg != "" { + taskItem.Status = task.StatusError + } else { + taskItem.Status = task.StatusComplete + } + + p.saveTaskAndWriteLog(taskItem, "", &task.TaskResult{ + Success: errMsg == "", + Error: errMsg, + }, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID)) + // clean queue if scroll failed + p.cleanGatewayPipeline(taskItem, true, taskItem.Status == task.StatusError) + return nil +} + +func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) error { + successDocs, indexDocs, bulked, err := p.getBulkIndexingTaskState(taskItem) + if !bulked { + return nil + } + + var errMsg string + if err != nil { + errMsg = err.Error() + } + // TODO: handle multiple run bulk_indexing pipeline tasks and total_docs from index_migration + now := time.Now() + taskItem.CompletedTime = &now + taskItem.Metadata.Labels["index_docs"] = indexDocs + taskItem.Metadata.Labels["success_docs"] = successDocs + if errMsg != "" { + taskItem.Status = task.StatusError + } else { + taskItem.Status = task.StatusComplete + } + + p.saveTaskAndWriteLog(taskItem, "", &task.TaskResult{ + Success: errMsg == "", + Error: errMsg, + }, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID)) + // clean queue if bulk completed + p.cleanGatewayPipeline(taskItem, true, taskItem.Status == task.StatusComplete) + return nil +} + +func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error { + switch taskItem.Metadata.Labels["pipeline_id"] { + case "es_scroll": + case "bulk_indexing": + default: + return fmt.Errorf("task [%s] has unknown pipeline_id [%s]", taskItem.ID, taskItem.Metadata.Labels["pipeline_id"]) + } + + hits, err := p.getPipelineLogs(taskItem, []string{"STOPPED"}) + if err != nil { + log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err) + return nil + } + + stopped := len(hits) > 0 + + if stopped { + taskItem.Status = task.StatusStopped + 
p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) + // clean all stuffs if manually stopped + p.cleanGatewayPipeline(taskItem, true, true) + return nil + } + + _, instance, err := p.getPipelineExecutionInstance(taskItem) + if err != nil { + log.Errorf("failed to get execution instance for task [%s], err: %v", taskItem.ID, err) + return nil + } + + err = instance.StopPipelineWithTimeout(taskItem.ID, time.Second) + if err != nil { + log.Errorf("failed to stop pipeline, err: %v", err) + } + return nil +} + +func (p *processor) cleanGatewayPipeline(taskItem *task.Task, pipeline, queue bool) (instance model.Instance, err error) { + parentTask, instance, err := p.getPipelineExecutionInstance(taskItem) + if err != nil { + return + } + if pipeline { + err = instance.DeletePipeline(taskItem.ID) + if err != nil { + log.Errorf("delete pipeline failed, err: %v", err) + } + } + + if queue { + selector := util.MapStr{ + "labels": util.MapStr{ + "migration_task_id": parentTask.ID, + }, + } + err = instance.DeleteQueueBySelector(selector) + if err != nil { + log.Errorf("failed to delete queue, err: %v", err) + } + } + return instance, nil +} + +func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (parentTask *task.Task, instance model.Instance, err error) { + parentTask, err = p.getParentTask(taskItem, "index_migration") + if err != nil { + return + } + // Use sub task's execution instance + instanceID := parentTask.Metadata.Labels["execution_instance_id"] + instance.ID, _ = util.ExtractString(instanceID) + _, err = orm.Get(&instance) + if err != nil { + log.Errorf("failed to get instance, err: %v", err) + return + } + return +} + +func (p *processor) getParentTask(taskItem *task.Task, taskType string) (*task.Task, error) { + queryDsl := util.MapStr{ + "size": 1, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "_id": taskItem.ParentId, + }, + }, + { + "term": util.MapStr{ + "metadata.type": taskType, + }, + }, + }, + }, + }, + } + + esClient := elastic.GetClient(p.Elasticsearch) + res, err := esClient.SearchWithRawQueryDSL(p.IndexName, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Errorf("query tasks from es failed, err: %v", err) + return nil, err + } + if res.GetTotal() == 0 { + return nil, errors.New("no parent task found") + } + for _, hit := range res.Hits.Hits { + buf, err := util.ToJSONBytes(hit.Source) + if err != nil { + log.Errorf("marshal task json failed, err: %v", err) + return nil, err + } + tk := task.Task{} + err = util.FromJSONBytes(buf, &tk) + if err != nil { + log.Errorf("unmarshal task json failed, err: %v", err) + return nil, err + } + return &tk, nil + } + return nil, errors.New("not reachable") +} + +func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int64, totalHits int64, scrolled bool, err error) { + hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}) + if err != nil { + log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err) + err = nil + return + } + for _, hit := range hits { + scrolled = true + resultErr, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.result.error") + if errStr, ok := resultErr.(string); ok && errStr != "" { + err = errors.New(errStr) + return + } + m := util.MapStr(hit.Source) + scroll, total := migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.scrolled_docs"), migration_util.GetMapIntValue(m, 
"payload.pipeline.logging.context.es_scroll.total_hits") + + scrolledDocs += scroll + totalHits += total + } + return +} + +func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, err error) { + hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}) + if err != nil { + log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err) + err = nil + return + } + for _, hit := range hits { + bulked = true + resultErr, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.result.error") + if errStr, ok := resultErr.(string); ok && errStr != "" { + err = errors.New(errStr) + return + } + + m := util.MapStr(hit.Source) + success, failure, invalid := migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.success.count"), migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.failure.count"), migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.invalid.count") + successDocs += success + indexDocs += success + invalid + failure + } + return +} + +func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]elastic.IndexDocument, error) { + query := util.MapStr{ + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "desc", + }, + }, + { + "payload.pipeline.logging.steps": util.MapStr{ + "order": "desc", + }, + }, + }, + "collapse": util.MapStr{ + "field": "metadata.labels.task_id", + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.task_id": taskItem.ID, + }, + }, + { + "terms": util.MapStr{ + "payload.pipeline.logging.status": status, + }, + }, + { + "range": util.MapStr{ + "metadata.labels.retry_times": util.MapStr{ + "gte": taskItem.RetryTimes, + }, + }, + }, + }, + }, + }, + } + esClient := elastic.GetClient(p.Elasticsearch) + res, err := esClient.SearchWithRawQueryDSL(p.LogIndexName, util.MustToJSONBytes(query)) + if err != nil { + log.Errorf("search task log from es failed, err: %v", err) + return nil, err + } + return res.Hits.Hits, nil +} + +func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, refresh string, taskResult *task.TaskResult, message string) { + esClient := elastic.GetClient(p.Elasticsearch) + _, err := esClient.Index(p.IndexName, "", taskItem.ID, taskItem, refresh) + if err != nil { + log.Errorf("failed to update task, err: %v", err) + } + if message != "" { + migration_util.WriteLog(taskItem, taskResult, message) + } +} diff --git a/plugin/migration/util/util.go b/plugin/migration/util/util.go new file mode 100644 index 00000000..386d2c6f --- /dev/null +++ b/plugin/migration/util/util.go @@ -0,0 +1,66 @@ +package util + +import ( + log "github.com/cihub/seelog" + + "infini.sh/framework/core/event" + "infini.sh/framework/core/task" + "infini.sh/framework/core/util" +) + +func WriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string) { + labels := util.MapStr{} + labels.Update(util.MapStr(taskItem.Metadata.Labels)) + labels["task_type"] = taskItem.Metadata.Type + labels["task_id"] = taskItem.ID + labels["parent_task_id"] = taskItem.ParentId + labels["retry_no"] = taskItem.RetryTimes + event.SaveLog(event.Event{ + Metadata: event.EventMetadata{ + Category: "task", + Name: "logging", + Datatype: "event", + Labels: labels, + }, + Fields: util.MapStr{ + "task": util.MapStr{ + "logging": util.MapStr{ + "config": taskItem.ConfigString, + "status": taskItem.Status, + "message": 
message, + "result": taskResult, + }, + }, + }, + }) +} + +var runningTaskStatus = []string{task.StatusRunning, task.StatusReady, task.StatusReady1} + +func IsRunningState(status string) bool { + return util.StringInArray(runningTaskStatus, status) +} + +func GetTaskConfig(task *task.Task, config interface{}) error { + if task.Config_ == nil { + return util.FromJSONBytes([]byte(task.ConfigString), config) + } + buf, err := util.ToJSONBytes(task.Config_) + if err != nil { + return err + } + return util.FromJSONBytes(buf, config) +} + +func GetMapIntValue(m util.MapStr, key string) int64 { + v, err := m.GetValue(key) + if err != nil { + return 0 + } + vv, err := util.ExtractInt(v) + if err != nil { + log.Errorf("got %s but failed to extract, err: %v", key, err) + return 0 + } + return vv +}
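
A minimal usage sketch of the new migration_util.GetTaskConfig helper introduced in this patch (illustrative only, not part of the diff): it decodes Config_ when present and falls back to parsing ConfigString otherwise. The dummyConfig type and its slice_size field are invented for this example; within the patch itself, the pipeline_task processor uses the same helper to load the pipeline config before calling instance.CreatePipeline.

package main

import (
	"fmt"

	migration_util "infini.sh/console/plugin/migration/util"
	"infini.sh/framework/core/task"
)

// dummyConfig is a made-up config shape used only for this illustration.
type dummyConfig struct {
	SliceSize int `json:"slice_size"`
}

func main() {
	// Tasks loaded from ES may carry only the raw JSON config string;
	// GetTaskConfig falls back to ConfigString when Config_ is unset.
	t := task.Task{ConfigString: `{"slice_size": 5}`}

	cfg := dummyConfig{}
	if err := migration_util.GetTaskConfig(&t, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.SliceSize) // prints 5
}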