From 69cf1d67a93fa6386b74bc19c7025f1fc6945de6 Mon Sep 17 00:00:00 2001
From: sunjiacheng
Date: Tue, 28 Mar 2023 17:54:35 +0800
Subject: [PATCH] migration_v2 (#38)

- bulk write only after docs are fully scrolled
- update the default config with the migration dispatcher
- add bulk parameters idle_timeout_in_seconds and slice_size
- clear the queue before creating a pipeline
- check instance availability while a task is in the running state
- add a check_instance_available config option for the migration dispatcher
- get migration task progress info by unique index name (index_name + doc_type)
- get the instance list from gateway instances only
- get migration task progress info from ES and the instance pipeline context
- rewrite the logic for handling a running major migration task
- calculate the completion time in the getDataMigrationTaskOfIndex API
- init

Co-authored-by: liugq
---
 config/system_config.tpl       |   40 +-
 model/instance.go              |  104 +++
 plugin/api/gateway/instance.go |  138 +---
 plugin/elastic/activity.go     |    2 +-
 plugin/migration/api.go        |  918 ++++++++++++-----
 plugin/migration/model.go      |   35 +-
 plugin/migration/module.go     |    4 +-
 plugin/migration/pipeline.go   | 1293 +++++++++++++++++++++++++++-----
 8 files changed, 1797 insertions(+), 737 deletions(-)

diff --git a/config/system_config.tpl b/config/system_config.tpl
index c5840724..fb9333dc 100644
--- a/config/system_config.tpl
+++ b/config/system_config.tpl
@@ -44,6 +44,7 @@ pipeline:
       fetch_max_messages: 100
       queues:
         type: indexing_merge
+        tag: "metrics"
       when:
         cluster_available: ["$[[CLUSTER_ID]]"]
   - name: metadata_ingest
@@ -78,11 +79,46 @@ pipeline:
         group: activity
       when:
         cluster_available: ["$[[CLUSTER_ID]]"]
-  - name: cluster_migration_split
+  - name: migration_task_dispatcher
     auto_start: true
     keep_running: true
     processor:
-      - cluster_migration:
+      - migration_dispatcher:
           elasticsearch: "$[[CLUSTER_ID]]"
+          check_instance_available: true
+          max_tasks_per_instance: 10
+          task_batch_size: 50
+      when:
+        cluster_available: ["$[[CLUSTER_ID]]"]
+
+  - name: logging_indexing_merge
+    auto_start: true
+    keep_running: true
+    processor:
+      - indexing_merge:
+          input_queue: "logging"
+          idle_timeout_in_seconds: 1
+          elasticsearch: "$[[CLUSTER_ID]]"
+          index_name: "$[[INDEX_PREFIX]]logs"
+          output_queue:
+            name: "pipeline-logs"
+            label:
+              tag: "request_logging"
+          worker_size: 1
+          bulk_size_in_kb: 1
+  - name: consume-logging_requests
+    auto_start: true
+    keep_running: true
+    processor:
+      - bulk_indexing:
+          bulk:
+            compress: true
+            batch_size_in_mb: 1
+            batch_size_in_docs: 1
+          consumer:
+            fetch_max_messages: 100
+          queues:
+            type: indexing_merge
+            tag: "request_logging"
       when:
         cluster_available: ["$[[CLUSTER_ID]]"]
\ No newline at end of file
diff --git a/model/instance.go b/model/instance.go
index 31da501f..92ac9d79 100644
--- a/model/instance.go
+++ b/model/instance.go
@@ -5,8 +5,14 @@
 package model
 
 import (
+	"context"
+	"fmt"
 	"infini.sh/framework/core/agent"
 	"infini.sh/framework/core/orm"
+	"infini.sh/framework/core/util"
+	"infini.sh/framework/modules/pipeline"
+	"net/http"
+	"time"
 
 )
@@ -22,3 +28,101 @@ type Instance struct {
 	Tags [] string `json:"tags,omitempty"`
 	Description string `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"`
 }
+
+func (inst *Instance) CreatePipeline(body []byte) error {
+	req := &util.Request{
+		Method: http.MethodPost,
+		Body:   body,
+		Url:    inst.Endpoint + "/pipeline/tasks/",
+	}
+	return inst.doRequest(req, nil)
+}
+
+func (inst *Instance) StopPipeline(ctx context.Context, pipelineID string) error {
+	req := &util.Request{
+		Method: http.MethodPost,
+		Url:    fmt.Sprintf("%s/pipeline/task/%s/_stop",
inst.Endpoint, pipelineID), + Context: ctx, + } + return inst.doRequest(req, nil) +} + +func (inst *Instance) StopPipelineWithTimeout(pipelineID string, duration time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), duration) + defer cancel() + return inst.StopPipeline(ctx, pipelineID) +} + +func (inst *Instance) StartPipeline(pipelineID string) error { + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/pipeline/task/%s/_start", inst.Endpoint, pipelineID), + } + return inst.doRequest(req, nil) +} + + +func (inst *Instance) DeletePipeline(pipelineID string) error { + req := &util.Request{ + Method: http.MethodDelete, + Url: fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID), + } + return inst.doRequest(req, nil) +} + +func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipelinesResponse, error) { + body := util.MustToJSONBytes(util.MapStr{ + "ids": pipelineIDs, + }) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/pipeline/tasks/_search", inst.Endpoint), + Body: body, + Context: ctx, + } + res := pipeline.GetPipelinesResponse{} + err := inst.doRequest(req, &res) + return res, err +} + +func (inst *Instance) DeleteQueueBySelector(selector util.MapStr) error { + req := &util.Request{ + Method: http.MethodDelete, + Url: fmt.Sprintf("%s/queue/_search", inst.Endpoint), + Body: util.MustToJSONBytes(util.MapStr{ + "selector": selector, + }), + } + return inst.doRequest(req, nil) +} + +func (inst *Instance) TryConnect(ctx context.Context) error { + req := &util.Request{ + Method: http.MethodGet, + Url: fmt.Sprintf("%s/_framework/api/_info", inst.Endpoint), + Context: ctx, + } + return inst.doRequest(req, nil) +} +func (inst *Instance) TryConnectWithTimeout(duration time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), duration) + defer cancel() + return inst.TryConnect(ctx) +} + +func (inst *Instance) doRequest(req *util.Request, resBody interface{}) error { + req.SetBasicAuth(inst.BasicAuth.Username, inst.BasicAuth.Password) + result, err := util.ExecuteRequest(req) + if err != nil { + return err + } + if result.StatusCode != http.StatusOK { + return fmt.Errorf(string(result.Body)) + } + if resBody != nil { + return util.FromJSONBytes(result.Body, resBody) + } + return nil +} \ No newline at end of file diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index 2f9f836e..7a1f8e04 100644 --- a/plugin/api/gateway/instance.go +++ b/plugin/api/gateway/instance.go @@ -5,7 +5,6 @@ package gateway import ( - "context" "fmt" log "github.com/cihub/seelog" "github.com/segmentio/encoding/json" @@ -368,60 +367,7 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, if from < 0 { from = 0 } - agentIndexName := orm.GetIndexName(agent.Instance{}) gatewayIndexName := orm.GetIndexName(model.Instance{}) - agentMust := []util.MapStr{ - { - "term": util.MapStr{ - "enrolled": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "status": util.MapStr{ - "value": "online", - }, - }, - }, - { - "term": util.MapStr{ - "_index": util.MapStr{ - "value": agentIndexName, - }, - }, - }, - } - - boolQ := util.MapStr{ - "minimum_should_match": 1, - "should": []util.MapStr{ - { - "bool": util.MapStr{ - "must": agentMust, - }, - }, - { - "term": util.MapStr{ - "_index": util.MapStr{ - "value": gatewayIndexName, - }, - }, - }, - }, - } 
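Editor's note on the new helpers: getExecutionNodes now lists gateway instances only, and availability is probed through the Instance methods added in model/instance.go rather than the ad-hoc isNodeAvailable call removed further down. A minimal, self-contained sketch of driving those helpers — the instance ID and pipeline ID are made-up values, and the standard library log is used only for brevity:

package main

import (
	"log"
	"time"

	"infini.sh/console/model"
	"infini.sh/framework/core/orm"
)

func main() {
	// Load a registered gateway instance by ID, as the dispatcher does.
	inst := &model.Instance{}
	inst.ID = "example-instance-id"
	if _, err := orm.Get(inst); err != nil {
		log.Fatalf("load instance: %v", err)
	}

	// TryConnectWithTimeout probes /_framework/api/_info with a deadline;
	// getExecutionNodes now uses it to decide whether a node is available.
	if err := inst.TryConnectWithTimeout(time.Second); err != nil {
		log.Fatalf("instance not available: %v", err)
	}

	// StopPipelineWithTimeout posts to /pipeline/task/<id>/_stop on the instance.
	if err := inst.StopPipelineWithTimeout("example-pipeline-id", time.Second); err != nil {
		log.Fatalf("stop pipeline: %v", err)
	}
}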
- if keyword != "" { - boolQ["must"] = []util.MapStr{ - { - "prefix": util.MapStr{ - "name": util.MapStr{ - "value": keyword, - }, - }, - }, - } - } query := util.MapStr{ "size": size, @@ -433,12 +379,24 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, }, }, }, - "query": util.MapStr{ - "bool": boolQ, - }, + } + if keyword != "" { + query["query"] = util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "prefix": util.MapStr{ + "name": util.MapStr{ + "value": keyword, + }, + }, + }, + }, + }, + } } q := orm.Query{ - IndexName: fmt.Sprintf("%s,%s", gatewayIndexName, agentIndexName), + IndexName: gatewayIndexName, RawQuery: util.MustToJSONBytes(query), } err, result := orm.Search(nil, &q) @@ -457,63 +415,33 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, var nodes = []util.MapStr{} for _, hit := range searchRes.Hits.Hits { - var ( - endpoint string - ok bool - ) + buf := util.MustToJSONBytes(hit.Source) + inst := model.Instance{} + err = util.FromJSONBytes(buf, &inst) + if err != nil { + log.Error(err) + continue + } node := util.MapStr{ - "id": hit.Source["id"], - "name": hit.Source["name"], + "id": inst.ID, + "name": inst.Name, "available": false, + "type": "gateway", } - hasErr := false - if hit.Index == gatewayIndexName { - node["type"] = "gateway" - if endpoint, ok = hit.Source["endpoint"].(string); !ok { - log.Warnf("got unexpect endpoint type of gateway instance [%s]: %s", hit.ID, hit.Source["endpoint"]) - hasErr = true - } - }else if hit.Index == agentIndexName { - node["type"] = "agent" - endpoint = fmt.Sprintf("%s://%s:%v", hit.Source["schema"], hit.Source["remote_ip"], hit.Source["port"]) + ul, err := url.Parse(inst.Endpoint) + if err != nil { + log.Error(err) + continue } - ul, err := url.Parse(endpoint) + node["host"] = ul.Host + err = inst.TryConnectWithTimeout(time.Second) if err != nil { log.Error(err) }else{ - node["host"] = ul.Host + node["available"] = true } - if !hasErr { - available, err := isNodeAvailable(endpoint) //TODO remove - if err != nil { - log.Error(err) - } - node["available"] = available - } nodes = append(nodes, node) } h.WriteJSON(w, nodes, http.StatusOK) } - -func isNodeAvailable(endpoint string) (bool, error){ - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - rq := &util.Request{ - Method: http.MethodGet, - Url: fmt.Sprintf("%s%s", endpoint, "/pipeline/tasks/_dynamic"), - Context: ctx, - } - resp, err := util.ExecuteRequest(rq) - if err != nil { - return false, err - } - resBody := struct { - Success bool `json:"success"` - }{} - err = util.FromJSONBytes(resp.Body, &resBody) - if err != nil { - return false, err - } - return resBody.Success, nil -} \ No newline at end of file diff --git a/plugin/elastic/activity.go b/plugin/elastic/activity.go index 6f9cfcc4..37ff1079 100644 --- a/plugin/elastic/activity.go +++ b/plugin/elastic/activity.go @@ -263,7 +263,7 @@ func (processor *ActivityProcessor) HandleMessage(ctx *pipeline.Context, qConfig if timeout { log.Tracef("timeout on queue:[%v]", qConfig.Name) - ctx.Failed() + ctx.Failed(fmt.Errorf("timeout on queue:[%v]", qConfig.Name)) return } diff --git a/plugin/migration/api.go b/plugin/migration/api.go index ee75eac9..a73faf9e 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -7,16 +7,14 @@ package migration import ( "context" "fmt" - "github.com/buger/jsonparser" log "github.com/cihub/seelog" - "infini.sh/framework/core/agent" + "infini.sh/console/model" 
"infini.sh/framework/core/api" "infini.sh/framework/core/api/rbac" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/global" "infini.sh/framework/core/orm" - "infini.sh/framework/core/proxy" task2 "infini.sh/framework/core/task" "infini.sh/framework/core/util" "net/http" @@ -26,9 +24,8 @@ import ( ) -func InitAPI(bulkResultIndexName string) { +func InitAPI() { handler := APIHandler{ - bulkResultIndexName: bulkResultIndexName, } api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequireLogin(handler.searchDataMigrationTask)) api.HandleAPIMethod(api.POST, "/migration/data", handler.RequireLogin(handler.createDataMigrationTask)) @@ -40,7 +37,7 @@ func InitAPI(bulkResultIndexName string) { api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequireLogin(handler.stopDataMigrationTask)) //api.HandleAPIMethod(api.GET, "/migration/data/:task_id", handler.getMigrationTask) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequireLogin(handler.getDataMigrationTaskInfo)) - api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/index", handler.RequireLogin(handler.getDataMigrationTaskOfIndex)) + api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index/:doc_type", handler.RequireLogin(handler.getDataMigrationTaskOfIndex)) api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequireLogin(handler.updateDataMigrationTaskStatus)) } @@ -82,7 +79,7 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re Metadata: task2.Metadata{ Type: "pipeline", Labels: util.MapStr{ - "pipeline_id": "cluster_migration", + "business_id": "cluster_migration", "source_cluster_id": clusterTaskConfig.Cluster.Source.Id, "target_cluster_id": clusterTaskConfig.Cluster.Target.Id, "source_total_docs": totalDocs, @@ -91,11 +88,7 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re Cancellable: true, Runnable: false, Status: task2.StatusInit, - Parameters: map[string]interface{}{ - "pipeline": util.MapStr{ - "config": clusterTaskConfig, - }, - }, + Config: clusterTaskConfig, } t.ID = util.GetUUID() err = orm.Create(nil, &t) @@ -116,7 +109,7 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re ) mustQ = append(mustQ, util.MapStr{ "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ "value": "cluster_migration", }, }, @@ -173,43 +166,23 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re h.WriteError(w, err.Error(), http.StatusInternalServerError) return } -mainLoop: for _, hit := range searchRes.Hits.Hits { sourceM := util.MapStr(hit.Source) - config, err := sourceM.GetValue("parameters.pipeline.config") - if err != nil { - log.Error(err) - continue - } - buf := util.MustToJSONBytes(config) + buf := util.MustToJSONBytes(sourceM["config"]) dataConfig := ElasticDataConfig{} err = util.FromJSONBytes(buf, &dataConfig) if err != nil { log.Error(err) continue } - var targetTotalDocs int64 + //var targetTotalDocs int64 if hit.Source["status"] == task2.StatusRunning { - esClient := elastic.GetClientNoPanic(dataConfig.Cluster.Target.Id) - if esClient == nil { - log.Warnf("cluster [%s] was not found", dataConfig.Cluster.Target.Id) + ts, err := getMajorTaskStatsFromInstances(hit.ID) + if err != nil { + log.Warnf("fetch progress info of task error: %v", err) continue } - for _, index := range dataConfig.Indices { - count, err 
:= getIndexTaskDocCount(&index, esClient) - if err != nil { - log.Error(err) - continue mainLoop - } - targetTotalDocs += count - } - sourceM.Put("metadata.labels.target_total_docs", targetTotalDocs) - sourceTotalDocs, _ := sourceM.GetValue("metadata.labels.source_total_docs") - if sv, ok := sourceTotalDocs.(float64); ok{ - if int64(sv) == targetTotalDocs { - hit.Source["status"] = task2.StatusComplete - } - } + sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs) } } @@ -251,87 +224,7 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError) return } - if obj.Status == "init" { - //root task - obj.Status = task2.StatusReady - }else if obj.Status == task2.StatusStopped { - if obj.Metadata.Labels != nil && obj.Metadata.Labels["level"] == "partition" { - obj.Status = task2.StatusReady - //update parent task status - if len(obj.ParentId) == 0 { - h.WriteError(w, fmt.Sprintf("empty parent id of task [%s]", taskID), http.StatusInternalServerError) - return - } - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "id": obj.ParentId, - }, - }, - }, - }, - } - queryDsl := util.MapStr{ - "query": query, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusRunning), - }, - } - - err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl)) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - }else{ - obj.Status = task2.StatusRunning - //update sub task status - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskID, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "index_migration", - }, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task2.StatusError, task2.StatusStopped}, - }, - }, - }, - }, - } - queryDsl := util.MapStr{ - "query": query, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusReady), - }, - } - - err = orm.UpdateBy(task2.Task{}, util.MustToJSONBytes(queryDsl)) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - } - - }else if obj.Status == task2.StatusError { - obj.Status = task2.StatusReady - } + obj.Status = task2.StatusReady err = orm.Update(nil, &obj) if err != nil { @@ -340,52 +233,41 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request return } + if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "index_migration" && len(obj.ParentId) > 0 { + //update status of major task to running + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "id": util.MapStr{ + "value": obj.ParentId[0], + }, + }, + }, + }, + }, + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusRunning), + }, + } + + err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + h.WriteJSON(w, util.MapStr{ "success": true, }, 200) } -func getNodeEndpoint(nodeID string) (string, error){ - indexName := ".infini_agent,.infini_instance" - query := util.MapStr{ - "size": 1, - "query": util.MapStr{ - "term": 
util.MapStr{ - "id": util.MapStr{ - "value": nodeID, - }, - }, - }, - } - q := orm.Query{ - IndexName: indexName, - RawQuery: util.MustToJSONBytes(query), - } - err, result := orm.Search(nil, &q) - if err != nil { - return "", err - } - if len(result.Result) == 0 { - return "", fmt.Errorf("node [%s] not found", nodeID) - } - if info, ok := result.Result[0].(map[string]interface{}); ok { - if v, ok := info["endpoint"]; ok { - if endpoint, ok := v.(string); ok { - return endpoint, nil - } - return "", fmt.Errorf("got invalid endpoint value: %v", v) - } - ag := agent.Instance{} - buf := util.MustToJSONBytes(info) - err = util.FromJSONBytes(buf, &ag) - if err != nil { - return "", err - } - return ag.GetEndpoint(), nil - } - return "", fmt.Errorf("got unexpect node info: %s", util.MustToJSON(result.Result[0])) -} - func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("task_id") obj := task2.Task{} @@ -399,78 +281,58 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ }, http.StatusNotFound) return } - execution, _ := util.MapStr(obj.Parameters).GetValue("pipeline.config.settings.execution") - if execution == nil { - execution, err = util.MapStr(obj.Parameters).GetValue("pipeline.config.execution") - if err != nil { - errStr := fmt.Sprintf("get execution config in task %s error: %s", id, err.Error()) - h.WriteError(w, errStr, http.StatusInternalServerError) - log.Error(errStr) - return - } - } - buf := util.MustToJSONBytes(execution) - executionConfig := ExecutionConfig{} - err = util.FromJSONBytes(buf, &executionConfig) + //query all pipeline task(scroll/bulk_indexing) and then stop it + err = stopPipelineTasks(id) if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - - if len(executionConfig.Nodes.Permit) == 0 { - h.WriteError(w, "node of running task can not found", http.StatusInternalServerError) - return - } - - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": id, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "index_migration", - }, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task2.StatusRunning, task2.StatusInit}, - }, - }, - }, - }, - } - //todo reset stat_time? 
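The rewritten stop flow below is two-phase: the API handler only marks the major task and its running index_migration subtasks as pending_stop, and the dispatcher's handlePendingStop* handlers later confirm the scroll/bulk pipelines have actually stopped before recording stopped. A condensed sketch of the status flip, using the same orm.UpdateBy/painless idiom as the surrounding code:

package migration

import (
	"fmt"

	"infini.sh/framework/core/orm"
	task2 "infini.sh/framework/core/task"
	"infini.sh/framework/core/util"
)

// markSubTasksPendingStop condenses the update-by-query used below: flip all
// running index_migration children of a major task to pending_stop so the
// dispatcher can wind their pipelines down.
func markSubTasksPendingStop(majorTaskID string) error {
	queryDsl := util.MapStr{
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": []util.MapStr{
					{"term": util.MapStr{"parent_id": util.MapStr{"value": majorTaskID}}},
					{"term": util.MapStr{"status": util.MapStr{"value": task2.StatusRunning}}},
					{"term": util.MapStr{"metadata.labels.business_id": util.MapStr{"value": "index_migration"}}},
				},
			},
		},
		"script": util.MapStr{
			"source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop),
		},
	}
	return orm.UpdateBy(task2.Task{}, util.MustToJSONBytes(queryDsl))
}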
- queryDsl := util.MapStr{ - "query": util.MapStr{ + if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "cluster_migration" { + //update status of subtask to pending stop + query := util.MapStr{ "bool": util.MapStr{ - "minimum_should_match": 1, - "should": []util.MapStr{ + "must": []util.MapStr{ { "term": util.MapStr{ - "id": util.MapStr{ - "value": id, + "parent_id": util.MapStr{ + "value": obj.ID, + }, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": task2.StatusRunning, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ + "value": "index_migration", }, }, }, - query, }, }, - }, - "script": util.MapStr{ - "source": "ctx._source['status'] = 'stopped'", - }, - } + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop), + }, + } - err = orm.UpdateBy(task2.Task{}, util.MustToJSONBytes(queryDsl)) + err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + obj.Status = task2.StatusPendingStop + err = orm.Update(nil, &obj) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -483,11 +345,7 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ } func getTaskConfig(task *task2.Task, config interface{}) error{ - configSec, err := util.MapStr(task.Parameters).GetValue("pipeline.config") - if err != nil { - return err - } - configBytes, err := util.ToJSONBytes(configSec) + configBytes, err := util.ToJSONBytes(task.Config) if err != nil { return err } @@ -593,93 +451,36 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R return } - taskErrors, err := getErrorPartitionTasks(id) + indexState, err := getMajorTaskInfoByIndex(id) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - //get status of sub task - //todo size config? 
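In the rewritten handler below, per-index progress is keyed by the unique index name so that source indices sharing a name across document types don't collide; the key format comes from GetUniqueIndexName in plugin/migration/model.go. A self-contained illustration of the convention (field shape mirrors the patch):

package main

import "fmt"

// IndexInfo mirrors the shape used in plugin/migration/model.go.
type IndexInfo struct {
	Name    string
	DocType string
}

// GetUniqueIndexName reproduces the patch's convention: "<index>:<doc_type>".
func (ii IndexInfo) GetUniqueIndexName() string {
	return fmt.Sprintf("%s:%s", ii.Name, ii.DocType)
}

func main() {
	src := IndexInfo{Name: "test-index", DocType: "doc"}
	fmt.Println(src.GetUniqueIndexName()) // prints "test-index:doc"
}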
- query := util.MapStr{ - "size": 1000, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": id, - }, - }, - },{ - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "index_migration", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.level": util.MapStr{ - "value": "index", - }, - }, - }, - }, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - err, result := orm.Search(task2.Task{}, &q) + realtimeIndexState, err := getMajorTaskByIndexFromES(id) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - statusM := util.MapStr{} - for _, row := range result.Result { - if rowM, ok := row.(map[string]interface{}); ok { - if v, ok := rowM["id"].(string); ok { - statusM[v] = rowM["status"] - } - } - } var completedIndices int - targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id) for i, index := range taskConfig.Indices { - if st, ok := statusM[index.TaskID]; ok { - taskConfig.Indices[i].Status = st.(string) - } - var count = index.Target.Docs - if taskConfig.Indices[i].Status != task2.StatusComplete || count == 0 { - if targetESClient == nil { - log.Warnf("cluster [%s] was not found", taskConfig.Cluster.Target.Id) - break - } - count, err = getIndexTaskDocCount(&index, targetESClient) - if err != nil { - log.Error(err) - continue - } - taskConfig.Indices[i].Target.Docs = count - } - percent := float64(count * 100) / float64(index.Source.Docs) + indexName := index.Source.GetUniqueIndexName() + count := indexState[indexName].IndexDocs + realtimeIndexState[indexName].IndexDocs + percent := count * 100 / float64(index.Source.Docs) if percent > 100 { percent = 100 } + taskConfig.Indices[i].Target.Docs = int64(count) taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) - taskConfig.Indices[i].ErrorPartitions = taskErrors[index.TaskID] - if count == index.Source.Docs { + taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions + if int64(count) == index.Source.Docs { completedIndices ++ - taskConfig.Indices[i].Status = task2.StatusComplete } } cfg := global.MustLookup("cluster_migration_config") - if migrationConfig, ok := cfg.(*ClusterMigrationConfig); ok { + if migrationConfig, ok := cfg.(*DispatcherConfig); ok { if obj.Metadata.Labels == nil { obj.Metadata.Labels = util.MapStr{} } @@ -688,19 +489,32 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R "index_name": migrationConfig.LogIndexName, } } - util.MapStr(obj.Parameters).Put("pipeline.config", taskConfig) + obj.Config = taskConfig obj.Metadata.Labels["completed_indices"] = completedIndices h.WriteJSON(w, obj, http.StatusOK) } -func getErrorPartitionTasks(taskID string) (map[string]int, error){ +func getMajorTaskInfoByIndex(taskID string) (map[string]IndexStateInfo, error){ query := util.MapStr{ "size": 0, "aggs": util.MapStr{ "group_by_task": util.MapStr{ "terms": util.MapStr{ - "field": "parent_id", + "field": "metadata.labels.unique_index_name", "size": 100, }, + "aggs": util.MapStr{ + "group_by_status": util.MapStr{ + "terms": util.MapStr{ + "field": "status", + "size": 100, + }, + }, + "total_docs": util.MapStr{ + "sum": util.MapStr{ + "field": "metadata.labels.index_docs", + }, + }, + }, }, }, "query": util.MapStr{ @@ -708,25 +522,11 @@ func getErrorPartitionTasks(taskID string) (map[string]int, error){ "must": []util.MapStr{ { "term": util.MapStr{ - 
"metadata.labels.pipeline_id": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ "value": "index_migration", }, }, }, - { - "term": util.MapStr{ - "runnable": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "status": util.MapStr{ - "value": task2.StatusError, - }, - }, - }, { "term": util.MapStr{ "parent_id": util.MapStr{ @@ -751,15 +551,36 @@ func getErrorPartitionTasks(taskID string) (map[string]int, error){ if err != nil { return nil, err } - resBody := map[string]int{} + resBody := map[string]IndexStateInfo{} if taskAgg, ok := searchRes.Aggregations["group_by_task"]; ok { for _, bk := range taskAgg.Buckets { if key, ok := bk["key"].(string); ok { - if key == taskID { - continue + //resBody[key] = int(bk["doc_count"].(float64)) + resBody[key] = IndexStateInfo{} + if statusAgg, ok := bk["group_by_status"].(map[string]interface{}); ok { + if sbks, ok := statusAgg["buckets"].([]interface{}); ok { + for _, sbk := range sbks { + if sbkM, ok := sbk.(map[string]interface{}); ok { + if sbkM["key"] == task2.StatusError { + if v, ok := sbkM["doc_count"].(float64); ok { + st := resBody[key] + st.ErrorPartitions = int(v) + resBody[key] = st + } + } + } + } + } + } + if indexDocsAgg, ok := bk["total_docs"].(map[string]interface{}); ok { + if v, ok := indexDocsAgg["value"].(float64); ok { + st := resBody[key] + st.IndexDocs = v + resBody[key] = st + } + } - resBody[key] = int(bk["doc_count"].(float64)) } } } @@ -809,69 +630,23 @@ func getIndexTaskDocCount(index *IndexConfig, targetESClient elastic.API) (int64 return countRes.Count, nil } -func getExecutionConfig(parameters map[string]interface{}, key string)(*ExecutionConfig, error){ - execution, err := util.MapStr(parameters).GetValue(key) - if err != nil { - return nil, err - } - buf := util.MustToJSONBytes(execution) - executionConfig := ExecutionConfig{} - err = util.FromJSONBytes(buf, &executionConfig) - return &executionConfig, err -} - -func getTaskStats(nodeID string) (map[string]interface{}, error){ - endpoint, err := getNodeEndpoint(nodeID) - if err != nil { - return nil, err - } - res, err := proxy.DoProxyRequest(&proxy.Request{ - Method: http.MethodGet, - Endpoint: endpoint, - Path: "/stats", - }) - - if err != nil { - return nil, fmt.Errorf("call stats api error: %w", err) - } - resBody := struct { - Stats map[string]interface{} `json:"stats"` - }{} - err = util.FromJSONBytes(res.Body, &resBody) - return resBody.Stats, err -} - func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ id := ps.MustGetParameter("task_id") - indexTask := task2.Task{} - indexTask.ID = id - exists, err := orm.Get(&indexTask) + indexName := ps.MustGetParameter("index") + docType := ps.MustGetParameter("doc_type") + majorTask := task2.Task{} + majorTask.ID = id + exists, err := orm.Get(&majorTask) if !exists || err != nil { h.WriteError(w, fmt.Sprintf("task [%s] not found", id), http.StatusInternalServerError) return } - var durationInMS int64 - if indexTask.StartTimeInMillis > 0 { - durationInMS = time.Now().UnixMilli() - indexTask.StartTimeInMillis - if indexTask.CompletedTime != nil && indexTask.Status == task2.StatusComplete { - durationInMS = indexTask.CompletedTime.UnixMilli() - indexTask.StartTimeInMillis - } - } var completedTime int64 - if indexTask.CompletedTime != nil { - completedTime = indexTask.CompletedTime.UnixMilli() - } taskInfo := util.MapStr{ "task_id": id, - "start_time": indexTask.StartTimeInMillis, - "status": indexTask.Status, - "completed_time": 
completedTime, - "duration": durationInMS, - } - if len(indexTask.Metadata.Labels) > 0 { - taskInfo["data_partition"] = indexTask.Metadata.Labels["partition_count"] + "start_time": majorTask.StartTimeInMillis, } partitionTaskQuery := util.MapStr{ "size": 500, @@ -891,10 +666,11 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt "value": id, }, }, - },{ + }, + { "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "index_migration", + "metadata.labels.unique_index_name": util.MapStr{ + "value": fmt.Sprintf("%s:%s", indexName, docType), }, }, }, @@ -911,55 +687,103 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - executionConfig, err := getExecutionConfig(indexTask.Parameters, "pipeline.config.execution") - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if len(executionConfig.Nodes.Permit) == 0 { - h.WriteError(w, "node of running task can not found", http.StatusInternalServerError) - return - } - stats, err := getTaskStats(executionConfig.Nodes.Permit[0].ID) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - var ptasks = make([]task2.Task, 0, len(result.Result)) - var ptaskIds = make([]string, 0, len(result.Result)) + + var subTasks []task2.Task + var pipelineTaskIDs = map[string][]string{} + pipelineSubParentIDs := map[string]string{} + subTaskStatus := map[string]string{} for _, row := range result.Result { buf := util.MustToJSONBytes(row) - ptask := task2.Task{} - err = util.FromJSONBytes(buf, &ptask) + subTask := task2.Task{} + err = util.FromJSONBytes(buf, &subTask) if err != nil { log.Error(err) continue } - ptasks = append(ptasks, ptask) - ptaskIds = append(ptaskIds, ptask.ID) + if subTask.Metadata.Labels != nil { + if subTask.Metadata.Labels["business_id"] == "index_migration" { + subTasks = append(subTasks, subTask) + subTaskStatus[subTask.ID] = subTask.Status + continue + } + if subTask.Metadata.Labels["pipeline_id"] == "es_scroll" || subTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" { + if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + if pl := len(subTask.ParentId); pl > 0 { + if subTaskStatus[subTask.ParentId[pl-1]] == task2.StatusRunning { + pipelineSubParentIDs[subTask.ID] = subTask.ParentId[pl-1] + } + } + } + } + } } - indexingStats, err := getIndexingStats(ptaskIds, h.bulkResultIndexName) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return + taskInfo["data_partition"] = len(subTasks) + var taskStats = map[string]struct{ + ScrolledDocs float64 + IndexDocs float64 + }{} + for instID, taskIDs := range pipelineTaskIDs { + inst := &model.Instance{} + inst.ID = instID + _, err = orm.Get(inst) + if err != nil { + log.Error(err) + continue + } + pipelines, err := inst.GetPipelinesByIDs(taskIDs) + if err != nil { + log.Error(err) + continue + } + + for pipelineID, status := range pipelines { + if pid, ok := pipelineSubParentIDs[pipelineID]; ok { + if v, err := status.Context.GetValue("es_scroll.scrolled_docs"); err == nil { + if vv, ok := v.(float64); ok { + stat := taskStats[pid] + stat.ScrolledDocs = vv + taskStats[pid] = stat + } + } + if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil { + if vv, ok 
:= v.(float64); ok { + stat := taskStats[pid] + stat.IndexDocs = vv + taskStats[pid] = stat + } + } + } + } } var ( partitionTaskInfos []util.MapStr completedPartitions int + startTime int64 ) - for i, ptask := range ptasks { - start, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.start") - end, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.end") + if len(subTasks) > 0 { + startTime = subTasks[0].StartTimeInMillis + } + for i, ptask := range subTasks { + var ( + cfg map[string]interface{} + ok bool + ) + if cfg, ok = ptask.Config.(map[string]interface{}); !ok { + continue + } + start, _ := util.MapStr(cfg).GetValue("source.start") + end, _ := util.MapStr(cfg).GetValue("source.end") if i == 0 { - step, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.step") + step, _ := util.MapStr(cfg).GetValue("source.step") taskInfo["step"] = step } - durationInMS = 0 + var durationInMS int64 = 0 if ptask.StartTimeInMillis > 0 { + if ptask.StartTimeInMillis < startTime { + startTime = ptask.StartTimeInMillis + } durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis if ptask.CompletedTime != nil && (ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError) { durationInMS = ptask.CompletedTime.UnixMilli() - ptask.StartTimeInMillis @@ -969,23 +793,31 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt scrollDocs float64 indexDocs float64 ) - var stKey = fmt.Sprintf("scrolling_processing.%s", ptask.ID) - if pt, ok := stats[stKey]; ok { - if ptv, ok := pt.(map[string]interface{}); ok { - if v, ok := ptv["docs"].(float64); ok { - scrollDocs = v + if stat, ok := taskStats[ptask.ID]; ok { + scrollDocs = stat.ScrolledDocs + indexDocs = stat.IndexDocs + }else{ + if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError { + if ptask.Metadata.Labels != nil { + if v, ok := ptask.Metadata.Labels["scrolled_docs"].(float64); ok{ + scrollDocs = v + } + if v, ok := ptask.Metadata.Labels["index_docs"].(float64); ok{ + indexDocs = v + } } } } - if v, ok := indexingStats[ptask.ID]; ok { - indexDocs = v - } + var subCompletedTime int64 if ptask.CompletedTime != nil { subCompletedTime = ptask.CompletedTime.UnixMilli() + if subCompletedTime > completedTime { + completedTime = subCompletedTime + } } - partitionTotalDocs, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.doc_count") + partitionTotalDocs, _ := util.MapStr(cfg).GetValue("source.doc_count") partitionTaskInfos = append(partitionTaskInfos, util.MapStr{ "task_id": ptask.ID, "status": ptask.Status, @@ -998,10 +830,17 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt "index_docs": indexDocs, "total_docs": partitionTotalDocs, }) - if ptask.Status == task2.StatusComplete { + if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError { completedPartitions++ } } + if len(subTasks) == completedPartitions { + taskInfo["completed_time"] = completedTime + taskInfo["duration"] = completedTime - startTime + }else{ + taskInfo["duration"] = time.Now().UnixMilli() - startTime + } + taskInfo["start_time"] = startTime taskInfo["partitions"] = partitionTaskInfos taskInfo["completed_partitions"] = completedPartitions h.WriteJSON(w, taskInfo, http.StatusOK) @@ -1179,49 +1018,244 @@ func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request, } , http.StatusOK) } -func getIndexingStats(taskIDs []string, indexName string) (map[string]float64, error){ - if 
len(taskIDs) == 0 { - return nil, fmt.Errorf("taskIDs should not be empty") - } - q := util.MapStr{ - "size": 0, +func stopPipelineTasks(parentID string) error { + queryDsl := util.MapStr{ + "size": 1000, "query": util.MapStr{ - "terms": util.MapStr{ - "labels.queue": taskIDs, - }, - }, - "aggs": util.MapStr{ - "gp_task": util.MapStr{ - "terms": util.MapStr{ - "field": "labels.queue", - "size": len(taskIDs), - }, - "aggs": util.MapStr{ - "success_count": util.MapStr{ - "sum": util.MapStr{ - "field": "bulk_results.summary.success.count", + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": parentID, + }, + }, + }, + { + "terms": util.MapStr{ + "metadata.labels.pipeline_id": []string{"es_scroll","bulk_indexing"}, }, }, }, }, }, } - query := orm.Query{ - RawQuery: util.MustToJSONBytes(q), - IndexName: indexName, + q := orm.Query{ + RawQuery: util.MustToJSONBytes(queryDsl), } - err, result := orm.Search(nil, &query) + err, result := orm.Search(task2.Task{}, &q) if err != nil { - return nil, fmt.Errorf("query indexing stats error: %w", err) + return err } - statsM := map[string]float64{} - jsonparser.ArrayEach(result.Raw, func(value []byte, dataType jsonparser.ValueType, offset int, err error) { - key, _ := jsonparser.GetString(value, "key") - successCount, err := jsonparser.GetFloat(value, "success_count", "value") + + for _, hit := range result.Result { + buf, err := util.ToJSONBytes(hit) + if err != nil { + return err + } + tk := task2.Task{} + err = util.FromJSONBytes(buf, &tk) + if err != nil { + return err + } + if tk.Metadata.Labels != nil { + if instID, ok := tk.Metadata.Labels["execution_instance_id"].(string); ok { + inst := model.Instance{} + inst.ID = instID + _, err = orm.Get(&inst) + if err != nil { + return err + } + err = inst.StopPipeline(context.Background(), tk.ID) + if err != nil { + log.Error(err) + continue + } + } + } + } + return nil +} + +func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats MajorTaskState, err error) { + taskQuery := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": majorTaskID, + }, + }, + }, + { + "bool": util.MapStr{ + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "bulk_indexing", + }, + }, + }, + { + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task2.StatusComplete, task2.StatusError}, + }, + }, + }, + }, + }, + }, + }, + }, + + }, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(taskQuery), + } + err, result := orm.Search(task2.Task{}, q) + if err != nil { + return taskStats, err + } + var pipelineTaskIDs = map[string][]string{} + for _, row := range result.Result { + buf := util.MustToJSONBytes(row) + subTask := task2.Task{} + err = util.FromJSONBytes(buf, &subTask) if err != nil { log.Error(err) + continue } - statsM[key] = successCount - }, "aggregations", "gp_task", "buckets") - return statsM, nil + if subTask.Metadata.Labels != nil { + //add indexDocs of already complete + if subTask.Metadata.Labels["business_id"] == "index_migration" { + if v, ok := subTask.Metadata.Labels["index_docs"].(float64); ok { + taskStats.IndexDocs += v + } + continue + } + if instID, ok := 
subTask.Metadata.Labels["execution_instance_id"].(string); ok { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + } + } + } + for instID, taskIDs := range pipelineTaskIDs { + inst := &model.Instance{} + inst.ID = instID + _, err = orm.Get(inst) + if err != nil { + log.Error(err) + continue + } + pipelines, err := inst.GetPipelinesByIDs(taskIDs) + if err != nil { + log.Error(err) + continue + } + + for _, status := range pipelines { + if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil { + if vv, ok := v.(float64); ok { + taskStats.IndexDocs += vv + } + } + } + } + return taskStats, nil +} + +func getMajorTaskByIndexFromES(majorTaskID string)(map[string]IndexStateInfo, error){ + taskQuery := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": majorTaskID, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "bulk_indexing", + }, + }, + }, + }, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(taskQuery), + } + err, result := orm.Search(task2.Task{}, q) + if err != nil { + return nil, err + } + var pipelineTaskIDs = map[string][]string{} + var pipelineIndexNames = map[string]string{} + for _, row := range result.Result { + buf := util.MustToJSONBytes(row) + subTask := task2.Task{} + err = util.FromJSONBytes(buf, &subTask) + if err != nil { + log.Error(err) + continue + } + if subTask.Metadata.Labels != nil { + if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + } + if indexName, ok := subTask.Metadata.Labels["unique_index_name"].(string); ok { + pipelineIndexNames[subTask.ID] = indexName + } + } + } + state := map[string]IndexStateInfo{} + for instID, taskIDs := range pipelineTaskIDs { + inst := &model.Instance{} + inst.ID = instID + _, err = orm.Get(inst) + if err != nil { + log.Error(err) + continue + } + pipelines, err := inst.GetPipelinesByIDs(taskIDs) + if err != nil { + log.Error(err) + continue + } + + for pipelineID, status := range pipelines { + indexName := pipelineIndexNames[pipelineID] + if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil { + if vv, ok := v.(float64); ok && indexName != ""{ + st := state[indexName] + st.IndexDocs += vv + state[indexName] = st + } + } + } + } + return state, nil } \ No newline at end of file diff --git a/plugin/migration/model.go b/plugin/migration/model.go index 74a4139f..9baf15a4 100644 --- a/plugin/migration/model.go +++ b/plugin/migration/model.go @@ -4,6 +4,8 @@ package migration +import "fmt" + type ElasticDataConfig struct { Cluster struct { Source ClusterInfo `json:"source"` @@ -21,6 +23,9 @@ type ElasticDataConfig struct { Bulk struct { Docs int `json:"docs"` StoreSizeInMB int `json:"store_size_in_mb"` + MaxWorkerSize int `json:"max_worker_size"` + IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` + SliceSize int `json:"slice_size"` } `json:"bulk"` Execution ExecutionConfig `json:"execution"` } `json:"settings"` @@ -54,8 +59,8 @@ type IndexConfig struct { IndexRename map[string]interface{} `json:"index_rename"` TypeRename map[string]interface{} `json:"type_rename"` Partition *IndexPartition `json:"partition,omitempty"` - TaskID string `json:"task_id,omitempty"` - Status string `json:"status,omitempty"` + //TaskID string `json:"task_id,omitempty"` + //Status string 
`json:"status,omitempty"` Percent float64 `json:"percent,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` } @@ -72,7 +77,33 @@ type IndexInfo struct { StoreSizeInBytes int `json:"store_size_in_bytes"` } +func (ii *IndexInfo) GetUniqueIndexName() string{ + return fmt.Sprintf("%s:%s", ii.Name, ii.DocType) +} + type ClusterInfo struct { Id string `json:"id"` Name string `json:"name"` +} + +type TaskCompleteState struct { + IsComplete bool + Error string + ClearPipeline bool + PipelineIds []string + SuccessDocs float64 + ScrolledDocs float64 + RunningPhase int + TotalDocs interface{} +} + +type MajorTaskState struct{ + ScrolledDocs float64 + IndexDocs float64 + Status string +} + +type IndexStateInfo struct { + ErrorPartitions int + IndexDocs float64 } \ No newline at end of file diff --git a/plugin/migration/module.go b/plugin/migration/module.go index 67271880..de2d4b73 100644 --- a/plugin/migration/module.go +++ b/plugin/migration/module.go @@ -15,12 +15,11 @@ func (module *Module) Name() string { } func (module *Module) Setup() { - module.BulkResultIndexName = ".infini_async_bulk_results" exists, err := env.ParseConfig("migration", module) if exists && err != nil { log.Error(err) } - InitAPI(module.BulkResultIndexName) + InitAPI() } func (module *Module) Start() error { return nil @@ -31,7 +30,6 @@ func (module *Module) Stop() error { } type Module struct { - BulkResultIndexName string `config:"bulk_result_index_name"` } func init() { diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index ff74db7f..42593863 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -5,45 +5,55 @@ package migration import ( + "errors" "fmt" log "github.com/cihub/seelog" + "infini.sh/console/model" "infini.sh/framework/core/config" "infini.sh/framework/core/elastic" "infini.sh/framework/core/env" + "infini.sh/framework/core/event" "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "infini.sh/framework/core/pipeline" task2 "infini.sh/framework/core/task" "infini.sh/framework/core/util" "infini.sh/framework/modules/elastic/common" - "runtime" + "math" + "strings" + "syscall" "time" ) -type ClusterMigrationProcessor struct { +type DispatcherProcessor struct { id string - config *ClusterMigrationConfig + config *DispatcherConfig + state map[string]DispatcherState } -type ClusterMigrationConfig struct { +type DispatcherConfig struct { Elasticsearch string `config:"elasticsearch,omitempty"` IndexName string `config:"index_name"` - DetectIntervalInMs int `config:"detect_interval_in_ms"` LogIndexName string `config:"log_index_name"` + MaxTasksPerInstance int `config:"max_tasks_per_instance"` + CheckInstanceAvailable bool `config:"check_instance_available"` + TaskBatchSize int `config:"task_batch_size"` +} + +type DispatcherState struct { + Total int } func init() { - pipeline.RegisterProcessorPlugin("cluster_migration", newClusterMigrationProcessor) + pipeline.RegisterProcessorPlugin("migration_dispatcher", newMigrationDispatcherProcessor) } -func newClusterMigrationProcessor(c *config.Config) (pipeline.Processor, error) { +func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, error) { - cfg := ClusterMigrationConfig{ - DetectIntervalInMs: 5000, - } + cfg := DispatcherConfig{} if err := c.Unpack(&cfg); err != nil { log.Error(err) - return nil, fmt.Errorf("failed to unpack the configuration of cluster migration processor: %s", err) + return nil, fmt.Errorf("failed to unpack the configuration of migration dispatcher 
processor: %s", err) } if cfg.IndexName == "" || cfg.LogIndexName == "" { ormConfig := common.ORMConfig{} @@ -53,7 +63,7 @@ func newClusterMigrationProcessor(c *config.Config) (pipeline.Processor, error) cfg.IndexName = fmt.Sprintf("%stask", ormConfig.IndexPrefix) } if cfg.LogIndexName == "" { - cfg.LogIndexName = fmt.Sprintf("%stask-log", ormConfig.IndexPrefix) + cfg.LogIndexName = fmt.Sprintf("%slogs", ormConfig.IndexPrefix) } }else{ err = fmt.Errorf("parse config elastic.orm error: %w", err) @@ -62,96 +72,92 @@ func newClusterMigrationProcessor(c *config.Config) (pipeline.Processor, error) } } global.Register("cluster_migration_config", &cfg) + if cfg.MaxTasksPerInstance <= 0 { + cfg.MaxTasksPerInstance = 10 + } + if cfg.TaskBatchSize <= 0 { + cfg.TaskBatchSize = 50 + } - processor := ClusterMigrationProcessor{ + //query and then init dispatcher state + processor := DispatcherProcessor{ id: util.GetUUID(), config: &cfg, + state: map[string]DispatcherState{}, } + state, err := processor.getInstanceTaskState() + if err != nil { + log.Error(err) + return nil, err + } + processor.state = state return &processor, nil } -func (p *ClusterMigrationProcessor) Name() string { - return "cluster_migration" +func (p *DispatcherProcessor) Name() string { + return "migration_dispatcher" } -func (p *ClusterMigrationProcessor) Process(ctx *pipeline.Context) error { - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Errorf("error in %s processor: %v", p.Name(), v) - } - } - log.Tracef("exit %s processor", p.Name()) - }() - +func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { for { if ctx.IsCanceled() { return nil } - tasks, err := p.getClusterMigrationTasks(20) + tasks, err := p.getMigrationTasks(p.config.TaskBatchSize) if err != nil { - panic(err) + return err } if len(tasks) == 0 { log.Debug("got zero cluster migration task from es") - if p.config.DetectIntervalInMs > 0 { - time.Sleep(time.Millisecond * time.Duration(p.config.DetectIntervalInMs)) - } + return nil } for _, t := range tasks { if ctx.IsCanceled() { return nil } - t.Status = task2.StatusRunning - t.StartTimeInMillis = time.Now().UnixMilli() - p.writeTaskLog(&t, &task2.Log{ - ID: util.GetUUID(), - TaskId: t.ID, - Status: task2.StatusRunning, - Type: t.Metadata.Type, - Action: task2.LogAction{ - Parameters: t.Parameters, - }, - Content: fmt.Sprintf("starting to execute task [%s]", t.ID), - Timestamp: time.Now().UTC(), - }) - err = p.SplitMigrationTask(&t) - taskLog := &task2.Log{ - ID: util.GetUUID(), - TaskId: t.ID, - Status: task2.StatusRunning, - Type: t.Metadata.Type, - Action: task2.LogAction{ - Parameters: t.Parameters, - Result: &task2.LogResult{ - Success: true, - }, - }, - Content: fmt.Sprintf("success to split task [%s]", t.ID), - Timestamp: time.Now().UTC(), + if t.Metadata.Labels == nil { + log.Error("got migration task with empty labels, skip handling: %v", t) + continue } - if err != nil { - taskLog.Status = task2.StatusError - taskLog.Content = fmt.Sprintf("failed to split task [%s]: %v", t.ID, err) - taskLog.Action.Result = &task2.LogResult{ - Success: false, - Error: err.Error(), + if t.Metadata.Labels["business_id"] == "cluster_migration" { + //handle major task + switch t.Status { + case task2.StatusReady: + err = p.handleReadyMajorTask(&t) + case task2.StatusRunning: + err = p.handleRunningMajorTask(&t) + case 
task2.StatusPendingStop: + err = p.handlePendingStopMajorTask(&t) + } + }else if t.Metadata.Labels["business_id"] == "index_migration" { + //handle sub migration task + switch t.Status { + case task2.StatusReady: + err = p.handleReadySubTask(&t) + case task2.StatusRunning: + err = p.handleRunningSubTask(&t) + case task2.StatusPendingStop: + err = p.handlePendingStopSubTask(&t) } } - t.Status = taskLog.Status - p.writeTaskLog(&t, taskLog) if err != nil { - continue + t.Status = task2.StatusError + tn := time.Now() + t.CompletedTime = &tn + p.saveTaskAndWriteLog(&t, &task2.Log{ + ID: util.GetUUID(), + TaskId: t.ID, + Status: task2.StatusError, + Type: t.Metadata.Type, + Config: t.Config, + Result: &task2.LogResult{ + Success: false, + Error: err.Error(), + }, + Message: fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err), + Timestamp: time.Now().UTC(), + },"") } } //es index refresh @@ -159,34 +165,773 @@ func (p *ClusterMigrationProcessor) Process(ctx *pipeline.Context) error { } } - -func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) error { +func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error{ if taskItem.Metadata.Labels == nil { - return fmt.Errorf("empty metadata labels, unexpected cluster migration task: %s", util.MustToJSON(taskItem)) + return fmt.Errorf("got migration task with empty labels, skip handling: %v", taskItem) } - if taskItem.Metadata.Labels["pipeline_id"] != p.Name() { - log.Tracef("got unexpect task type of %s with task id [%s] in cluster migration processor", taskItem.Metadata.Type, taskItem.ID) - return nil + if taskItem.Metadata.Labels["is_split"] != true { + err := p.splitMajorMigrationTask(taskItem) + if err != nil { + return err + } + taskItem.Metadata.Labels["is_split"] = true + }else{ + taskItem.RetryTimes++ } - parameters := util.MapStr(taskItem.Parameters) - migrationConfig, err := parameters.GetValue("pipeline.config") + //update status of subtask to ready + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskItem.ID, + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task2.StatusError, task2.StatusStopped}, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + }, + }, + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusReady), + }, + } + + esClient := elastic.GetClient(p.config.Elasticsearch) + _, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl) ) if err != nil { return err } - buf := util.MustToJSONBytes(migrationConfig) + taskLog := &task2.Log{ + ID: util.GetUUID(), + TaskId: taskItem.ID, + Status: task2.StatusRunning, + Type: taskItem.Metadata.Type, + Config: taskItem.Config, + Result: &task2.LogResult{ + Success: true, + }, + Message: fmt.Sprintf("success to start task [%s]", taskItem.ID), + Timestamp: time.Now().UTC(), + } + taskItem.Status = task2.StatusRunning + p.saveTaskAndWriteLog(taskItem, taskLog, "") + return nil +} + +func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error{ + //check whether all pipeline task is stopped or not, then update task status + q := util.MapStr{ + "size": 200, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.business_id": "index_migration", + }, + }, + { + "terms": 
util.MapStr{ + "status": []string{task2.StatusRunning, task2.StatusPendingStop}, + }, + }, + }, + }, + }, + } + tasks, err := p.getTasks(q) + if err != nil { + return err + } + // all subtask stopped or error or complete + if len(tasks) == 0 { + taskItem.Status = task2.StatusStopped + p.saveTaskAndWriteLog(taskItem, &task2.Log{ + ID: util.GetUUID(), + TaskId: taskItem.ID, + Status: task2.StatusStopped, + Type: taskItem.Metadata.Type, + Config: taskItem.Config, + Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID), + Timestamp: time.Now().UTC(), + },"") + } + return nil +} +func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error{ + ts, err := p.getMajorTaskState(taskItem) + if err != nil { + return err + } + if ts.Status == task2.StatusComplete || ts.Status == task2.StatusError { + taskItem.Metadata.Labels["target_total_docs"] = ts.IndexDocs + taskItem.Status = ts.Status + tn := time.Now() + taskItem.CompletedTime = &tn + p.saveTaskAndWriteLog(taskItem, &task2.Log{ + ID: util.GetUUID(), + TaskId: taskItem.ID, + Status: taskItem.Status, + Type: taskItem.Metadata.Type, + Config: taskItem.Config, + Message: fmt.Sprintf("task [%s] is complete", taskItem.ID), + Timestamp: time.Now().UTC(), + },"") + } + return nil +} + +func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error{ + state, err := p.getTaskCompleteState(taskItem) + if err != nil { + return err + } + if state.IsComplete { + if taskItem.Metadata.Labels != nil { + taskItem.Metadata.Labels["index_docs"] = state.SuccessDocs + taskItem.Metadata.Labels["scrolled_docs"] = state.ScrolledDocs + if instanceID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok { + inst := model.Instance{} + inst.ID = instanceID + _, err = orm.Get(&inst) + if err == nil { + for _, pipelineID := range state.PipelineIds { + err = inst.DeletePipeline(pipelineID) + if err != nil { + log.Error(err) + continue + } + selector := util.MapStr{ + "labels": util.MapStr{ + "migration_task_id": taskItem.ID, + }, + } + //clear queue + err = inst.DeleteQueueBySelector(selector) + if err != nil { + log.Error(err) + } + } + } + if st, ok := p.state[instanceID]; ok { + st.Total -= 1 + p.state[instanceID] = st + } + } + + } + if state.Error != "" && state.TotalDocs != state.SuccessDocs { + taskItem.Status = task2.StatusError + }else { + taskItem.Status = task2.StatusComplete + } + + tn := time.Now() + taskItem.CompletedTime = &tn + p.saveTaskAndWriteLog(taskItem, &task2.Log{ + ID: util.GetUUID(), + TaskId: taskItem.ID, + Status: taskItem.Status, + Type: taskItem.Metadata.Type, + Config: taskItem.Config, + Result: &task2.LogResult{ + Success: state.Error == "", + Error: state.Error, + }, + Message: fmt.Sprintf("task [%s] is complete", taskItem.ID), + Timestamp: time.Now().UTC(), + },"") + }else{ + if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) { + ptasks, err := p.getPipelineTasks(taskItem.ID) + if err != nil { + return err + } + var bulkTask *task2.Task + for i, t := range ptasks { + if t.Metadata.Labels != nil { + if t.Metadata.Labels["pipeline_id"] == "bulk_indexing" { + bulkTask = &ptasks[i] + } + } + } + if bulkTask == nil { + return fmt.Errorf("can not found bulk_indexing pipeline of sub task [%s]", taskItem.ID) + } + if bulkTask.Metadata.Labels != nil { + if instID, ok := bulkTask.Metadata.Labels["execution_instance_id"].(string); ok { + inst := &model.Instance{} + inst.ID = instID + _, err = orm.Get(inst) + if err != nil { + return err + } + err = 
+
+func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
+	state, err := p.getTaskCompleteState(taskItem)
+	if err != nil {
+		return err
+	}
+	if state.IsComplete {
+		if taskItem.Metadata.Labels != nil {
+			taskItem.Metadata.Labels["index_docs"] = state.SuccessDocs
+			taskItem.Metadata.Labels["scrolled_docs"] = state.ScrolledDocs
+			if instanceID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok {
+				inst := model.Instance{}
+				inst.ID = instanceID
+				_, err = orm.Get(&inst)
+				if err == nil {
+					for _, pipelineID := range state.PipelineIds {
+						err = inst.DeletePipeline(pipelineID)
+						if err != nil {
+							log.Error(err)
+							continue
+						}
+						selector := util.MapStr{
+							"labels": util.MapStr{
+								"migration_task_id": taskItem.ID,
+							},
+						}
+						//clear queue
+						err = inst.DeleteQueueBySelector(selector)
+						if err != nil {
+							log.Error(err)
+						}
+					}
+				}
+				if st, ok := p.state[instanceID]; ok {
+					st.Total -= 1
+					p.state[instanceID] = st
+				}
+			}
+		}
+		if state.Error != "" && state.TotalDocs != state.SuccessDocs {
+			taskItem.Status = task2.StatusError
+		} else {
+			taskItem.Status = task2.StatusComplete
+		}
+
+		tn := time.Now()
+		taskItem.CompletedTime = &tn
+		p.saveTaskAndWriteLog(taskItem, &task2.Log{
+			ID:     util.GetUUID(),
+			TaskId: taskItem.ID,
+			Status: taskItem.Status,
+			Type:   taskItem.Metadata.Type,
+			Config: taskItem.Config,
+			Result: &task2.LogResult{
+				Success: state.Error == "",
+				Error:   state.Error,
+			},
+			Message:   fmt.Sprintf("task [%s] is complete", taskItem.ID),
+			Timestamp: time.Now().UTC(),
+		}, "")
+	} else {
+		if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) {
+			ptasks, err := p.getPipelineTasks(taskItem.ID)
+			if err != nil {
+				return err
+			}
+			var bulkTask *task2.Task
+			for i, t := range ptasks {
+				if t.Metadata.Labels != nil {
+					if t.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
+						bulkTask = &ptasks[i]
+					}
+				}
+			}
+			if bulkTask == nil {
+				return fmt.Errorf("cannot find bulk_indexing pipeline of sub task [%s]", taskItem.ID)
+			}
+			if bulkTask.Metadata.Labels != nil {
+				if instID, ok := bulkTask.Metadata.Labels["execution_instance_id"].(string); ok {
+					inst := &model.Instance{}
+					inst.ID = instID
+					_, err = orm.Get(inst)
+					if err != nil {
+						return err
+					}
+					err = inst.CreatePipeline(util.MustToJSONBytes(bulkTask.Config))
+					if err != nil {
+						return err
+					}
+					taskItem.Metadata.Labels["running_phase"] = 2
+				}
+			}
+			p.saveTaskAndWriteLog(taskItem, nil, "wait_for")
+		}
+	}
+	return nil
+}
+
+func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) error {
+	//check whether all pipeline tasks have stopped; only then mark the sub task stopped
+	ptasks, err := p.getPipelineTasks(taskItem.ID)
+	if err != nil {
+		return err
+	}
+	var taskIDs []string
+	for _, t := range ptasks {
+		taskIDs = append(taskIDs, t.ID)
+	}
+	esClient := elastic.GetClient(p.config.Elasticsearch)
+	q := util.MapStr{
+		"size": len(taskIDs),
+		"sort": []util.MapStr{
+			{
+				"payload.pipeline.status_log.steps": util.MapStr{
+					"order": "desc",
+				},
+			},
+		},
+		"collapse": util.MapStr{
+			"field": "metadata.labels.task_id",
+		},
+		"query": util.MapStr{
+			"terms": util.MapStr{
+				"metadata.labels.task_id": taskIDs,
+			},
+		},
+	}
+	searchRes, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(q))
+	if err != nil {
+		return err
+	}
+	if len(searchRes.Hits.Hits) == 0 {
+		return nil
+	}
+MainLoop:
+	for _, hit := range searchRes.Hits.Hits {
+		status, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.status_log.status")
+		if status != "STOPPED" {
+			//call instance api to stop the scroll/bulk_indexing pipeline tasks
+			if instID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok {
+				inst := model.Instance{}
+				inst.ID = instID
+				_, err = orm.Get(&inst)
+				if err != nil {
+					return err
+				}
+				hasStopped := true
+				for _, pipelineID := range taskIDs {
+					err = inst.StopPipelineWithTimeout(pipelineID, time.Second)
+					if err != nil {
+						if !errors.Is(err, syscall.ECONNREFUSED) && !strings.Contains(err.Error(), "task not found") {
+							hasStopped = false
+							break
+						}
+						log.Error(err)
+					}
+				}
+				if hasStopped {
+					break MainLoop
+				}
+			}
+			return nil
+		}
+	}
+	taskItem.Status = task2.StatusStopped
+
+	//delete pipelines and clear the queue
+	if instanceID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok {
+		inst := model.Instance{}
+		inst.ID = instanceID
+		_, err = orm.Get(&inst)
+		if err != nil {
+			return err
+		}
+		for _, pipelineID := range taskIDs {
+			err = inst.DeletePipeline(pipelineID)
+			if err != nil {
+				log.Error(err)
+				continue
+			}
+			selector := util.MapStr{
+				"labels": util.MapStr{
+					"migration_task_id": taskItem.ID,
+				},
+			}
+			//clear queue
+			err = inst.DeleteQueueBySelector(selector)
+			if err != nil {
+				log.Error(err)
+			}
+		}
+		if st, ok := p.state[instanceID]; ok {
+			st.Total -= 1
+			p.state[instanceID] = st
+		}
+	}
+	p.saveTaskAndWriteLog(taskItem, &task2.Log{
+		ID:        util.GetUUID(),
+		TaskId:    taskItem.ID,
+		Status:    task2.StatusStopped,
+		Type:      taskItem.Metadata.Type,
+		Config:    taskItem.Config,
+		Message:   fmt.Sprintf("task [%s] is stopped", taskItem.ID),
+		Timestamp: time.Now().UTC(),
+	}, "")
+	return nil
+}
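
The stop loop above deliberately treats two failure modes as success: the instance is already unreachable (connection refused) or the pipeline task is already gone. Distilled into a helper under those assumptions (a sketch using the same imports as this file, not part of the patch):

    // Best-effort stop: returns true when every pipeline is stopped or can
    // be considered gone (instance down, task already removed).
    func stopPipelinesBestEffort(inst *model.Instance, pipelineIDs []string) bool {
        for _, id := range pipelineIDs {
            err := inst.StopPipelineWithTimeout(id, time.Second)
            if err == nil {
                continue
            }
            if errors.Is(err, syscall.ECONNREFUSED) || strings.Contains(err.Error(), "task not found") {
                log.Error(err)
                continue
            }
            return false
        }
        return true
    }
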
+
+func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
+	if taskItem.Metadata.Labels == nil {
+		return fmt.Errorf("got sub task [%s] with empty labels", taskItem.ID)
+	}
+	var (
+		scrollTask *task2.Task
+		bulkTask   *task2.Task
+	)
+	if taskItem.Metadata.Labels["is_split"] == true {
+		//query the pipeline tasks created by a previous split
+		ptasks, err := p.getPipelineTasks(taskItem.ID)
+		if err != nil {
+			return err
+		}
+		for i, t := range ptasks {
+			if t.Metadata.Labels != nil {
+				if cfg, ok := ptasks[i].Config.(map[string]interface{}); ok {
+					util.MapStr(cfg).Put("labels.retry_no", taskItem.RetryTimes+1)
+				}
+				if t.Metadata.Labels["pipeline_id"] == "es_scroll" {
+					scrollTask = &ptasks[i]
+				} else if t.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
+					bulkTask = &ptasks[i]
+				}
+			}
+		}
+		if scrollTask == nil || bulkTask == nil {
+			return fmt.Errorf("es_scroll or bulk_indexing pipeline task not found")
+		}
+		taskItem.RetryTimes++
+	} else {
+		//split the sub task into scroll/bulk_indexing pipeline tasks, then persist them
+		var pids []string
+		pids = append(pids, taskItem.ParentId...)
+		pids = append(pids, taskItem.ID)
+		scrollID := util.GetUUID()
+		var (
+			cfg map[string]interface{}
+			ok  bool
+		)
+		if cfg, ok = taskItem.Config.(map[string]interface{}); !ok {
+			return fmt.Errorf("got wrong config [%v] with task [%s]", taskItem.Config, taskItem.ID)
+		}
+		cfgm := util.MapStr(cfg)
+		var (
+			sourceClusterID string
+			targetClusterID string
+		)
+		if sourceClusterID, ok = getMapValue(cfgm, "source.cluster_id").(string); !ok {
+			return fmt.Errorf("got wrong source cluster id of task [%v]", *taskItem)
+		}
+		if targetClusterID, ok = getMapValue(cfgm, "target.cluster_id").(string); !ok {
+			return fmt.Errorf("got wrong target cluster id of task [%v]", *taskItem)
+		}
+		esConfig := elastic.GetConfig(sourceClusterID)
+		esTargetConfig := elastic.GetConfig(targetClusterID)
+		docType := common.GetClusterDocType(targetClusterID)
+		if len(taskItem.ParentId) == 0 {
+			return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
+		}
+		queryDsl := getMapValue(cfgm, "source.query_dsl")
+		scrollQueryDsl := util.MustToJSON(util.MapStr{
+			"query": queryDsl,
+		})
+		indexName := getMapValue(cfgm, "source.indices")
+		scrollTask = &task2.Task{
+			ParentId:    pids,
+			Runnable:    true,
+			Cancellable: true,
+			Metadata: task2.Metadata{
+				Type: "pipeline",
+				Labels: util.MapStr{
+					"cluster_id":        sourceClusterID,
+					"pipeline_id":       "es_scroll",
+					"index_name":        indexName,
+					"unique_index_name": taskItem.Metadata.Labels["unique_index_name"],
+				},
+			},
+			Config: util.MapStr{
+				"name": scrollID,
+				"logging": util.MapStr{
+					"enabled": true,
+				},
+				"labels": util.MapStr{
+					"parent_task_id":    pids,
+					"unique_index_name": taskItem.Metadata.Labels["unique_index_name"],
+					"retry_no":          taskItem.RetryTimes,
+				},
+				"auto_start":   true,
+				"keep_running": false,
+				"processor": []util.MapStr{
+					{
+						"es_scroll": util.MapStr{
+							"remove_type":   docType == "",
+							"slice_size":    getMapValue(cfgm, "source.slice_size"),
+							"batch_size":    getMapValue(cfgm, "source.batch_size"),
+							"indices":       indexName,
+							"elasticsearch": sourceClusterID,
+							"elasticsearch_config": util.MapStr{
+								"name":       sourceClusterID,
+								"enabled":    true,
+								"endpoint":   esConfig.Endpoint,
+								"basic_auth": esConfig.BasicAuth,
+							},
+							"queue": util.MapStr{
+								"name": scrollID,
+								"labels": util.MapStr{
+									"migration_task_id": taskItem.ID,
+								},
+							},
+							"partition_size": 20,
+							"scroll_time":    getMapValue(cfgm, "source.scroll_time"),
+							"query_dsl":      scrollQueryDsl,
+							"index_rename":   getMapValue(cfgm, "source.index_rename"),
+							"type_rename":    getMapValue(cfgm, "source.type_rename"),
+						},
+					},
+				},
+			},
+		}
+		scrollTask.ID = scrollID
+
+		bulkID := util.GetUUID()
+		bulkTask = &task2.Task{
+			ParentId:    pids,
+			Runnable:    true,
+			Cancellable: true,
+			Metadata: task2.Metadata{
+				Type: "pipeline",
+				Labels: util.MapStr{
+					"cluster_id":        targetClusterID,
+					"pipeline_id":       "bulk_indexing",
+					"index_name":        indexName,
+					"unique_index_name": taskItem.Metadata.Labels["unique_index_name"],
+				},
+			},
+			Config: util.MapStr{
+				"name": bulkID,
+				"logging": util.MapStr{
+					"enabled": true,
+				},
+				"labels": util.MapStr{
+					"parent_task_id":    pids,
+					"unique_index_name": taskItem.Metadata.Labels["unique_index_name"],
+					"retry_no":          taskItem.RetryTimes,
+				},
+				"auto_start":   true,
+				"keep_running": false,
+				"processor": []util.MapStr{
+					{
+						"bulk_indexing": util.MapStr{
+							"detect_active_queue": false,
+							"bulk": util.MapStr{
+								"batch_size_in_mb":   getMapValue(cfgm, "target.bulk.batch_size_in_mb"),
+								"batch_size_in_docs": getMapValue(cfgm, "target.bulk.batch_size_in_docs"),
+								"invalid_queue":      "bulk_indexing_400",
+								//"retry_rules": util.MapStr{
+								//	"default":   false,
+								//	"retry_4xx": false,
+								//	"retry_429": true,
+								//},
+							},
+							"max_worker_size":         getMapValue(cfgm, "target.bulk.max_worker_size"),
+							"num_of_slices":           getMapValue(cfgm, "target.bulk.slice_size"),
+							"idle_timeout_in_seconds": getMapValue(cfgm, "target.bulk.idle_timeout_in_seconds"),
+							"elasticsearch":           targetClusterID,
+							"elasticsearch_config": util.MapStr{
+								"name":       targetClusterID,
+								"enabled":    true,
+								"endpoint":   esTargetConfig.Endpoint,
+								"basic_auth": esTargetConfig.BasicAuth,
+							},
+							"queues": util.MapStr{
+								"type":              "scroll_docs",
+								"migration_task_id": taskItem.ID,
+							},
+						},
+					},
+				},
+			},
+		}
+		bulkTask.ID = bulkID
+	}
+	instance, err := p.getPreferenceInstance(taskItem.ParentId[0])
+	if err != nil {
+		return fmt.Errorf("get preference instance error: %w", err)
+	}
+	if p.state[instance.ID].Total >= p.config.MaxTasksPerInstance {
+		log.Infof("hit max tasks per instance with %d, skip dispatch", p.config.MaxTasksPerInstance)
+		return nil
+	}
+	scrollTask.Metadata.Labels["execution_instance_id"] = instance.ID
+	bulkTask.Metadata.Labels["execution_instance_id"] = instance.ID
+
+	//try to clear the queue when the task is retried
+	if taskItem.RetryTimes > 0 {
+		selector := util.MapStr{
+			"labels": util.MapStr{
+				"migration_task_id": taskItem.ID,
+			},
+		}
+		_ = instance.DeleteQueueBySelector(selector)
+	}
+
+	//call instance api to create the scroll pipeline task; the bulk_indexing
+	//pipeline is created later by handleRunningSubTask, once scrolling is done
+	err = instance.CreatePipeline(util.MustToJSONBytes(scrollTask.Config))
+	if err != nil {
+		return err
+	}
+	//save task info
+	if taskItem.Metadata.Labels["is_split"] != true {
+		err = orm.Create(nil, scrollTask)
+		if err != nil {
+			return fmt.Errorf("create scroll pipeline task error: %w", err)
+		}
+		err = orm.Create(nil, bulkTask)
+		if err != nil {
+			return fmt.Errorf("create bulk_indexing pipeline task error: %w", err)
+		}
+	} else {
+		err = orm.Update(nil, scrollTask)
+		if err != nil {
+			return fmt.Errorf("update scroll pipeline task error: %w", err)
+		}
+		err = orm.Update(nil, bulkTask)
+		if err != nil {
+			return fmt.Errorf("update bulk_indexing pipeline task error: %w", err)
+		}
+	}
+	taskItem.Metadata.Labels["is_split"] = true
+	taskItem.Metadata.Labels["running_phase"] = 1
+	//update dispatcher state
+	instanceState := p.state[instance.ID]
+	instanceState.Total = instanceState.Total + 1
+	p.state[instance.ID] = instanceState
+	//update sub migration task status to running and save a task log
+	taskItem.Metadata.Labels["execution_instance_id"] = instance.ID
+	taskItem.Metadata.Labels["index_docs"] = 0
+	taskItem.Metadata.Labels["scrolled_docs"] = 0
+	taskItem.Status = task2.StatusRunning
+	taskItem.StartTimeInMillis = time.Now().UnixMilli()
+
+	taskLog := &task2.Log{
+		ID:     util.GetUUID(),
+		TaskId: taskItem.ID,
+		Status: task2.StatusRunning,
+		Type:   taskItem.Metadata.Type,
+		Config: taskItem.Config,
+		Result: &task2.LogResult{
+			Success: true,
+		},
+		Message:   fmt.Sprintf("dispatch task [%s] to instance [%s]", taskItem.ID, instance.ID),
+		Timestamp: time.Now().UTC(),
+	}
+	p.saveTaskAndWriteLog(taskItem, taskLog, "wait_for")
+	return nil
+}
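
Note that only the es_scroll pipeline is submitted to the instance here; handleRunningSubTask above creates the bulk_indexing pipeline once every document has been scrolled into the queue, and persists the flip with refresh set to "wait_for" so the next dispatch cycle is guaranteed to see it. The hand-off condenses to the following (illustrative names, not part of the patch):

    // running_phase 1: es_scroll exports docs into the per-task queue.
    // running_phase 2: bulk_indexing drains that queue into the target cluster.
    func nextPhase(phase int, scrolledDocs, totalDocs float64) int {
        if phase == 1 && scrolledDocs == totalDocs {
            return 2 // export finished; create the bulk_indexing pipeline
        }
        return phase
    }
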
+
+func getMapValue(m util.MapStr, key string) interface{} {
+	v, _ := m.GetValue(key)
+	return v
+}
+
+func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instance model.Instance, err error) {
+	majorTask := task2.Task{}
+	majorTask.ID = majorTaskID
+	_, err = orm.Get(&majorTask)
+	if err != nil {
+		return
+	}
+	cfg := ElasticDataConfig{}
+	buf, err := util.ToJSONBytes(majorTask.Config)
+	if err != nil {
+		return
+	}
+	err = util.FromJSONBytes(buf, &cfg)
+	if err != nil {
+		return
+	}
+	var (
+		total    = math.MaxInt
+		tempInst = model.Instance{}
+	)
+	for _, node := range cfg.Settings.Execution.Nodes.Permit {
+		if p.state[node.ID].Total < total {
+			if p.config.CheckInstanceAvailable {
+				tempInst.ID = node.ID
+				_, err = orm.Get(&tempInst)
+				if err != nil {
+					log.Error(err)
+					continue
+				}
+				err = tempInst.TryConnectWithTimeout(time.Second)
+				if err != nil {
+					log.Debugf("instance [%s] is not available, caused by: %v", tempInst.ID, err)
+					continue
+				}
+			}
+			instance.ID = node.ID
+			total = p.state[node.ID].Total
+		}
+	}
+	if instance.ID == "" && p.config.CheckInstanceAvailable {
+		return instance, fmt.Errorf("no available instance")
+	}
+	if instance.ID == tempInst.ID {
+		return tempInst, nil
+	}
+	_, err = orm.Get(&instance)
+	return
+}
+
+func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error) {
+	majorTaskQ := util.MapStr{
+		"bool": util.MapStr{
+			"must": []util.MapStr{
+				{
+					"term": util.MapStr{
+						"metadata.labels.business_id": "cluster_migration",
+					},
+				},
+				{
+					"terms": util.MapStr{
+						"status": []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop},
+					},
+				},
+			},
+		},
+	}
+	subTaskQ := util.MapStr{
+		"bool": util.MapStr{
+			"must": []util.MapStr{
+				{
+					"term": util.MapStr{
+						"metadata.labels.business_id": "index_migration",
+					},
+				},
+				{
+					"terms": util.MapStr{
+						"status": []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop},
+					},
+				},
+			},
+		},
+	}
+
+	queryDsl := util.MapStr{
+		"size": size,
+		"sort": []util.MapStr{
+			{
+				"created": util.MapStr{
+					"order": "asc",
+				},
+			},
+		},
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"should": []util.MapStr{
+					majorTaskQ, subTaskQ,
+				},
+				"minimum_should_match": 1,
+			},
+		},
+	}
+	return p.getTasks(queryDsl)
+}
+
+func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, logItem *task2.Log, refresh string) {
+	esClient := elastic.GetClient(p.config.Elasticsearch)
+	_, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh)
+	if err != nil {
+		log.Error(err)
+	}
+	if logItem != nil {
+		event.SaveLog(event.Event{
+			Metadata: event.EventMetadata{
+				Category: "migration",
+				Name:     "logging",
+				Datatype: "event",
+				Labels: util.MapStr{
+					"task_id":        logItem.TaskId,
+					"parent_task_id": taskItem.ParentId,
+					"retry_no":       taskItem.RetryTimes,
+				},
+			},
+			Fields: util.MapStr{
+				"migration": util.MapStr{
+					"logging": util.MapStr{
+						"config":  logItem.Config,
+						"context": logItem.Context,
+						"status":  logItem.Status,
+						"message": logItem.Message,
+						"result":  logItem.Result,
+					},
+				},
+			},
+		})
+	}
+}
+
+func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) error {
+	if taskItem.Metadata.Labels == nil {
+		return fmt.Errorf("empty metadata labels, unexpected cluster migration task: %s", util.MustToJSON(taskItem))
+	}
+	if taskItem.Metadata.Labels["is_split"] == true {
+		return nil
+	}
+	if taskItem.Metadata.Labels["business_id"] != "cluster_migration" {
+		log.Tracef("got unexpected task type of %s with task id [%s] in migration dispatcher", taskItem.Metadata.Type, taskItem.ID)
+		return nil
+	}
+
+	buf := util.MustToJSONBytes(taskItem.Config)
 	clusterMigrationTask := ElasticDataConfig{}
-	err = util.FromJSONBytes(buf, &clusterMigrationTask)
+	err := util.FromJSONBytes(buf, &clusterMigrationTask)
 	if err != nil {
 		return err
 	}
 	defer func() {
-		parameters.Put("pipeline.config", clusterMigrationTask)
+		taskItem.Config = clusterMigrationTask
 	}()
 	esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id)
 	esTargetClient := elastic.GetClient(clusterMigrationTask.Cluster.Target.Id)
-	//esClient := elastic.GetClient(p.config.Elasticsearch)
-	for i, index := range clusterMigrationTask.Indices {
+	for _, index := range clusterMigrationTask.Indices {
 		source := util.MapStr{
 			"cluster_id": clusterMigrationTask.Cluster.Source.Id,
 			"indices":    index.Source.Name,
@@ -256,40 +1001,37 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err
 			"bulk": util.MapStr{
 				"batch_size_in_mb":   clusterMigrationTask.Settings.Bulk.StoreSizeInMB,
 				"batch_size_in_docs": clusterMigrationTask.Settings.Bulk.Docs,
+				"max_worker_size":         clusterMigrationTask.Settings.Bulk.MaxWorkerSize,
+				"idle_timeout_in_seconds": clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds,
+				"slice_size":              clusterMigrationTask.Settings.Bulk.SliceSize,
 			},
 		}
-		indexParameters := map[string]interface{}{
-			"pipeline": util.MapStr{
-				"id": "index_migration",
-				"config": util.MapStr{
-					"source": source,
-					"target": target,
-					"execution": clusterMigrationTask.Settings.Execution,
-				},
-			},
+		indexParameters := util.MapStr{
+			"source": source,
+			"target": target,
 		}
 		indexMigrationTask := task2.Task{
 			ParentId:    []string{taskItem.ID},
 			Cancellable: true,
 			Runnable:    false,
-			Status:      task2.StatusRunning,
+			Status:      task2.StatusReady,
 			StartTimeInMillis: time.Now().UnixMilli(),
 			Metadata: task2.Metadata{
 				Type: "pipeline",
 				Labels: util.MapStr{
-					"pipeline_id":       "index_migration",
+					"business_id":       "index_migration",
 					"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
 					"target_cluster_id": clusterMigrationTask.Cluster.Target.Id,
-					"level":           "index",
 					"partition_count": 1,
+					"index_name":        index.Source.Name,
+					"unique_index_name": index.Source.GetUniqueIndexName(),
 				},
 			},
-			Parameters: indexParameters,
+			Config: indexParameters,
 		}
 		indexMigrationTask.ID = util.GetUUID()
-		clusterMigrationTask.Indices[i].TaskID = indexMigrationTask.ID
 		if index.Partition != nil {
 			partitionQ := &elastic.PartitionQuery{
 				IndexName: index.Source.Name,
@@ -356,122 +1098,68 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err
 			}
 			partitionMigrationTask := task2.Task{
-				ParentId:    []string{taskItem.ID, indexMigrationTask.ID},
+				ParentId:    []string{taskItem.ID},
 				Cancellable: false,
 				Runnable:    true,
 				Status:      task2.StatusReady,
 				Metadata: task2.Metadata{
 					Type: "pipeline",
 					Labels: util.MapStr{
-						"pipeline_id":       "index_migration",
+						"business_id":       "index_migration",
 						"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
 						"target_cluster_id": clusterMigrationTask.Cluster.Target.Id,
-						"level":      "partition",
 						"index_name": index.Source.Name,
-						"execution": util.MapStr{
-							"nodes": util.MapStr{
-								"permit": clusterMigrationTask.Settings.Execution.Nodes.Permit,
-							},
-						},
+						"unique_index_name": index.Source.GetUniqueIndexName(),
 					},
 				},
-				Parameters: map[string]interface{}{
-					"pipeline": util.MapStr{
-						"id": "index_migration",
-						"config": util.MapStr{
-							"source": partitionSource,
-							"target": target,
-							"execution": clusterMigrationTask.Settings.Execution,
-						},
-					},
+				Config: util.MapStr{
+					"source":    partitionSource,
+					"target":    target,
+					"execution": clusterMigrationTask.Settings.Execution,
 				},
 			}
 			partitionMigrationTask.ID = util.GetUUID()
 			err = orm.Create(nil, &partitionMigrationTask)
-			//_, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "")
 			delete(target, "query_dsl")
 			if err != nil {
 				return fmt.Errorf("store index migration task(partition) error: %w", err)
 			}
 		}
-		indexMigrationTask.Metadata.Labels["partition_count"] = partitionID
 	}else{
 		source["doc_count"] = index.Source.Docs
-		if targetMust != nil {
-			target["query_dsl"] = util.MapStr{
-				"bool": util.MapStr{
-					"must": targetMust,
-				},
-			}
-		}
-		partitionMigrationTask := task2.Task{
-			ParentId:    []string{taskItem.ID, indexMigrationTask.ID},
-			Cancellable: false,
-			Runnable:    true,
-			Status:      task2.StatusReady,
-			Metadata: task2.Metadata{
-				Type: "pipeline",
-				Labels: util.MapStr{
-					"pipeline_id":       "index_migration",
-					"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
-					"target_cluster_id": clusterMigrationTask.Cluster.Target.Id,
-					"level":      "partition",
-					"index_name": index.Source.Name,
-					"execution": util.MapStr{
-						"nodes": util.MapStr{
-							"permit": clusterMigrationTask.Settings.Execution.Nodes.Permit,
-						},
-					},
-				},
-			},
-			Parameters: indexParameters,
-		}
-		orm.Create(nil, &partitionMigrationTask)
-		//_, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "")
-		delete(target, "query_dsl")
+		err = orm.Create(nil, &indexMigrationTask)
 		if err != nil {
-			return fmt.Errorf("store index migration task(partition) error: %w", err)
+			return fmt.Errorf("store index migration task error: %w", err)
 		}
 	}
-	err = orm.Create(nil, &indexMigrationTask)
-	//_, err = esClient.Index(p.config.IndexName, "", indexMigrationTask.ID, indexMigrationTask, "")
-	if err != nil {
-		return fmt.Errorf("store index migration task error: %w", err)
-	}
 	}
 	return nil
 }
-func (p *ClusterMigrationProcessor) getClusterMigrationTasks(size int)([]task2.Task, error){
+func (p *DispatcherProcessor) getPipelineTasks(subTaskID string) ([]task2.Task, error) {
 	queryDsl := util.MapStr{
-		"size": size,
-		"sort": []util.MapStr{
-			{
-				"created": util.MapStr{
-					"order": "asc",
-				},
-			},
-		},
+		"size": 2,
 		"query": util.MapStr{
 			"bool": util.MapStr{
 				"must": []util.MapStr{
 					{
 						"term": util.MapStr{
-							"status": task2.StatusReady,
-						},
-					},
-					{
-						"term": util.MapStr{
-							"metadata.labels.pipeline_id": p.Name(),
+							"parent_id": util.MapStr{
+								"value": subTaskID,
+							},
 						},
 					},
 				},
 			},
 		},
 	}
+	return p.getTasks(queryDsl)
+}
+
+func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error) {
 	esClient := elastic.GetClient(p.config.Elasticsearch)
-	res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(queryDsl))
+	res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
 	if err != nil {
 		return nil, err
 	}
@@ -494,14 +1182,255 @@ func (p *ClusterMigrationProcessor) getClusterMigrationTasks(size int)([]task2.T
 	return migrationTasks, nil
 }
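
getTaskCompleteState, defined next, folds the latest pipeline logging documents into a single verdict. The decision rule can be summarized as a sketch (names are illustrative, not part of the patch):

    // done: every doc was scrolled and every scrolled doc was accounted for
    // by bulk_indexing (success + failure + invalid); failed: not all succeeded.
    func completion(total, scrolled, indexed, success float64) (done, failed bool) {
        done = scrolled == total && (indexed == total || success == total)
        failed = done && success != total
        return
    }
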
-func (p *ClusterMigrationProcessor) writeTaskLog(taskItem *task2.Task, logItem *task2.Log) {
+func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCompleteState, error) {
+	ptasks, err := p.getPipelineTasks(subTask.ID)
+	if err != nil {
+		return nil, err
+	}
+	var pids []string
+	for _, t := range ptasks {
+		pids = append(pids, t.ID)
+	}
+
+	if len(pids) == 0 {
+		return nil, fmt.Errorf("pipeline task not found")
+	}
+	query := util.MapStr{
+		"sort": []util.MapStr{
+			{
+				"timestamp": util.MapStr{
+					"order": "desc",
+				},
+			},
+			{
+				"payload.pipeline.logging.steps": util.MapStr{
+					"order": "desc",
+				},
+			},
+		},
+		"collapse": util.MapStr{
+			"field": "metadata.labels.task_id",
+		},
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": []util.MapStr{
+					{
+						"terms": util.MapStr{
+							"metadata.labels.task_id": pids,
+						},
+					},
+					{
+						"range": util.MapStr{
+							"timestamp": util.MapStr{
+								"gt": subTask.StartTimeInMillis - 30*1000,
+							},
+						},
+					},
+				},
+			},
+		},
+	}
 	esClient := elastic.GetClient(p.config.Elasticsearch)
-	_, err := esClient.Index(p.config.IndexName,"", logItem.TaskId, taskItem, "" )
-	if err != nil{
-		log.Error(err)
+	res, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(query))
+	if err != nil {
+		return nil, err
 	}
-	_, err = esClient.Index(p.config.LogIndexName,"", logItem.ID, logItem, "" )
-	if err != nil{
-		log.Error(err)
+	var (
+		cfg map[string]interface{}
+		ok  bool
+	)
+	if cfg, ok = subTask.Config.(map[string]interface{}); !ok {
+		return nil, fmt.Errorf("got wrong config of task %v", *subTask)
 	}
+	totalDocs, err := util.MapStr(cfg).GetValue("source.doc_count")
+	if err != nil {
+		return nil, err
+	}
+
+	var (
+		indexDocs    float64
+		successDocs  float64
+		scrolledDocs interface{}
+		state        TaskCompleteState
+	)
+	state.TotalDocs = totalDocs
+	state.PipelineIds = pids
+	for _, hit := range res.Hits.Hits {
+		resultErr, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.result.error")
+		if errStr, ok := resultErr.(string); ok && errStr != "" {
+			state.Error = errStr
+			state.IsComplete = true
+			state.ClearPipeline = true
+		}
+		for _, key := range []string{"payload.pipeline.logging.context.bulk_indexing.success.count", "payload.pipeline.logging.context.bulk_indexing.failure.count", "payload.pipeline.logging.context.bulk_indexing.invalid.count"} {
+			v, err := util.MapStr(hit.Source).GetValue(key)
+			if err == nil {
+				if fv, ok := v.(float64); ok {
+					indexDocs += fv
+					if key == "payload.pipeline.logging.context.bulk_indexing.success.count" {
+						successDocs = fv
+						state.SuccessDocs = successDocs
+					}
+				}
+			} else {
+				break
+			}
+		}
+		v, err := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.context.es_scroll.scrolled_docs")
+		if err == nil {
+			scrolledDocs = v
+			if vv, ok := v.(float64); ok {
+				state.ScrolledDocs = vv
+			}
+		}
+	}
+	if totalDocs == scrolledDocs {
+		state.RunningPhase = 1
+	}
+	if (totalDocs == indexDocs || successDocs == totalDocs) && totalDocs == scrolledDocs {
+		if successDocs != totalDocs {
+			if state.Error == "" {
+				if successDocs > 0 {
+					state.Error = "partial complete"
+				} else {
+					state.Error = "invalid request"
+				}
+			}
+		}
+		state.IsComplete = true
+		return &state, nil
+	}
+	//check whether the execution instance is still available
+	if subTask.Metadata.Labels != nil {
+		if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok {
+			inst := model.Instance{}
+			inst.ID = instID
+			_, err = orm.Get(&inst)
+			if err != nil {
+				return nil, err
+			}
+			err = inst.TryConnectWithTimeout(time.Second * 3)
+			if err != nil && errors.Is(err, syscall.ECONNREFUSED) {
+				state.Error = fmt.Errorf("instance [%s] is unavailable: %w", instID, err).Error()
+				state.IsComplete = true
+			}
+		}
+	}
+	return &state, nil
+}
+
+func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskState MajorTaskState, err error) {
+	query := util.MapStr{
+		"size": 0,
+		"aggs": util.MapStr{
+			"total_docs": util.MapStr{
+				"sum": util.MapStr{
+					"field": "metadata.labels.index_docs",
+				},
+			},
+			"grp": util.MapStr{
+				"terms": util.MapStr{
+					"field": "status",
+				},
+			},
+		},
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": []util.MapStr{
+					{
+						"term": util.MapStr{
+							"parent_id": util.MapStr{
+								"value": majorTask.ID,
+							},
+						},
+					},
+					{
+						"term": util.MapStr{
+							"metadata.labels.business_id": util.MapStr{
+								"value": "index_migration",
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	esClient := elastic.GetClient(p.config.Elasticsearch)
+	res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
+	if err != nil {
+		return taskState, err
+	}
+	if v, ok := res.Aggregations["total_docs"].Value.(float64); ok {
+		taskState.IndexDocs = v
+	}
+	var hasError bool
+	for _, bk := range res.Aggregations["grp"].Buckets {
+		if bk["key"] == task2.StatusReady || bk["key"] == task2.StatusRunning {
+			taskState.Status = task2.StatusRunning
+			return taskState, nil
+		}
+		if bk["key"] == task2.StatusError {
+			hasError = true
+		}
+	}
+	if hasError {
+		taskState.Status = task2.StatusError
+	} else {
+		taskState.Status = task2.StatusComplete
+	}
+	return taskState, nil
+}
+
+func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState, error) {
+	query := util.MapStr{
+		"size": 0,
+		"aggs": util.MapStr{
+			"grp": util.MapStr{
+				"terms": util.MapStr{
+					"field": "metadata.labels.execution_instance_id",
+					"size":  1000,
+				},
+			},
+		},
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": []util.MapStr{
+					{
+						"term": util.MapStr{
+							"metadata.labels.business_id": util.MapStr{
+								"value": "index_migration",
+							},
+						},
+					},
+					{
+						"term": util.MapStr{
+							"status": util.MapStr{
+								"value": task2.StatusRunning,
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	esClient := elastic.GetClient(p.config.Elasticsearch)
+	res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
+	if err != nil {
+		return nil, err
+	}
+	state := map[string]DispatcherState{}
+	for _, bk := range res.Aggregations["grp"].Buckets {
+		if key, ok := bk["key"].(string); ok {
+			if v, ok := bk["doc_count"].(float64); ok {
+				state[key] = DispatcherState{
+					Total: int(v),
+				}
+			}
+		}
+	}
+	return state, nil
+}
\ No newline at end of file