diff --git a/config/system_config.tpl b/config/system_config.tpl index c5840724..fb9333dc 100644 --- a/config/system_config.tpl +++ b/config/system_config.tpl @@ -44,6 +44,7 @@ pipeline: fetch_max_messages: 100 queues: type: indexing_merge + tag: "metrics" when: cluster_available: ["$[[CLUSTER_ID]]"] - name: metadata_ingest @@ -78,11 +79,46 @@ pipeline: group: activity when: cluster_available: ["$[[CLUSTER_ID]]"] - - name: cluster_migration_split + - name: migration_task_dispatcher auto_start: true keep_running: true processor: - - cluster_migration: + - migration_dispatcher: elasticsearch: "$[[CLUSTER_ID]]" + check_instance_available: true + max_tasks_per_instance: 10 + task_batch_size: 50 + when: + cluster_available: ["$[[CLUSTER_ID]]"] + + - name: logging_indexing_merge + auto_start: true + keep_running: true + processor: + - indexing_merge: + input_queue: "logging" + idle_timeout_in_seconds: 1 + elasticsearch: "$[[CLUSTER_ID]]" + index_name: "$[[INDEX_PREFIX]]logs" + output_queue: + name: "pipeline-logs" + label: + tag: "request_logging" + worker_size: 1 + bulk_size_in_kb: 1 + - name: consume-logging_requests + auto_start: true + keep_running: true + processor: + - bulk_indexing: + bulk: + compress: true + batch_size_in_mb: 1 + batch_size_in_docs: 1 + consumer: + fetch_max_messages: 100 + queues: + type: indexing_merge + tag: "request_logging" when: cluster_available: ["$[[CLUSTER_ID]]"] \ No newline at end of file diff --git a/model/instance.go b/model/instance.go index 31da501f..92ac9d79 100644 --- a/model/instance.go +++ b/model/instance.go @@ -5,8 +5,14 @@ package model import ( + "context" + "fmt" "infini.sh/framework/core/agent" "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "infini.sh/framework/modules/pipeline" + "net/http" + "time" ) @@ -22,3 +28,101 @@ type Instance struct { Tags [] string `json:"tags,omitempty"` Description string `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"` } + +func (inst *Instance) CreatePipeline(body []byte) error { + req := &util.Request{ + Method: http.MethodPost, + Body: body, + Url: inst.Endpoint + "/pipeline/tasks/", + } + return inst.doRequest(req, nil) +} + +func (inst *Instance) StopPipeline(ctx context.Context, pipelineID string) error { + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/pipeline/task/%s/_stop", inst.Endpoint, pipelineID), + Context: ctx, + } + return inst.doRequest(req, nil) +} + +func (inst *Instance) StopPipelineWithTimeout(pipelineID string, duration time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), duration) + defer cancel() + return inst.StopPipeline(ctx, pipelineID) +} + +func (inst *Instance) StartPipeline(pipelineID string) error { + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/pipeline/task/%s/_start", inst.Endpoint, pipelineID), + } + return inst.doRequest(req, nil) +} + + +func (inst *Instance) DeletePipeline(pipelineID string) error { + req := &util.Request{ + Method: http.MethodDelete, + Url: fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID), + } + return inst.doRequest(req, nil) +} + +func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipelinesResponse, error) { + body := util.MustToJSONBytes(util.MapStr{ + "ids": pipelineIDs, + }) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/pipeline/tasks/_search", 
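// Note: the pipeline-management helpers above (CreatePipeline, StopPipeline,
// StartPipeline, DeletePipeline) all funnel through doRequest, defined below,
// which applies the instance's basic-auth credentials and turns any non-200
// response into an error built from the response body. A minimal usage
// sketch, with a hypothetical endpoint and pipeline ID:
//
//	inst := Instance{Endpoint: "http://192.168.3.4:2900"}
//	if err := inst.StopPipelineWithTimeout("pipeline-xyz", 5*time.Second); err != nil {
//		// handle the error; it carries the remote response body verbatim
//	}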
inst.Endpoint), + Body: body, + Context: ctx, + } + res := pipeline.GetPipelinesResponse{} + err := inst.doRequest(req, &res) + return res, err +} + +func (inst *Instance) DeleteQueueBySelector(selector util.MapStr) error { + req := &util.Request{ + Method: http.MethodDelete, + Url: fmt.Sprintf("%s/queue/_search", inst.Endpoint), + Body: util.MustToJSONBytes(util.MapStr{ + "selector": selector, + }), + } + return inst.doRequest(req, nil) +} + +func (inst *Instance) TryConnect(ctx context.Context) error { + req := &util.Request{ + Method: http.MethodGet, + Url: fmt.Sprintf("%s/_framework/api/_info", inst.Endpoint), + Context: ctx, + } + return inst.doRequest(req, nil) +} +func (inst *Instance) TryConnectWithTimeout(duration time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), duration) + defer cancel() + return inst.TryConnect(ctx) +} + +func (inst *Instance) doRequest(req *util.Request, resBody interface{}) error { + req.SetBasicAuth(inst.BasicAuth.Username, inst.BasicAuth.Password) + result, err := util.ExecuteRequest(req) + if err != nil { + return err + } + if result.StatusCode != http.StatusOK { + return fmt.Errorf(string(result.Body)) + } + if resBody != nil { + return util.FromJSONBytes(result.Body, resBody) + } + return nil +} \ No newline at end of file diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index 2f9f836e..7a1f8e04 100644 --- a/plugin/api/gateway/instance.go +++ b/plugin/api/gateway/instance.go @@ -5,7 +5,6 @@ package gateway import ( - "context" "fmt" log "github.com/cihub/seelog" "github.com/segmentio/encoding/json" @@ -368,60 +367,7 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, if from < 0 { from = 0 } - agentIndexName := orm.GetIndexName(agent.Instance{}) gatewayIndexName := orm.GetIndexName(model.Instance{}) - agentMust := []util.MapStr{ - { - "term": util.MapStr{ - "enrolled": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "status": util.MapStr{ - "value": "online", - }, - }, - }, - { - "term": util.MapStr{ - "_index": util.MapStr{ - "value": agentIndexName, - }, - }, - }, - } - - boolQ := util.MapStr{ - "minimum_should_match": 1, - "should": []util.MapStr{ - { - "bool": util.MapStr{ - "must": agentMust, - }, - }, - { - "term": util.MapStr{ - "_index": util.MapStr{ - "value": gatewayIndexName, - }, - }, - }, - }, - } - if keyword != "" { - boolQ["must"] = []util.MapStr{ - { - "prefix": util.MapStr{ - "name": util.MapStr{ - "value": keyword, - }, - }, - }, - } - } query := util.MapStr{ "size": size, @@ -433,12 +379,24 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, }, }, }, - "query": util.MapStr{ - "bool": boolQ, - }, + } + if keyword != "" { + query["query"] = util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "prefix": util.MapStr{ + "name": util.MapStr{ + "value": keyword, + }, + }, + }, + }, + }, + } } q := orm.Query{ - IndexName: fmt.Sprintf("%s,%s", gatewayIndexName, agentIndexName), + IndexName: gatewayIndexName, RawQuery: util.MustToJSONBytes(query), } err, result := orm.Search(nil, &q) @@ -457,63 +415,33 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, var nodes = []util.MapStr{} for _, hit := range searchRes.Hits.Hits { - var ( - endpoint string - ok bool - ) + buf := util.MustToJSONBytes(hit.Source) + inst := model.Instance{} + err = util.FromJSONBytes(buf, &inst) + if err != nil { + log.Error(err) + continue + } node := util.MapStr{ - "id": 
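// getExecutionNodes now searches only the gateway instance index: each hit is
// decoded into model.Instance, typed as "gateway", and probed for availability
// with inst.TryConnectWithTimeout(time.Second), replacing both the removed
// agent-index branch and the isNodeAvailable helper.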
hit.Source["id"], - "name": hit.Source["name"], + "id": inst.ID, + "name": inst.Name, "available": false, + "type": "gateway", } - hasErr := false - if hit.Index == gatewayIndexName { - node["type"] = "gateway" - if endpoint, ok = hit.Source["endpoint"].(string); !ok { - log.Warnf("got unexpect endpoint type of gateway instance [%s]: %s", hit.ID, hit.Source["endpoint"]) - hasErr = true - } - }else if hit.Index == agentIndexName { - node["type"] = "agent" - endpoint = fmt.Sprintf("%s://%s:%v", hit.Source["schema"], hit.Source["remote_ip"], hit.Source["port"]) + ul, err := url.Parse(inst.Endpoint) + if err != nil { + log.Error(err) + continue } - ul, err := url.Parse(endpoint) + node["host"] = ul.Host + err = inst.TryConnectWithTimeout(time.Second) if err != nil { log.Error(err) }else{ - node["host"] = ul.Host + node["available"] = true } - if !hasErr { - available, err := isNodeAvailable(endpoint) //TODO remove - if err != nil { - log.Error(err) - } - node["available"] = available - } nodes = append(nodes, node) } h.WriteJSON(w, nodes, http.StatusOK) } - -func isNodeAvailable(endpoint string) (bool, error){ - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - rq := &util.Request{ - Method: http.MethodGet, - Url: fmt.Sprintf("%s%s", endpoint, "/pipeline/tasks/_dynamic"), - Context: ctx, - } - resp, err := util.ExecuteRequest(rq) - if err != nil { - return false, err - } - resBody := struct { - Success bool `json:"success"` - }{} - err = util.FromJSONBytes(resp.Body, &resBody) - if err != nil { - return false, err - } - return resBody.Success, nil -} \ No newline at end of file diff --git a/plugin/elastic/activity.go b/plugin/elastic/activity.go index 6f9cfcc4..37ff1079 100644 --- a/plugin/elastic/activity.go +++ b/plugin/elastic/activity.go @@ -263,7 +263,7 @@ func (processor *ActivityProcessor) HandleMessage(ctx *pipeline.Context, qConfig if timeout { log.Tracef("timeout on queue:[%v]", qConfig.Name) - ctx.Failed() + ctx.Failed(fmt.Errorf("timeout on queue:[%v]", qConfig.Name)) return } diff --git a/plugin/migration/api.go b/plugin/migration/api.go index ee75eac9..a73faf9e 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -7,16 +7,14 @@ package migration import ( "context" "fmt" - "github.com/buger/jsonparser" log "github.com/cihub/seelog" - "infini.sh/framework/core/agent" + "infini.sh/console/model" "infini.sh/framework/core/api" "infini.sh/framework/core/api/rbac" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/global" "infini.sh/framework/core/orm" - "infini.sh/framework/core/proxy" task2 "infini.sh/framework/core/task" "infini.sh/framework/core/util" "net/http" @@ -26,9 +24,8 @@ import ( ) -func InitAPI(bulkResultIndexName string) { +func InitAPI() { handler := APIHandler{ - bulkResultIndexName: bulkResultIndexName, } api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequireLogin(handler.searchDataMigrationTask)) api.HandleAPIMethod(api.POST, "/migration/data", handler.RequireLogin(handler.createDataMigrationTask)) @@ -40,7 +37,7 @@ func InitAPI(bulkResultIndexName string) { api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequireLogin(handler.stopDataMigrationTask)) //api.HandleAPIMethod(api.GET, "/migration/data/:task_id", handler.getMigrationTask) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequireLogin(handler.getDataMigrationTaskInfo)) - api.HandleAPIMethod(api.GET, 
"/migration/data/:task_id/info/index", handler.RequireLogin(handler.getDataMigrationTaskOfIndex)) + api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index/:doc_type", handler.RequireLogin(handler.getDataMigrationTaskOfIndex)) api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequireLogin(handler.updateDataMigrationTaskStatus)) } @@ -82,7 +79,7 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re Metadata: task2.Metadata{ Type: "pipeline", Labels: util.MapStr{ - "pipeline_id": "cluster_migration", + "business_id": "cluster_migration", "source_cluster_id": clusterTaskConfig.Cluster.Source.Id, "target_cluster_id": clusterTaskConfig.Cluster.Target.Id, "source_total_docs": totalDocs, @@ -91,11 +88,7 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re Cancellable: true, Runnable: false, Status: task2.StatusInit, - Parameters: map[string]interface{}{ - "pipeline": util.MapStr{ - "config": clusterTaskConfig, - }, - }, + Config: clusterTaskConfig, } t.ID = util.GetUUID() err = orm.Create(nil, &t) @@ -116,7 +109,7 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re ) mustQ = append(mustQ, util.MapStr{ "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ "value": "cluster_migration", }, }, @@ -173,43 +166,23 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re h.WriteError(w, err.Error(), http.StatusInternalServerError) return } -mainLoop: for _, hit := range searchRes.Hits.Hits { sourceM := util.MapStr(hit.Source) - config, err := sourceM.GetValue("parameters.pipeline.config") - if err != nil { - log.Error(err) - continue - } - buf := util.MustToJSONBytes(config) + buf := util.MustToJSONBytes(sourceM["config"]) dataConfig := ElasticDataConfig{} err = util.FromJSONBytes(buf, &dataConfig) if err != nil { log.Error(err) continue } - var targetTotalDocs int64 + //var targetTotalDocs int64 if hit.Source["status"] == task2.StatusRunning { - esClient := elastic.GetClientNoPanic(dataConfig.Cluster.Target.Id) - if esClient == nil { - log.Warnf("cluster [%s] was not found", dataConfig.Cluster.Target.Id) + ts, err := getMajorTaskStatsFromInstances(hit.ID) + if err != nil { + log.Warnf("fetch progress info of task error: %v", err) continue } - for _, index := range dataConfig.Indices { - count, err := getIndexTaskDocCount(&index, esClient) - if err != nil { - log.Error(err) - continue mainLoop - } - targetTotalDocs += count - } - sourceM.Put("metadata.labels.target_total_docs", targetTotalDocs) - sourceTotalDocs, _ := sourceM.GetValue("metadata.labels.source_total_docs") - if sv, ok := sourceTotalDocs.(float64); ok{ - if int64(sv) == targetTotalDocs { - hit.Source["status"] = task2.StatusComplete - } - } + sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs) } } @@ -251,87 +224,7 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError) return } - if obj.Status == "init" { - //root task - obj.Status = task2.StatusReady - }else if obj.Status == task2.StatusStopped { - if obj.Metadata.Labels != nil && obj.Metadata.Labels["level"] == "partition" { - obj.Status = task2.StatusReady - //update parent task status - if len(obj.ParentId) == 0 { - h.WriteError(w, fmt.Sprintf("empty parent id of task [%s]", taskID), http.StatusInternalServerError) - return - } - query := 
util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "id": obj.ParentId, - }, - }, - }, - }, - } - queryDsl := util.MapStr{ - "query": query, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusRunning), - }, - } - - err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl)) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - }else{ - obj.Status = task2.StatusRunning - //update sub task status - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskID, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "index_migration", - }, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task2.StatusError, task2.StatusStopped}, - }, - }, - }, - }, - } - queryDsl := util.MapStr{ - "query": query, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusReady), - }, - } - - err = orm.UpdateBy(task2.Task{}, util.MustToJSONBytes(queryDsl)) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - } - - }else if obj.Status == task2.StatusError { - obj.Status = task2.StatusReady - } + obj.Status = task2.StatusReady err = orm.Update(nil, &obj) if err != nil { @@ -340,52 +233,41 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request return } + if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "index_migration" && len(obj.ParentId) > 0 { + //update status of major task to running + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "id": util.MapStr{ + "value": obj.ParentId[0], + }, + }, + }, + }, + }, + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusRunning), + }, + } + + err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + h.WriteJSON(w, util.MapStr{ "success": true, }, 200) } -func getNodeEndpoint(nodeID string) (string, error){ - indexName := ".infini_agent,.infini_instance" - query := util.MapStr{ - "size": 1, - "query": util.MapStr{ - "term": util.MapStr{ - "id": util.MapStr{ - "value": nodeID, - }, - }, - }, - } - q := orm.Query{ - IndexName: indexName, - RawQuery: util.MustToJSONBytes(query), - } - err, result := orm.Search(nil, &q) - if err != nil { - return "", err - } - if len(result.Result) == 0 { - return "", fmt.Errorf("node [%s] not found", nodeID) - } - if info, ok := result.Result[0].(map[string]interface{}); ok { - if v, ok := info["endpoint"]; ok { - if endpoint, ok := v.(string); ok { - return endpoint, nil - } - return "", fmt.Errorf("got invalid endpoint value: %v", v) - } - ag := agent.Instance{} - buf := util.MustToJSONBytes(info) - err = util.FromJSONBytes(buf, &ag) - if err != nil { - return "", err - } - return ag.GetEndpoint(), nil - } - return "", fmt.Errorf("got unexpect node info: %s", util.MustToJSON(result.Result[0])) -} - func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("task_id") obj := task2.Task{} @@ -399,78 +281,58 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ }, http.StatusNotFound) 
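// Stopping is now a two-step handshake: stopPipelineTasks asks each execution
// instance to halt its es_scroll/bulk_indexing pipelines, then the task (and,
// for a cluster_migration task, its running subtasks) is marked
// StatusPendingStop; the dispatcher later confirms the pipelines are gone and
// finalizes the status to stopped.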
return } - execution, _ := util.MapStr(obj.Parameters).GetValue("pipeline.config.settings.execution") - if execution == nil { - execution, err = util.MapStr(obj.Parameters).GetValue("pipeline.config.execution") - if err != nil { - errStr := fmt.Sprintf("get execution config in task %s error: %s", id, err.Error()) - h.WriteError(w, errStr, http.StatusInternalServerError) - log.Error(errStr) - return - } - } - buf := util.MustToJSONBytes(execution) - executionConfig := ExecutionConfig{} - err = util.FromJSONBytes(buf, &executionConfig) + //query all pipeline task(scroll/bulk_indexing) and then stop it + err = stopPipelineTasks(id) if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - - if len(executionConfig.Nodes.Permit) == 0 { - h.WriteError(w, "node of running task can not found", http.StatusInternalServerError) - return - } - - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": id, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "index_migration", - }, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task2.StatusRunning, task2.StatusInit}, - }, - }, - }, - }, - } - //todo reset stat_time? - queryDsl := util.MapStr{ - "query": util.MapStr{ + if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "cluster_migration" { + //update status of subtask to pending stop + query := util.MapStr{ "bool": util.MapStr{ - "minimum_should_match": 1, - "should": []util.MapStr{ + "must": []util.MapStr{ { "term": util.MapStr{ - "id": util.MapStr{ - "value": id, + "parent_id": util.MapStr{ + "value": obj.ID, + }, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": task2.StatusRunning, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ + "value": "index_migration", }, }, }, - query, }, }, - }, - "script": util.MapStr{ - "source": "ctx._source['status'] = 'stopped'", - }, - } + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop), + }, + } - err = orm.UpdateBy(task2.Task{}, util.MustToJSONBytes(queryDsl)) + err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + obj.Status = task2.StatusPendingStop + err = orm.Update(nil, &obj) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -483,11 +345,7 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ } func getTaskConfig(task *task2.Task, config interface{}) error{ - configSec, err := util.MapStr(task.Parameters).GetValue("pipeline.config") - if err != nil { - return err - } - configBytes, err := util.ToJSONBytes(configSec) + configBytes, err := util.ToJSONBytes(task.Config) if err != nil { return err } @@ -593,93 +451,36 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R return } - taskErrors, err := getErrorPartitionTasks(id) + indexState, err := getMajorTaskInfoByIndex(id) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - //get status of sub task - //todo size config? 
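// Per-index progress is now assembled from two sources: counts aggregated
// over persisted subtasks (getMajorTaskInfoByIndex) plus live bulk_indexing
// counters fetched from the execution instances (getMajorTaskByIndexFromES).
// Per index, roughly:
//
//	count := indexState[name].IndexDocs + realtimeIndexState[name].IndexDocs
//	percent := count * 100 / float64(index.Source.Docs) // capped at 100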
- query := util.MapStr{ - "size": 1000, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": id, - }, - }, - },{ - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "index_migration", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.level": util.MapStr{ - "value": "index", - }, - }, - }, - }, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - err, result := orm.Search(task2.Task{}, &q) + realtimeIndexState, err := getMajorTaskByIndexFromES(id) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - statusM := util.MapStr{} - for _, row := range result.Result { - if rowM, ok := row.(map[string]interface{}); ok { - if v, ok := rowM["id"].(string); ok { - statusM[v] = rowM["status"] - } - } - } var completedIndices int - targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id) for i, index := range taskConfig.Indices { - if st, ok := statusM[index.TaskID]; ok { - taskConfig.Indices[i].Status = st.(string) - } - var count = index.Target.Docs - if taskConfig.Indices[i].Status != task2.StatusComplete || count == 0 { - if targetESClient == nil { - log.Warnf("cluster [%s] was not found", taskConfig.Cluster.Target.Id) - break - } - count, err = getIndexTaskDocCount(&index, targetESClient) - if err != nil { - log.Error(err) - continue - } - taskConfig.Indices[i].Target.Docs = count - } - percent := float64(count * 100) / float64(index.Source.Docs) + indexName := index.Source.GetUniqueIndexName() + count := indexState[indexName].IndexDocs + realtimeIndexState[indexName].IndexDocs + percent := count * 100 / float64(index.Source.Docs) if percent > 100 { percent = 100 } + taskConfig.Indices[i].Target.Docs = int64(count) taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) - taskConfig.Indices[i].ErrorPartitions = taskErrors[index.TaskID] - if count == index.Source.Docs { + taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions + if int64(count) == index.Source.Docs { completedIndices ++ - taskConfig.Indices[i].Status = task2.StatusComplete } } cfg := global.MustLookup("cluster_migration_config") - if migrationConfig, ok := cfg.(*ClusterMigrationConfig); ok { + if migrationConfig, ok := cfg.(*DispatcherConfig); ok { if obj.Metadata.Labels == nil { obj.Metadata.Labels = util.MapStr{} } @@ -688,19 +489,32 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R "index_name": migrationConfig.LogIndexName, } } - util.MapStr(obj.Parameters).Put("pipeline.config", taskConfig) + obj.Config = taskConfig obj.Metadata.Labels["completed_indices"] = completedIndices h.WriteJSON(w, obj, http.StatusOK) } -func getErrorPartitionTasks(taskID string) (map[string]int, error){ +func getMajorTaskInfoByIndex(taskID string) (map[string]IndexStateInfo, error){ query := util.MapStr{ "size": 0, "aggs": util.MapStr{ "group_by_task": util.MapStr{ "terms": util.MapStr{ - "field": "parent_id", + "field": "metadata.labels.unique_index_name", "size": 100, }, + "aggs": util.MapStr{ + "group_by_status": util.MapStr{ + "terms": util.MapStr{ + "field": "status", + "size": 100, + }, + }, + "total_docs": util.MapStr{ + "sum": util.MapStr{ + "field": "metadata.labels.index_docs", + }, + }, + }, }, }, "query": util.MapStr{ @@ -708,25 +522,11 @@ func getErrorPartitionTasks(taskID string) (map[string]int, error){ "must": []util.MapStr{ { "term": util.MapStr{ - 
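// Label scheme: "business_id" now tags business-level tasks
// (cluster_migration, index_migration), while "pipeline_id" is reserved for
// concrete pipeline tasks (es_scroll, bulk_indexing); the old pipeline_id
// term here is therefore replaced with business_id.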
"metadata.labels.pipeline_id": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ "value": "index_migration", }, }, }, - { - "term": util.MapStr{ - "runnable": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "status": util.MapStr{ - "value": task2.StatusError, - }, - }, - }, { "term": util.MapStr{ "parent_id": util.MapStr{ @@ -751,15 +551,36 @@ func getErrorPartitionTasks(taskID string) (map[string]int, error){ if err != nil { return nil, err } - resBody := map[string]int{} + resBody := map[string]IndexStateInfo{} if taskAgg, ok := searchRes.Aggregations["group_by_task"]; ok { for _, bk := range taskAgg.Buckets { if key, ok := bk["key"].(string); ok { - if key == taskID { - continue + //resBody[key] = int(bk["doc_count"].(float64)) + resBody[key] = IndexStateInfo{} + if statusAgg, ok := bk["group_by_status"].(map[string]interface{}); ok { + if sbks, ok := statusAgg["buckets"].([]interface{}); ok { + for _, sbk := range sbks { + if sbkM, ok := sbk.(map[string]interface{}); ok { + if sbkM["key"] == task2.StatusError { + if v, ok := sbkM["doc_count"].(float64); ok { + st := resBody[key] + st.ErrorPartitions = int(v) + resBody[key] = st + } + } + } + } + } + } + if indexDocsAgg, ok := bk["total_docs"].(map[string]interface{}); ok { + if v, ok := indexDocsAgg["value"].(float64); ok { + st := resBody[key] + st.IndexDocs = v + resBody[key] = st + } + } - resBody[key] = int(bk["doc_count"].(float64)) } } } @@ -809,69 +630,23 @@ func getIndexTaskDocCount(index *IndexConfig, targetESClient elastic.API) (int64 return countRes.Count, nil } -func getExecutionConfig(parameters map[string]interface{}, key string)(*ExecutionConfig, error){ - execution, err := util.MapStr(parameters).GetValue(key) - if err != nil { - return nil, err - } - buf := util.MustToJSONBytes(execution) - executionConfig := ExecutionConfig{} - err = util.FromJSONBytes(buf, &executionConfig) - return &executionConfig, err -} - -func getTaskStats(nodeID string) (map[string]interface{}, error){ - endpoint, err := getNodeEndpoint(nodeID) - if err != nil { - return nil, err - } - res, err := proxy.DoProxyRequest(&proxy.Request{ - Method: http.MethodGet, - Endpoint: endpoint, - Path: "/stats", - }) - - if err != nil { - return nil, fmt.Errorf("call stats api error: %w", err) - } - resBody := struct { - Stats map[string]interface{} `json:"stats"` - }{} - err = util.FromJSONBytes(res.Body, &resBody) - return resBody.Stats, err -} - func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ id := ps.MustGetParameter("task_id") - indexTask := task2.Task{} - indexTask.ID = id - exists, err := orm.Get(&indexTask) + indexName := ps.MustGetParameter("index") + docType := ps.MustGetParameter("doc_type") + majorTask := task2.Task{} + majorTask.ID = id + exists, err := orm.Get(&majorTask) if !exists || err != nil { h.WriteError(w, fmt.Sprintf("task [%s] not found", id), http.StatusInternalServerError) return } - var durationInMS int64 - if indexTask.StartTimeInMillis > 0 { - durationInMS = time.Now().UnixMilli() - indexTask.StartTimeInMillis - if indexTask.CompletedTime != nil && indexTask.Status == task2.StatusComplete { - durationInMS = indexTask.CompletedTime.UnixMilli() - indexTask.StartTimeInMillis - } - } var completedTime int64 - if indexTask.CompletedTime != nil { - completedTime = indexTask.CompletedTime.UnixMilli() - } taskInfo := util.MapStr{ "task_id": id, - "start_time": indexTask.StartTimeInMillis, - "status": indexTask.Status, - "completed_time": 
completedTime, - "duration": durationInMS, - } - if len(indexTask.Metadata.Labels) > 0 { - taskInfo["data_partition"] = indexTask.Metadata.Labels["partition_count"] + "start_time": majorTask.StartTimeInMillis, } partitionTaskQuery := util.MapStr{ "size": 500, @@ -891,10 +666,11 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt "value": id, }, }, - },{ + }, + { "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "index_migration", + "metadata.labels.unique_index_name": util.MapStr{ + "value": fmt.Sprintf("%s:%s", indexName, docType), }, }, }, @@ -911,55 +687,103 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - executionConfig, err := getExecutionConfig(indexTask.Parameters, "pipeline.config.execution") - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if len(executionConfig.Nodes.Permit) == 0 { - h.WriteError(w, "node of running task can not found", http.StatusInternalServerError) - return - } - stats, err := getTaskStats(executionConfig.Nodes.Permit[0].ID) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - var ptasks = make([]task2.Task, 0, len(result.Result)) - var ptaskIds = make([]string, 0, len(result.Result)) + + var subTasks []task2.Task + var pipelineTaskIDs = map[string][]string{} + pipelineSubParentIDs := map[string]string{} + subTaskStatus := map[string]string{} for _, row := range result.Result { buf := util.MustToJSONBytes(row) - ptask := task2.Task{} - err = util.FromJSONBytes(buf, &ptask) + subTask := task2.Task{} + err = util.FromJSONBytes(buf, &subTask) if err != nil { log.Error(err) continue } - ptasks = append(ptasks, ptask) - ptaskIds = append(ptaskIds, ptask.ID) + if subTask.Metadata.Labels != nil { + if subTask.Metadata.Labels["business_id"] == "index_migration" { + subTasks = append(subTasks, subTask) + subTaskStatus[subTask.ID] = subTask.Status + continue + } + if subTask.Metadata.Labels["pipeline_id"] == "es_scroll" || subTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" { + if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + if pl := len(subTask.ParentId); pl > 0 { + if subTaskStatus[subTask.ParentId[pl-1]] == task2.StatusRunning { + pipelineSubParentIDs[subTask.ID] = subTask.ParentId[pl-1] + } + } + } + } + } } - indexingStats, err := getIndexingStats(ptaskIds, h.bulkResultIndexName) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return + taskInfo["data_partition"] = len(subTasks) + var taskStats = map[string]struct{ + ScrolledDocs float64 + IndexDocs float64 + }{} + for instID, taskIDs := range pipelineTaskIDs { + inst := &model.Instance{} + inst.ID = instID + _, err = orm.Get(inst) + if err != nil { + log.Error(err) + continue + } + pipelines, err := inst.GetPipelinesByIDs(taskIDs) + if err != nil { + log.Error(err) + continue + } + + for pipelineID, status := range pipelines { + if pid, ok := pipelineSubParentIDs[pipelineID]; ok { + if v, err := status.Context.GetValue("es_scroll.scrolled_docs"); err == nil { + if vv, ok := v.(float64); ok { + stat := taskStats[pid] + stat.ScrolledDocs = vv + taskStats[pid] = stat + } + } + if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil { + if vv, ok 
:= v.(float64); ok { + stat := taskStats[pid] + stat.IndexDocs = vv + taskStats[pid] = stat + } + } + } + } } var ( partitionTaskInfos []util.MapStr completedPartitions int + startTime int64 ) - for i, ptask := range ptasks { - start, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.start") - end, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.end") + if len(subTasks) > 0 { + startTime = subTasks[0].StartTimeInMillis + } + for i, ptask := range subTasks { + var ( + cfg map[string]interface{} + ok bool + ) + if cfg, ok = ptask.Config.(map[string]interface{}); !ok { + continue + } + start, _ := util.MapStr(cfg).GetValue("source.start") + end, _ := util.MapStr(cfg).GetValue("source.end") if i == 0 { - step, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.step") + step, _ := util.MapStr(cfg).GetValue("source.step") taskInfo["step"] = step } - durationInMS = 0 + var durationInMS int64 = 0 if ptask.StartTimeInMillis > 0 { + if ptask.StartTimeInMillis < startTime { + startTime = ptask.StartTimeInMillis + } durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis if ptask.CompletedTime != nil && (ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError) { durationInMS = ptask.CompletedTime.UnixMilli() - ptask.StartTimeInMillis @@ -969,23 +793,31 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt scrollDocs float64 indexDocs float64 ) - var stKey = fmt.Sprintf("scrolling_processing.%s", ptask.ID) - if pt, ok := stats[stKey]; ok { - if ptv, ok := pt.(map[string]interface{}); ok { - if v, ok := ptv["docs"].(float64); ok { - scrollDocs = v + if stat, ok := taskStats[ptask.ID]; ok { + scrollDocs = stat.ScrolledDocs + indexDocs = stat.IndexDocs + }else{ + if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError { + if ptask.Metadata.Labels != nil { + if v, ok := ptask.Metadata.Labels["scrolled_docs"].(float64); ok{ + scrollDocs = v + } + if v, ok := ptask.Metadata.Labels["index_docs"].(float64); ok{ + indexDocs = v + } } } } - if v, ok := indexingStats[ptask.ID]; ok { - indexDocs = v - } + var subCompletedTime int64 if ptask.CompletedTime != nil { subCompletedTime = ptask.CompletedTime.UnixMilli() + if subCompletedTime > completedTime { + completedTime = subCompletedTime + } } - partitionTotalDocs, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.doc_count") + partitionTotalDocs, _ := util.MapStr(cfg).GetValue("source.doc_count") partitionTaskInfos = append(partitionTaskInfos, util.MapStr{ "task_id": ptask.ID, "status": ptask.Status, @@ -998,10 +830,17 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt "index_docs": indexDocs, "total_docs": partitionTotalDocs, }) - if ptask.Status == task2.StatusComplete { + if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError { completedPartitions++ } } + if len(subTasks) == completedPartitions { + taskInfo["completed_time"] = completedTime + taskInfo["duration"] = completedTime - startTime + }else{ + taskInfo["duration"] = time.Now().UnixMilli() - startTime + } + taskInfo["start_time"] = startTime taskInfo["partitions"] = partitionTaskInfos taskInfo["completed_partitions"] = completedPartitions h.WriteJSON(w, taskInfo, http.StatusOK) @@ -1179,49 +1018,244 @@ func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request, } , http.StatusOK) } -func getIndexingStats(taskIDs []string, indexName string) (map[string]float64, error){ - if 
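// getIndexingStats (removed here) aggregated success counts from the async
// bulk-result index; partition progress is now read straight from the
// pipeline status context on each execution instance, keyed by
// "es_scroll.scrolled_docs" and "bulk_indexing.success.count", with the
// persisted scrolled_docs/index_docs labels as a fallback for finished tasks.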
len(taskIDs) == 0 { - return nil, fmt.Errorf("taskIDs should not be empty") - } - q := util.MapStr{ - "size": 0, +func stopPipelineTasks(parentID string) error { + queryDsl := util.MapStr{ + "size": 1000, "query": util.MapStr{ - "terms": util.MapStr{ - "labels.queue": taskIDs, - }, - }, - "aggs": util.MapStr{ - "gp_task": util.MapStr{ - "terms": util.MapStr{ - "field": "labels.queue", - "size": len(taskIDs), - }, - "aggs": util.MapStr{ - "success_count": util.MapStr{ - "sum": util.MapStr{ - "field": "bulk_results.summary.success.count", + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": parentID, + }, + }, + }, + { + "terms": util.MapStr{ + "metadata.labels.pipeline_id": []string{"es_scroll","bulk_indexing"}, }, }, }, }, }, } - query := orm.Query{ - RawQuery: util.MustToJSONBytes(q), - IndexName: indexName, + q := orm.Query{ + RawQuery: util.MustToJSONBytes(queryDsl), } - err, result := orm.Search(nil, &query) + err, result := orm.Search(task2.Task{}, &q) if err != nil { - return nil, fmt.Errorf("query indexing stats error: %w", err) + return err } - statsM := map[string]float64{} - jsonparser.ArrayEach(result.Raw, func(value []byte, dataType jsonparser.ValueType, offset int, err error) { - key, _ := jsonparser.GetString(value, "key") - successCount, err := jsonparser.GetFloat(value, "success_count", "value") + + for _, hit := range result.Result { + buf, err := util.ToJSONBytes(hit) + if err != nil { + return err + } + tk := task2.Task{} + err = util.FromJSONBytes(buf, &tk) + if err != nil { + return err + } + if tk.Metadata.Labels != nil { + if instID, ok := tk.Metadata.Labels["execution_instance_id"].(string); ok { + inst := model.Instance{} + inst.ID = instID + _, err = orm.Get(&inst) + if err != nil { + return err + } + err = inst.StopPipeline(context.Background(), tk.ID) + if err != nil { + log.Error(err) + continue + } + } + } + } + return nil +} + +func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats MajorTaskState, err error) { + taskQuery := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": majorTaskID, + }, + }, + }, + { + "bool": util.MapStr{ + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "bulk_indexing", + }, + }, + }, + { + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task2.StatusComplete, task2.StatusError}, + }, + }, + }, + }, + }, + }, + }, + }, + + }, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(taskQuery), + } + err, result := orm.Search(task2.Task{}, q) + if err != nil { + return taskStats, err + } + var pipelineTaskIDs = map[string][]string{} + for _, row := range result.Result { + buf := util.MustToJSONBytes(row) + subTask := task2.Task{} + err = util.FromJSONBytes(buf, &subTask) if err != nil { log.Error(err) + continue } - statsM[key] = successCount - }, "aggregations", "gp_task", "buckets") - return statsM, nil + if subTask.Metadata.Labels != nil { + //add indexDocs of already complete + if subTask.Metadata.Labels["business_id"] == "index_migration" { + if v, ok := subTask.Metadata.Labels["index_docs"].(float64); ok { + taskStats.IndexDocs += v + } + continue + } + if instID, ok := 
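// Major-task progress is the sum of persisted index_docs from already
// finished index_migration subtasks (added above) and live bulk_indexing
// success counts from still-running pipeline tasks; the latter are grouped
// by execution_instance_id so each instance is queried once via
// GetPipelinesByIDs.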
subTask.Metadata.Labels["execution_instance_id"].(string); ok { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + } + } + } + for instID, taskIDs := range pipelineTaskIDs { + inst := &model.Instance{} + inst.ID = instID + _, err = orm.Get(inst) + if err != nil { + log.Error(err) + continue + } + pipelines, err := inst.GetPipelinesByIDs(taskIDs) + if err != nil { + log.Error(err) + continue + } + + for _, status := range pipelines { + if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil { + if vv, ok := v.(float64); ok { + taskStats.IndexDocs += vv + } + } + } + } + return taskStats, nil +} + +func getMajorTaskByIndexFromES(majorTaskID string)(map[string]IndexStateInfo, error){ + taskQuery := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": majorTaskID, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "bulk_indexing", + }, + }, + }, + }, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(taskQuery), + } + err, result := orm.Search(task2.Task{}, q) + if err != nil { + return nil, err + } + var pipelineTaskIDs = map[string][]string{} + var pipelineIndexNames = map[string]string{} + for _, row := range result.Result { + buf := util.MustToJSONBytes(row) + subTask := task2.Task{} + err = util.FromJSONBytes(buf, &subTask) + if err != nil { + log.Error(err) + continue + } + if subTask.Metadata.Labels != nil { + if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + } + if indexName, ok := subTask.Metadata.Labels["unique_index_name"].(string); ok { + pipelineIndexNames[subTask.ID] = indexName + } + } + } + state := map[string]IndexStateInfo{} + for instID, taskIDs := range pipelineTaskIDs { + inst := &model.Instance{} + inst.ID = instID + _, err = orm.Get(inst) + if err != nil { + log.Error(err) + continue + } + pipelines, err := inst.GetPipelinesByIDs(taskIDs) + if err != nil { + log.Error(err) + continue + } + + for pipelineID, status := range pipelines { + indexName := pipelineIndexNames[pipelineID] + if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil { + if vv, ok := v.(float64); ok && indexName != ""{ + st := state[indexName] + st.IndexDocs += vv + state[indexName] = st + } + } + } + } + return state, nil } \ No newline at end of file diff --git a/plugin/migration/model.go b/plugin/migration/model.go index 74a4139f..9baf15a4 100644 --- a/plugin/migration/model.go +++ b/plugin/migration/model.go @@ -4,6 +4,8 @@ package migration +import "fmt" + type ElasticDataConfig struct { Cluster struct { Source ClusterInfo `json:"source"` @@ -21,6 +23,9 @@ type ElasticDataConfig struct { Bulk struct { Docs int `json:"docs"` StoreSizeInMB int `json:"store_size_in_mb"` + MaxWorkerSize int `json:"max_worker_size"` + IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` + SliceSize int `json:"slice_size"` } `json:"bulk"` Execution ExecutionConfig `json:"execution"` } `json:"settings"` @@ -54,8 +59,8 @@ type IndexConfig struct { IndexRename map[string]interface{} `json:"index_rename"` TypeRename map[string]interface{} `json:"type_rename"` Partition *IndexPartition `json:"partition,omitempty"` - TaskID string `json:"task_id,omitempty"` - Status string `json:"status,omitempty"` + //TaskID string `json:"task_id,omitempty"` + //Status string 
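// TaskID and Status are dropped from IndexConfig: an index is now addressed
// by IndexInfo.GetUniqueIndexName(), which renders as "{index}:{doc_type}"
// and doubles as the metadata.labels.unique_index_name task label.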
`json:"status,omitempty"` Percent float64 `json:"percent,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` } @@ -72,7 +77,33 @@ type IndexInfo struct { StoreSizeInBytes int `json:"store_size_in_bytes"` } +func (ii *IndexInfo) GetUniqueIndexName() string{ + return fmt.Sprintf("%s:%s", ii.Name, ii.DocType) +} + type ClusterInfo struct { Id string `json:"id"` Name string `json:"name"` +} + +type TaskCompleteState struct { + IsComplete bool + Error string + ClearPipeline bool + PipelineIds []string + SuccessDocs float64 + ScrolledDocs float64 + RunningPhase int + TotalDocs interface{} +} + +type MajorTaskState struct{ + ScrolledDocs float64 + IndexDocs float64 + Status string +} + +type IndexStateInfo struct { + ErrorPartitions int + IndexDocs float64 } \ No newline at end of file diff --git a/plugin/migration/module.go b/plugin/migration/module.go index 67271880..de2d4b73 100644 --- a/plugin/migration/module.go +++ b/plugin/migration/module.go @@ -15,12 +15,11 @@ func (module *Module) Name() string { } func (module *Module) Setup() { - module.BulkResultIndexName = ".infini_async_bulk_results" exists, err := env.ParseConfig("migration", module) if exists && err != nil { log.Error(err) } - InitAPI(module.BulkResultIndexName) + InitAPI() } func (module *Module) Start() error { return nil @@ -31,7 +30,6 @@ func (module *Module) Stop() error { } type Module struct { - BulkResultIndexName string `config:"bulk_result_index_name"` } func init() { diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index ff74db7f..42593863 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -5,45 +5,55 @@ package migration import ( + "errors" "fmt" log "github.com/cihub/seelog" + "infini.sh/console/model" "infini.sh/framework/core/config" "infini.sh/framework/core/elastic" "infini.sh/framework/core/env" + "infini.sh/framework/core/event" "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "infini.sh/framework/core/pipeline" task2 "infini.sh/framework/core/task" "infini.sh/framework/core/util" "infini.sh/framework/modules/elastic/common" - "runtime" + "math" + "strings" + "syscall" "time" ) -type ClusterMigrationProcessor struct { +type DispatcherProcessor struct { id string - config *ClusterMigrationConfig + config *DispatcherConfig + state map[string]DispatcherState } -type ClusterMigrationConfig struct { +type DispatcherConfig struct { Elasticsearch string `config:"elasticsearch,omitempty"` IndexName string `config:"index_name"` - DetectIntervalInMs int `config:"detect_interval_in_ms"` LogIndexName string `config:"log_index_name"` + MaxTasksPerInstance int `config:"max_tasks_per_instance"` + CheckInstanceAvailable bool `config:"check_instance_available"` + TaskBatchSize int `config:"task_batch_size"` +} + +type DispatcherState struct { + Total int } func init() { - pipeline.RegisterProcessorPlugin("cluster_migration", newClusterMigrationProcessor) + pipeline.RegisterProcessorPlugin("migration_dispatcher", newMigrationDispatcherProcessor) } -func newClusterMigrationProcessor(c *config.Config) (pipeline.Processor, error) { +func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, error) { - cfg := ClusterMigrationConfig{ - DetectIntervalInMs: 5000, - } + cfg := DispatcherConfig{} if err := c.Unpack(&cfg); err != nil { log.Error(err) - return nil, fmt.Errorf("failed to unpack the configuration of cluster migration processor: %s", err) + return nil, fmt.Errorf("failed to unpack the configuration of migration dispatcher 
processor: %s", err) } if cfg.IndexName == "" || cfg.LogIndexName == "" { ormConfig := common.ORMConfig{} @@ -53,7 +63,7 @@ func newClusterMigrationProcessor(c *config.Config) (pipeline.Processor, error) cfg.IndexName = fmt.Sprintf("%stask", ormConfig.IndexPrefix) } if cfg.LogIndexName == "" { - cfg.LogIndexName = fmt.Sprintf("%stask-log", ormConfig.IndexPrefix) + cfg.LogIndexName = fmt.Sprintf("%slogs", ormConfig.IndexPrefix) } }else{ err = fmt.Errorf("parse config elastic.orm error: %w", err) @@ -62,96 +72,92 @@ func newClusterMigrationProcessor(c *config.Config) (pipeline.Processor, error) } } global.Register("cluster_migration_config", &cfg) + if cfg.MaxTasksPerInstance <= 0 { + cfg.MaxTasksPerInstance = 10 + } + if cfg.TaskBatchSize <= 0 { + cfg.TaskBatchSize = 50 + } - processor := ClusterMigrationProcessor{ + //query and then init dispatcher state + processor := DispatcherProcessor{ id: util.GetUUID(), config: &cfg, + state: map[string]DispatcherState{}, } + state, err := processor.getInstanceTaskState() + if err != nil { + log.Error(err) + return nil, err + } + processor.state = state return &processor, nil } -func (p *ClusterMigrationProcessor) Name() string { - return "cluster_migration" +func (p *DispatcherProcessor) Name() string { + return "migration_dispatcher" } -func (p *ClusterMigrationProcessor) Process(ctx *pipeline.Context) error { - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Errorf("error in %s processor: %v", p.Name(), v) - } - } - log.Tracef("exit %s processor", p.Name()) - }() - +func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { for { if ctx.IsCanceled() { return nil } - tasks, err := p.getClusterMigrationTasks(20) + tasks, err := p.getMigrationTasks(p.config.TaskBatchSize) if err != nil { - panic(err) + return err } if len(tasks) == 0 { log.Debug("got zero cluster migration task from es") - if p.config.DetectIntervalInMs > 0 { - time.Sleep(time.Millisecond * time.Duration(p.config.DetectIntervalInMs)) - } + return nil } for _, t := range tasks { if ctx.IsCanceled() { return nil } - t.Status = task2.StatusRunning - t.StartTimeInMillis = time.Now().UnixMilli() - p.writeTaskLog(&t, &task2.Log{ - ID: util.GetUUID(), - TaskId: t.ID, - Status: task2.StatusRunning, - Type: t.Metadata.Type, - Action: task2.LogAction{ - Parameters: t.Parameters, - }, - Content: fmt.Sprintf("starting to execute task [%s]", t.ID), - Timestamp: time.Now().UTC(), - }) - err = p.SplitMigrationTask(&t) - taskLog := &task2.Log{ - ID: util.GetUUID(), - TaskId: t.ID, - Status: task2.StatusRunning, - Type: t.Metadata.Type, - Action: task2.LogAction{ - Parameters: t.Parameters, - Result: &task2.LogResult{ - Success: true, - }, - }, - Content: fmt.Sprintf("success to split task [%s]", t.ID), - Timestamp: time.Now().UTC(), + if t.Metadata.Labels == nil { + log.Error("got migration task with empty labels, skip handling: %v", t) + continue } - if err != nil { - taskLog.Status = task2.StatusError - taskLog.Content = fmt.Sprintf("failed to split task [%s]: %v", t.ID, err) - taskLog.Action.Result = &task2.LogResult{ - Success: false, - Error: err.Error(), + if t.Metadata.Labels["business_id"] == "cluster_migration" { + //handle major task + switch t.Status { + case task2.StatusReady: + err = p.handleReadyMajorTask(&t) + case task2.StatusRunning: + err = p.handleRunningMajorTask(&t) + case 
task2.StatusPendingStop: + err = p.handlePendingStopMajorTask(&t) + } + }else if t.Metadata.Labels["business_id"] == "index_migration" { + //handle sub migration task + switch t.Status { + case task2.StatusReady: + err = p.handleReadySubTask(&t) + case task2.StatusRunning: + err = p.handleRunningSubTask(&t) + case task2.StatusPendingStop: + err = p.handlePendingStopSubTask(&t) } } - t.Status = taskLog.Status - p.writeTaskLog(&t, taskLog) if err != nil { - continue + t.Status = task2.StatusError + tn := time.Now() + t.CompletedTime = &tn + p.saveTaskAndWriteLog(&t, &task2.Log{ + ID: util.GetUUID(), + TaskId: t.ID, + Status: task2.StatusError, + Type: t.Metadata.Type, + Config: t.Config, + Result: &task2.LogResult{ + Success: false, + Error: err.Error(), + }, + Message: fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err), + Timestamp: time.Now().UTC(), + },"") } } //es index refresh @@ -159,34 +165,773 @@ func (p *ClusterMigrationProcessor) Process(ctx *pipeline.Context) error { } } - -func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) error { +func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error{ if taskItem.Metadata.Labels == nil { - return fmt.Errorf("empty metadata labels, unexpected cluster migration task: %s", util.MustToJSON(taskItem)) + return fmt.Errorf("got migration task with empty labels, skip handling: %v", taskItem) } - if taskItem.Metadata.Labels["pipeline_id"] != p.Name() { - log.Tracef("got unexpect task type of %s with task id [%s] in cluster migration processor", taskItem.Metadata.Type, taskItem.ID) - return nil + if taskItem.Metadata.Labels["is_split"] != true { + err := p.splitMajorMigrationTask(taskItem) + if err != nil { + return err + } + taskItem.Metadata.Labels["is_split"] = true + }else{ + taskItem.RetryTimes++ } - parameters := util.MapStr(taskItem.Parameters) - migrationConfig, err := parameters.GetValue("pipeline.config") + //update status of subtask to ready + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskItem.ID, + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task2.StatusError, task2.StatusStopped}, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + }, + }, + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusReady), + }, + } + + esClient := elastic.GetClient(p.config.Elasticsearch) + _, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl) ) if err != nil { return err } - buf := util.MustToJSONBytes(migrationConfig) + taskLog := &task2.Log{ + ID: util.GetUUID(), + TaskId: taskItem.ID, + Status: task2.StatusRunning, + Type: taskItem.Metadata.Type, + Config: taskItem.Config, + Result: &task2.LogResult{ + Success: true, + }, + Message: fmt.Sprintf("success to start task [%s]", taskItem.ID), + Timestamp: time.Now().UTC(), + } + taskItem.Status = task2.StatusRunning + p.saveTaskAndWriteLog(taskItem, taskLog, "") + return nil +} + +func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error{ + //check whether all pipeline task is stopped or not, then update task status + q := util.MapStr{ + "size": 200, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.business_id": "index_migration", + }, + }, + { + "terms": 
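// A pending-stop major task is finalized to stopped only once this query
// finds no index_migration subtask still in running or pending_stop state.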
util.MapStr{ + "status": []string{task2.StatusRunning, task2.StatusPendingStop}, + }, + }, + }, + }, + }, + } + tasks, err := p.getTasks(q) + if err != nil { + return err + } + // all subtask stopped or error or complete + if len(tasks) == 0 { + taskItem.Status = task2.StatusStopped + p.saveTaskAndWriteLog(taskItem, &task2.Log{ + ID: util.GetUUID(), + TaskId: taskItem.ID, + Status: task2.StatusStopped, + Type: taskItem.Metadata.Type, + Config: taskItem.Config, + Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID), + Timestamp: time.Now().UTC(), + },"") + } + return nil +} +func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error{ + ts, err := p.getMajorTaskState(taskItem) + if err != nil { + return err + } + if ts.Status == task2.StatusComplete || ts.Status == task2.StatusError { + taskItem.Metadata.Labels["target_total_docs"] = ts.IndexDocs + taskItem.Status = ts.Status + tn := time.Now() + taskItem.CompletedTime = &tn + p.saveTaskAndWriteLog(taskItem, &task2.Log{ + ID: util.GetUUID(), + TaskId: taskItem.ID, + Status: taskItem.Status, + Type: taskItem.Metadata.Type, + Config: taskItem.Config, + Message: fmt.Sprintf("task [%s] is complete", taskItem.ID), + Timestamp: time.Now().UTC(), + },"") + } + return nil +} + +func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error{ + state, err := p.getTaskCompleteState(taskItem) + if err != nil { + return err + } + if state.IsComplete { + if taskItem.Metadata.Labels != nil { + taskItem.Metadata.Labels["index_docs"] = state.SuccessDocs + taskItem.Metadata.Labels["scrolled_docs"] = state.ScrolledDocs + if instanceID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok { + inst := model.Instance{} + inst.ID = instanceID + _, err = orm.Get(&inst) + if err == nil { + for _, pipelineID := range state.PipelineIds { + err = inst.DeletePipeline(pipelineID) + if err != nil { + log.Error(err) + continue + } + selector := util.MapStr{ + "labels": util.MapStr{ + "migration_task_id": taskItem.ID, + }, + } + //clear queue + err = inst.DeleteQueueBySelector(selector) + if err != nil { + log.Error(err) + } + } + } + if st, ok := p.state[instanceID]; ok { + st.Total -= 1 + p.state[instanceID] = st + } + } + + } + if state.Error != "" && state.TotalDocs != state.SuccessDocs { + taskItem.Status = task2.StatusError + }else { + taskItem.Status = task2.StatusComplete + } + + tn := time.Now() + taskItem.CompletedTime = &tn + p.saveTaskAndWriteLog(taskItem, &task2.Log{ + ID: util.GetUUID(), + TaskId: taskItem.ID, + Status: taskItem.Status, + Type: taskItem.Metadata.Type, + Config: taskItem.Config, + Result: &task2.LogResult{ + Success: state.Error == "", + Error: state.Error, + }, + Message: fmt.Sprintf("task [%s] is complete", taskItem.ID), + Timestamp: time.Now().UTC(), + },"") + }else{ + if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) { + ptasks, err := p.getPipelineTasks(taskItem.ID) + if err != nil { + return err + } + var bulkTask *task2.Task + for i, t := range ptasks { + if t.Metadata.Labels != nil { + if t.Metadata.Labels["pipeline_id"] == "bulk_indexing" { + bulkTask = &ptasks[i] + } + } + } + if bulkTask == nil { + return fmt.Errorf("can not found bulk_indexing pipeline of sub task [%s]", taskItem.ID) + } + if bulkTask.Metadata.Labels != nil { + if instID, ok := bulkTask.Metadata.Labels["execution_instance_id"].(string); ok { + inst := &model.Instance{} + inst.ID = instID + _, err = orm.Get(inst) + if err != nil { + return err + } + err = 
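// Phase handover: a subtask runs es_scroll first (running_phase 1); once the
// scroll completes, the prepared bulk_indexing pipeline is created on the
// same execution instance and running_phase advances to 2: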
inst.CreatePipeline(util.MustToJSONBytes(bulkTask.Config)) + if err != nil { + return err + } + taskItem.Metadata.Labels["running_phase"] = 2 + } + } + p.saveTaskAndWriteLog(taskItem,nil, "wait_for") + } + } + return nil +} + +func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) error{ + //check whether all pipeline task is stopped or not, then update task status + ptasks, err := p.getPipelineTasks(taskItem.ID) + if err != nil { + return err + } + var taskIDs []string + for _, t := range ptasks { + taskIDs = append(taskIDs, t.ID) + } + esClient := elastic.GetClient(p.config.Elasticsearch) + q := util.MapStr{ + "size": len(taskIDs), + "sort": []util.MapStr{ + { + "payload.pipeline.status_log.steps": util.MapStr{ + "order": "desc", + }, + }, + }, + "collapse": util.MapStr{ + "field": "metadata.labels.task_id", + }, + "query": util.MapStr{ + "terms": util.MapStr{ + "metadata.labels.task_id": taskIDs, + }, + }, + } + searchRes, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(q)) + if err != nil { + return err + } + if len(searchRes.Hits.Hits) == 0 { + return nil + } + MainLoop: + for _, hit := range searchRes.Hits.Hits { + status, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.status_log.status") + if status != "STOPPED" { + //call instance api to stop scroll/bulk_indexing pipeline task + if instID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok { + inst := model.Instance{} + inst.ID = instID + _, err = orm.Get(&inst) + if err != nil { + return err + } + hasStopped := true + for _, pipelineID := range taskIDs { + err = inst.StopPipelineWithTimeout(pipelineID, time.Second) + if err != nil { + if !errors.Is(err, syscall.ECONNREFUSED) && !strings.Contains(err.Error(), "task not found"){ + hasStopped = false + break + } + log.Error(err) + } + } + if hasStopped { + break MainLoop + } + } + return nil + } + } + taskItem.Status = task2.StatusStopped + + //delete pipeline and clear queue + if instanceID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok { + inst := model.Instance{} + inst.ID = instanceID + _, err = orm.Get(&inst) + if err != nil { + return err + } + for _, pipelineID := range taskIDs { + err = inst.DeletePipeline(pipelineID) + if err != nil { + log.Error(err) + continue + } + selector := util.MapStr{ + "labels": util.MapStr{ + "migration_task_id": taskItem.ID, + }, + } + //clear queue + err = inst.DeleteQueueBySelector(selector) + if err != nil { + log.Error(err) + } + } + if st, ok := p.state[instanceID]; ok { + st.Total -= 1 + p.state[instanceID] = st + } + } + p.saveTaskAndWriteLog(taskItem, &task2.Log{ + ID: util.GetUUID(), + TaskId: taskItem.ID, + Status: task2.StatusStopped, + Type: taskItem.Metadata.Type, + Config: taskItem.Config, + Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID), + Timestamp: time.Now().UTC(), + },"") + return nil +} + +func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ + if taskItem.Metadata.Labels == nil { + return fmt.Errorf("empty labels") + } + var ( + scrollTask *task2.Task + bulkTask *task2.Task + ) + if taskItem.Metadata.Labels["is_split"] == true { + //query split pipeline task + ptasks, err := p.getPipelineTasks(taskItem.ID) + if err != nil { + return err + } + for i, t := range ptasks { + if t.Metadata.Labels != nil { + if cfg, ok := ptasks[i].Config.(map[string]interface{}); ok { + util.MapStr(cfg).Put("labels.retry_no", taskItem.RetryTimes + 1) + } + if t.Metadata.Labels["pipeline_id"] == "es_scroll" { + 
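// Retry path: the previously persisted es_scroll/bulk_indexing pipeline
// tasks are reused rather than regenerated; only labels.retry_no is bumped
// (RetryTimes + 1) so each retry run can be distinguished.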
+func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
+    if taskItem.Metadata.Labels == nil {
+        return fmt.Errorf("empty labels")
+    }
+    var (
+        scrollTask *task2.Task
+        bulkTask   *task2.Task
+    )
+    if taskItem.Metadata.Labels["is_split"] == true {
+        //query the previously split pipeline tasks
+        ptasks, err := p.getPipelineTasks(taskItem.ID)
+        if err != nil {
+            return err
+        }
+        for i, t := range ptasks {
+            if t.Metadata.Labels != nil {
+                if cfg, ok := ptasks[i].Config.(map[string]interface{}); ok {
+                    util.MapStr(cfg).Put("labels.retry_no", taskItem.RetryTimes+1)
+                }
+                if t.Metadata.Labels["pipeline_id"] == "es_scroll" {
+                    scrollTask = &ptasks[i]
+                } else if t.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
+                    bulkTask = &ptasks[i]
+                }
+            }
+        }
+        if scrollTask == nil || bulkTask == nil {
+            return fmt.Errorf("es_scroll or bulk_indexing pipeline task not found")
+        }
+        taskItem.RetryTimes++
+    } else {
+        //split the task into scroll/bulk_indexing pipelines, then persist them
+        var pids []string
+        pids = append(pids, taskItem.ParentId...)
+        pids = append(pids, taskItem.ID)
+        scrollID := util.GetUUID()
+        var (
+            cfg map[string]interface{}
+            ok  bool
+        )
+        if cfg, ok = taskItem.Config.(map[string]interface{}); !ok {
+            return fmt.Errorf("got wrong config [%v] with task [%s]", taskItem.Config, taskItem.ID)
+        }
+        cfgm := util.MapStr(cfg)
+        var (
+            sourceClusterID string
+            targetClusterID string
+        )
+        if sourceClusterID, ok = getMapValue(cfgm, "source.cluster_id").(string); !ok {
+            return fmt.Errorf("got wrong source cluster id of task [%v]", *taskItem)
+        }
+        if targetClusterID, ok = getMapValue(cfgm, "target.cluster_id").(string); !ok {
+            return fmt.Errorf("got wrong target cluster id of task [%v]", *taskItem)
+        }
+        esConfig := elastic.GetConfig(sourceClusterID)
+        esTargetConfig := elastic.GetConfig(targetClusterID)
+        docType := common.GetClusterDocType(targetClusterID)
+        if len(taskItem.ParentId) == 0 {
+            return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
+        }
+        queryDsl := getMapValue(cfgm, "source.query_dsl")
+        scrollQueryDsl := util.MustToJSON(util.MapStr{
+            "query": queryDsl,
+        })
+        indexName := getMapValue(cfgm, "source.indices")
+        scrollTask = &task2.Task{
+            ParentId:    pids,
+            Runnable:    true,
+            Cancellable: true,
+            Metadata: task2.Metadata{
+                Type: "pipeline",
+                Labels: util.MapStr{
+                    "cluster_id":        sourceClusterID,
+                    "pipeline_id":       "es_scroll",
+                    "index_name":        indexName,
+                    "unique_index_name": taskItem.Metadata.Labels["unique_index_name"],
+                },
+            },
+            Config: util.MapStr{
+                "name": scrollID,
+                "logging": util.MapStr{
+                    "enabled": true,
+                },
+                "labels": util.MapStr{
+                    "parent_task_id":    pids,
+                    "unique_index_name": taskItem.Metadata.Labels["unique_index_name"],
+                    "retry_no":          taskItem.RetryTimes,
+                },
+                "auto_start":   true,
+                "keep_running": false,
+                "processor": []util.MapStr{
+                    {
+                        "es_scroll": util.MapStr{
+                            "remove_type":   docType == "",
+                            "slice_size":    getMapValue(cfgm, "source.slice_size"),
+                            "batch_size":    getMapValue(cfgm, "source.batch_size"),
+                            "indices":       indexName,
+                            "elasticsearch": sourceClusterID,
+                            "elasticsearch_config": util.MapStr{
+                                "name":       sourceClusterID,
+                                "enabled":    true,
+                                "endpoint":   esConfig.Endpoint,
+                                "basic_auth": esConfig.BasicAuth,
+                            },
+                            "queue": util.MapStr{
+                                "name": scrollID,
+                                "labels": util.MapStr{
+                                    "migration_task_id": taskItem.ID,
+                                },
+                            },
+                            "partition_size": 20,
+                            "scroll_time":    getMapValue(cfgm, "source.scroll_time"),
+                            "query_dsl":      scrollQueryDsl,
+                            "index_rename":   getMapValue(cfgm, "source.index_rename"),
+                            "type_rename":    getMapValue(cfgm, "source.type_rename"),
+                        },
+                    },
+                },
+            },
+        }
+        scrollTask.ID = scrollID
+
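+        // The bulk_indexing task built below consumes queues of type
+        // "scroll_docs" carrying the same migration_task_id, which the output
+        // queue of the es_scroll task above is expected to match; the shared
+        // label also lets DeleteQueueBySelector clean leftovers up on retry
+        // or stop.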
+        bulkID := util.GetUUID()
+        bulkTask = &task2.Task{
+            ParentId:    pids,
+            Runnable:    true,
+            Cancellable: true,
+            Metadata: task2.Metadata{
+                Type: "pipeline",
+                Labels: util.MapStr{
+                    "cluster_id":        targetClusterID,
+                    "pipeline_id":       "bulk_indexing",
+                    "index_name":        indexName,
+                    "unique_index_name": taskItem.Metadata.Labels["unique_index_name"],
+                },
+            },
+            Config: util.MapStr{
+                "name": bulkID,
+                "logging": util.MapStr{
+                    "enabled": true,
+                },
+                "labels": util.MapStr{
+                    "parent_task_id":    pids,
+                    "unique_index_name": taskItem.Metadata.Labels["unique_index_name"],
+                    "retry_no":          taskItem.RetryTimes,
+                },
+                "auto_start":   true,
+                "keep_running": false,
+                "processor": []util.MapStr{
+                    {
+                        "bulk_indexing": util.MapStr{
+                            "detect_active_queue": false,
+                            "bulk": util.MapStr{
+                                "batch_size_in_mb":   getMapValue(cfgm, "target.bulk.batch_size_in_mb"),
+                                "batch_size_in_docs": getMapValue(cfgm, "target.bulk.batch_size_in_docs"),
+                                "invalid_queue":      "bulk_indexing_400",
+                                //"retry_rules": util.MapStr{
+                                //    "default": false,
+                                //    "retry_4xx": false,
+                                //    "retry_429": true,
+                                //},
+                            },
+                            "max_worker_size":         getMapValue(cfgm, "target.bulk.max_worker_size"),
+                            "num_of_slices":           getMapValue(cfgm, "target.bulk.slice_size"),
+                            "idle_timeout_in_seconds": getMapValue(cfgm, "target.bulk.idle_timeout_in_seconds"),
+                            "elasticsearch":           targetClusterID,
+                            "elasticsearch_config": util.MapStr{
+                                "name":       targetClusterID,
+                                "enabled":    true,
+                                "endpoint":   esTargetConfig.Endpoint,
+                                "basic_auth": esTargetConfig.BasicAuth,
+                            },
+                            "queues": util.MapStr{
+                                "type":              "scroll_docs",
+                                "migration_task_id": taskItem.ID,
+                            },
+                        },
+                    },
+                },
+            },
+        }
+        bulkTask.ID = bulkID
+    }
+    instance, err := p.getPreferenceInstance(taskItem.ParentId[0])
+    if err != nil {
+        return fmt.Errorf("get preference instance error: %w", err)
+    }
+    if p.state[instance.ID].Total >= p.config.MaxTasksPerInstance {
+        log.Infof("hit max tasks per instance with %d, skip dispatch", p.config.MaxTasksPerInstance)
+        return nil
+    }
+    scrollTask.Metadata.Labels["execution_instance_id"] = instance.ID
+    bulkTask.Metadata.Labels["execution_instance_id"] = instance.ID
+
+    //try to clear the queue when the task is retried
+    if taskItem.RetryTimes > 0 {
+        selector := util.MapStr{
+            "labels": util.MapStr{
+                "migration_task_id": taskItem.ID,
+            },
+        }
+        _ = instance.DeleteQueueBySelector(selector)
+    }
+
+    //call the instance api to create the pipeline task
+    err = instance.CreatePipeline(util.MustToJSONBytes(scrollTask.Config))
+    if err != nil {
+        return err
+    }
+    //err = instance.CreatePipeline(util.MustToJSONBytes(bulkTask.Config))
+    //if err != nil {
+    //    return err
+    //}
+    //save task info
+    if taskItem.Metadata.Labels["is_split"] != true {
+        err = orm.Create(nil, scrollTask)
+        if err != nil {
+            return fmt.Errorf("create scroll pipeline task error: %w", err)
+        }
+        err = orm.Create(nil, bulkTask)
+        if err != nil {
+            return fmt.Errorf("create bulk_indexing pipeline task error: %w", err)
+        }
+    } else {
+        err = orm.Update(nil, scrollTask)
+        if err != nil {
+            return fmt.Errorf("update scroll pipeline task error: %w", err)
+        }
+        err = orm.Update(nil, bulkTask)
+        if err != nil {
+            return fmt.Errorf("update bulk_indexing pipeline task error: %w", err)
+        }
+    }
+    taskItem.Metadata.Labels["is_split"] = true
+    taskItem.Metadata.Labels["running_phase"] = 1
+    //update dispatcher state
+    instanceState := p.state[instance.ID]
+    instanceState.Total = instanceState.Total + 1
+    p.state[instance.ID] = instanceState
+    //update the sub migration task status to running and save a task log
+    taskItem.Metadata.Labels["execution_instance_id"] = instance.ID
+    taskItem.Metadata.Labels["index_docs"] = 0
+    taskItem.Metadata.Labels["scrolled_docs"] = 0
+    taskItem.Status = task2.StatusRunning
+    taskItem.StartTimeInMillis = time.Now().UnixMilli()
+
+    taskLog := &task2.Log{
+        ID:     util.GetUUID(),
+        TaskId: taskItem.ID,
+        Status: task2.StatusRunning,
+        Type:   taskItem.Metadata.Type,
+        Config: taskItem.Config,
+        Result: &task2.LogResult{
+            Success: true,
+        },
+        Message:   fmt.Sprintf("dispatch task [%s] to instance ", taskItem.ID),
+        Timestamp: time.Now().UTC(),
+    }
+    p.saveTaskAndWriteLog(taskItem, taskLog, "wait_for")
+    return nil
+}
+
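+// getMapValue fetches a nested value by dotted path (e.g. "source.cluster_id"),
+// returning nil when the path is absent.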
+func getMapValue(m util.MapStr, key string) interface{} {
+    v, _ := m.GetValue(key)
+    return v
+}
+
+func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instance model.Instance, err error) {
+    majorTask := task2.Task{}
+    majorTask.ID = majorTaskID
+    _, err = orm.Get(&majorTask)
+    if err != nil {
+        return
+    }
+    cfg := ElasticDataConfig{}
+    buf, err := util.ToJSONBytes(majorTask.Config)
+    if err != nil {
+        return
+    }
+    err = util.FromJSONBytes(buf, &cfg)
+    if err != nil {
+        return
+    }
+    var (
+        total    = math.MaxInt
+        tempInst = model.Instance{}
+    )
+    for _, node := range cfg.Settings.Execution.Nodes.Permit {
+        if p.state[node.ID].Total < total {
+            if p.config.CheckInstanceAvailable {
+                tempInst.ID = node.ID
+                _, err = orm.Get(&tempInst)
+                if err != nil {
+                    log.Error(err)
+                    continue
+                }
+                err = tempInst.TryConnectWithTimeout(time.Second)
+                if err != nil {
+                    log.Debugf("instance [%s] is not available, caused by: %v", tempInst.ID, err)
+                    continue
+                }
+            }
+            instance.ID = node.ID
+            total = p.state[node.ID].Total
+        }
+    }
+    if instance.ID == "" && p.config.CheckInstanceAvailable {
+        return instance, fmt.Errorf("no available instance")
+    }
+    if instance.ID == tempInst.ID {
+        return tempInst, nil
+    }
+    _, err = orm.Get(&instance)
+    return
+}
+
+func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error) {
+    majorTaskQ := util.MapStr{
+        "bool": util.MapStr{
+            "must": []util.MapStr{
+                {
+                    "term": util.MapStr{
+                        "metadata.labels.business_id": "cluster_migration",
+                    },
+                },
+                {
+                    "terms": util.MapStr{
+                        "status": []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop},
+                    },
+                },
+            },
+        },
+    }
+    subTaskQ := util.MapStr{
+        "bool": util.MapStr{
+            "must": []util.MapStr{
+                {
+                    "term": util.MapStr{
+                        "metadata.labels.business_id": "index_migration",
+                    },
+                },
+                {
+                    "terms": util.MapStr{
+                        "status": []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop},
+                    },
+                },
+            },
+        },
+    }
+
+    queryDsl := util.MapStr{
+        "size": size,
+        "sort": []util.MapStr{
+            {
+                "created": util.MapStr{
+                    "order": "asc",
+                },
+            },
+        },
+        "query": util.MapStr{
+            "bool": util.MapStr{
+                "should": []util.MapStr{
+                    majorTaskQ, subTaskQ,
+                },
+                "minimum_should_match": 1,
+            },
+        },
+    }
+    return p.getTasks(queryDsl)
+}
+
+func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, logItem *task2.Log, refresh string) {
+    esClient := elastic.GetClient(p.config.Elasticsearch)
+    _, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh)
+    if err != nil {
+        log.Error(err)
+    }
+    if logItem != nil {
+        event.SaveLog(event.Event{
+            Metadata: event.EventMetadata{
+                Category: "migration",
+                Name:     "logging",
+                Datatype: "event",
+                Labels: util.MapStr{
+                    "task_id":        logItem.TaskId,
+                    "parent_task_id": taskItem.ParentId,
+                    "retry_no":       taskItem.RetryTimes,
+                },
+            },
+            Fields: util.MapStr{
+                "migration": util.MapStr{
+                    "logging": util.MapStr{
+                        "config":  logItem.Config,
+                        "context": logItem.Context,
+                        "status":  logItem.Status,
+                        "message": logItem.Message,
+                        "result":  logItem.Result,
+                    },
+                },
+            },
+        })
+    }
+}
+
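+// splitMajorMigrationTask fans a cluster_migration task out into
+// index_migration sub tasks: one per index, or one per partition when a
+// partition rule is configured on the index.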
"cluster_migration" { + log.Tracef("got unexpect task type of %s with task id [%s] in cluster migration processor", taskItem.Metadata.Type, taskItem.ID) + return nil + } + + buf := util.MustToJSONBytes(taskItem.Config) clusterMigrationTask := ElasticDataConfig{} - err = util.FromJSONBytes(buf, &clusterMigrationTask) + err := util.FromJSONBytes(buf, &clusterMigrationTask) if err != nil { return err } defer func() { - parameters.Put("pipeline.config", clusterMigrationTask) + taskItem.Config = clusterMigrationTask }() esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id) esTargetClient := elastic.GetClient(clusterMigrationTask.Cluster.Target.Id) - //esClient := elastic.GetClient(p.config.Elasticsearch) - for i, index := range clusterMigrationTask.Indices { + for _, index := range clusterMigrationTask.Indices { source := util.MapStr{ "cluster_id": clusterMigrationTask.Cluster.Source.Id, "indices": index.Source.Name, @@ -256,40 +1001,37 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err "bulk": util.MapStr{ "batch_size_in_mb": clusterMigrationTask.Settings.Bulk.StoreSizeInMB, "batch_size_in_docs": clusterMigrationTask.Settings.Bulk.Docs, + "max_worker_size": clusterMigrationTask.Settings.Bulk.MaxWorkerSize, + "idle_timeout_in_seconds": clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds, + "slice_size": clusterMigrationTask.Settings.Bulk.SliceSize, }, } - indexParameters := map[string]interface{}{ - "pipeline": util.MapStr{ - "id": "index_migration", - "config": util.MapStr{ - "source": source, - "target": target, - "execution": clusterMigrationTask.Settings.Execution, - }, - }, + indexParameters := util.MapStr{ + "source": source, + "target": target, } indexMigrationTask := task2.Task{ ParentId: []string{taskItem.ID}, Cancellable: true, Runnable: false, - Status: task2.StatusRunning, + Status: task2.StatusReady, StartTimeInMillis: time.Now().UnixMilli(), Metadata: task2.Metadata{ Type: "pipeline", Labels: util.MapStr{ - "pipeline_id": "index_migration", + "business_id": "index_migration", "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, - "level": "index", "partition_count": 1, + "index_name": index.Source.Name, + "unique_index_name": index.Source.GetUniqueIndexName(), }, }, - Parameters: indexParameters, + Config: indexParameters, } indexMigrationTask.ID = util.GetUUID() - clusterMigrationTask.Indices[i].TaskID = indexMigrationTask.ID if index.Partition != nil { partitionQ := &elastic.PartitionQuery{ IndexName: index.Source.Name, @@ -356,122 +1098,68 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err } partitionMigrationTask := task2.Task{ - ParentId: []string{taskItem.ID, indexMigrationTask.ID}, + ParentId: []string{taskItem.ID}, Cancellable: false, Runnable: true, Status: task2.StatusReady, Metadata: task2.Metadata{ Type: "pipeline", Labels: util.MapStr{ - "pipeline_id": "index_migration", + "business_id": "index_migration", "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, - "level": "partition", "index_name": index.Source.Name, - "execution": util.MapStr{ - "nodes": util.MapStr{ - "permit": clusterMigrationTask.Settings.Execution.Nodes.Permit, - }, - }, + "unique_index_name": index.Source.GetUniqueIndexName(), }, }, - Parameters: map[string]interface{}{ - "pipeline": util.MapStr{ - "id": "index_migration", - "config": util.MapStr{ - "source": 
         if index.Partition != nil {
             partitionQ := &elastic.PartitionQuery{
                 IndexName: index.Source.Name,
@@ -356,122 +1098,68 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err
             }
             partitionMigrationTask := task2.Task{
-                ParentId:    []string{taskItem.ID, indexMigrationTask.ID},
+                ParentId:    []string{taskItem.ID},
                 Cancellable: false,
                 Runnable:    true,
                 Status:      task2.StatusReady,
                 Metadata: task2.Metadata{
                     Type: "pipeline",
                     Labels: util.MapStr{
-                        "pipeline_id": "index_migration",
+                        "business_id": "index_migration",
                         "source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
                         "target_cluster_id": clusterMigrationTask.Cluster.Target.Id,
-                        "level": "partition",
                         "index_name": index.Source.Name,
-                        "execution": util.MapStr{
-                            "nodes": util.MapStr{
-                                "permit": clusterMigrationTask.Settings.Execution.Nodes.Permit,
-                            },
-                        },
+                        "unique_index_name": index.Source.GetUniqueIndexName(),
                     },
                 },
-                Parameters: map[string]interface{}{
-                    "pipeline": util.MapStr{
-                        "id": "index_migration",
-                        "config": util.MapStr{
-                            "source": partitionSource,
-                            "target": target,
-                            "execution": clusterMigrationTask.Settings.Execution,
-                        },
-                    },
+                Config: util.MapStr{
+                    "source":    partitionSource,
+                    "target":    target,
+                    "execution": clusterMigrationTask.Settings.Execution,
                 },
             }
             partitionMigrationTask.ID = util.GetUUID()
             err = orm.Create(nil, &partitionMigrationTask)
-            //_, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "")
             delete(target, "query_dsl")
             if err != nil {
                 return fmt.Errorf("store index migration task(partition) error: %w", err)
             }
         }
-        indexMigrationTask.Metadata.Labels["partition_count"] = partitionID
     }else{
         source["doc_count"] = index.Source.Docs
-        if targetMust != nil {
-            target["query_dsl"] = util.MapStr{
-                "bool": util.MapStr{
-                    "must": targetMust,
-                },
-            }
-        }
-        partitionMigrationTask := task2.Task{
-            ParentId:    []string{taskItem.ID, indexMigrationTask.ID},
-            Cancellable: false,
-            Runnable:    true,
-            Status:      task2.StatusReady,
-            Metadata: task2.Metadata{
-                Type: "pipeline",
-                Labels: util.MapStr{
-                    "pipeline_id": "index_migration",
-                    "source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
-                    "target_cluster_id": clusterMigrationTask.Cluster.Target.Id,
-                    "level": "partition",
-                    "index_name": index.Source.Name,
-                    "execution": util.MapStr{
-                        "nodes": util.MapStr{
-                            "permit": clusterMigrationTask.Settings.Execution.Nodes.Permit,
-                        },
-                    },
-                },
-            },
-            Parameters: indexParameters,
-        }
-        orm.Create(nil, &partitionMigrationTask)
-        //_, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "")
-        delete(target, "query_dsl")
+        err = orm.Create(nil, &indexMigrationTask)
         if err != nil {
-            return fmt.Errorf("store index migration task(partition) error: %w", err)
+            return fmt.Errorf("store index migration task error: %w", err)
        }
    }
-    err = orm.Create(nil, &indexMigrationTask)
-    //_, err = esClient.Index(p.config.IndexName, "", indexMigrationTask.ID, indexMigrationTask, "")
-    if err != nil {
-        return fmt.Errorf("store index migration task error: %w", err)
-    }
    }
    return nil
 }
 
-func (p *ClusterMigrationProcessor) getClusterMigrationTasks(size int)([]task2.Task, error){
+func (p *DispatcherProcessor) getPipelineTasks(subTaskID string) ([]task2.Task, error) {
     queryDsl := util.MapStr{
-        "size": size,
-        "sort": []util.MapStr{
-            {
-                "created": util.MapStr{
-                    "order": "asc",
-                },
-            },
-        },
+        "size": 2,
         "query": util.MapStr{
             "bool": util.MapStr{
                 "must": []util.MapStr{
                     {
                         "term": util.MapStr{
-                            "status": task2.StatusReady,
-                        },
-                    },
-                    {
-                        "term": util.MapStr{
-                            "metadata.labels.pipeline_id": p.Name(),
+                            "parent_id": util.MapStr{
+                                "value": subTaskID,
+                            },
                         },
                     },
                 },
             },
         },
     }
+    return p.getTasks(queryDsl)
+}
+
+func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error) {
     esClient := elastic.GetClient(p.config.Elasticsearch)
-    res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(queryDsl))
+    res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
     if err != nil {
         return nil, err
     }
@@ -494,14 +1182,255 @@ func (p *ClusterMigrationProcessor) getClusterMigrationTasks(size int)([]task2.T
     return migrationTasks, nil
 }
 
-func (p *ClusterMigrationProcessor) writeTaskLog(taskItem *task2.Task, logItem *task2.Log) {
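+// getTaskCompleteState collapses the newest pipeline log event per pipeline
+// task and compares scrolled/indexed doc counts against source.doc_count to
+// decide whether the sub task is complete, partially failed, or still running.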
+func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCompleteState, error) {
+    ptasks, err := p.getPipelineTasks(subTask.ID)
+    if err != nil {
+        return nil, err
+    }
+    var pids []string
+    for _, t := range ptasks {
+        pids = append(pids, t.ID)
+    }
+
+    if len(pids) == 0 {
+        return nil, fmt.Errorf("pipeline task not found")
+    }
+    query := util.MapStr{
+        "sort": []util.MapStr{
+            {
+                "timestamp": util.MapStr{
+                    "order": "desc",
+                },
+            },
+            {
+                "payload.pipeline.logging.steps": util.MapStr{
+                    "order": "desc",
+                },
+            },
+        },
+        "collapse": util.MapStr{
+            "field": "metadata.labels.task_id",
+        },
+        "query": util.MapStr{
+            "bool": util.MapStr{
+                "must": []util.MapStr{
+                    {
+                        "terms": util.MapStr{
+                            "metadata.labels.task_id": pids,
+                        },
+                    },
+                    {
+                        "range": util.MapStr{
+                            "timestamp": util.MapStr{
+                                "gt": subTask.StartTimeInMillis - 30*1000,
+                            },
+                        },
+                    },
+                },
+            },
+        },
+    }
     esClient := elastic.GetClient(p.config.Elasticsearch)
-    _, err := esClient.Index(p.config.IndexName, "", logItem.TaskId, taskItem, "")
-    if err != nil {
-        log.Error(err)
+    res, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(query))
+    if err != nil {
+        return nil, err
     }
-    _, err = esClient.Index(p.config.LogIndexName, "", logItem.ID, logItem, "")
-    if err != nil {
-        log.Error(err)
+    var (
+        cfg map[string]interface{}
+        ok  bool
+    )
+    if cfg, ok = subTask.Config.(map[string]interface{}); !ok {
+        return nil, fmt.Errorf("got wrong config of task %v", *subTask)
     }
+    totalDocs, err := util.MapStr(cfg).GetValue("source.doc_count")
+    if err != nil {
+        return nil, err
+    }
+
+    var (
+        indexDocs    float64
+        successDocs  float64
+        scrolledDocs interface{}
+        state        TaskCompleteState
+    )
+    state.TotalDocs = totalDocs
+    state.PipelineIds = pids
+    for _, hit := range res.Hits.Hits {
+        resultErr, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.result.error")
+        if errStr, ok := resultErr.(string); ok && errStr != "" {
+            state.Error = errStr
+            state.IsComplete = true
+            state.ClearPipeline = true
+        }
+        for _, key := range []string{"payload.pipeline.logging.context.bulk_indexing.success.count", "payload.pipeline.logging.context.bulk_indexing.failure.count", "payload.pipeline.logging.context.bulk_indexing.invalid.count"} {
+            v, err := util.MapStr(hit.Source).GetValue(key)
+            if err == nil {
+                if fv, ok := v.(float64); ok {
+                    indexDocs += fv
+                    if key == "payload.pipeline.logging.context.bulk_indexing.success.count" {
+                        successDocs = fv
+                        state.SuccessDocs = successDocs
+                    }
+                }
+            } else {
+                break
+            }
+        }
+        v, err := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.context.es_scroll.scrolled_docs")
+        if err == nil {
+            scrolledDocs = v
+            if vv, ok := v.(float64); ok {
+                state.ScrolledDocs = vv
+            }
+        }
+    }
+    if totalDocs == scrolledDocs {
+        state.RunningPhase = 1
+    }
+    if (totalDocs == indexDocs || successDocs == totalDocs) && totalDocs == scrolledDocs {
+        if successDocs != totalDocs {
+            if state.Error == "" {
+                if successDocs > 0 {
+                    state.Error = "partial complete"
+                } else {
+                    state.Error = "invalid request"
+                }
+            }
+        }
+        state.IsComplete = true
+        return &state, nil
+    }
+    //check whether the instance is available
+    if subTask.Metadata.Labels != nil {
+        if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok {
+            inst := model.Instance{}
+            inst.ID = instID
+            _, err = orm.Get(&inst)
+            if err != nil {
+                return nil, err
+            }
+            err = inst.TryConnectWithTimeout(time.Second * 3)
+            if err != nil && errors.Is(err, syscall.ECONNREFUSED) {
+                state.Error = fmt.Errorf("instance [%s] is unavailable: %w", instID, err).Error()
+                state.IsComplete = true
+            }
+        }
+    }
+    return &state, nil
+}
+
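+// getMajorTaskState derives a major task's status from its index_migration
+// children: any ready/running child keeps it running; otherwise an errored
+// child marks it error, else complete. total_docs sums the children's
+// metadata.labels.index_docs.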
"total_docs": util.MapStr{ + "sum": util.MapStr{ + "field": "metadata.labels.index_docs", + }, + }, + "grp": util.MapStr{ + "terms": util.MapStr{ + "field": "status", + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": majorTask.ID, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + }, + }, + + }, + } + esClient := elastic.GetClient(p.config.Elasticsearch) + res, err := esClient.SearchWithRawQueryDSL( p.config.IndexName, util.MustToJSONBytes(query)) + if err != nil { + return taskState, err + } + if v, ok := res.Aggregations["total_docs"].Value.(float64); ok { + taskState.IndexDocs = v + } + var ( + hasError bool + ) + for _, bk := range res.Aggregations["grp"].Buckets { + if bk["key"] == task2.StatusReady || bk["key"] == task2.StatusRunning { + taskState.Status = task2.StatusRunning + return taskState, nil + } + if bk["key"] == task2.StatusError { + hasError = true + } + } + if hasError { + taskState.Status = task2.StatusError + }else { + taskState.Status = task2.StatusComplete + } + return taskState, nil +} + +func (p *DispatcherProcessor) getInstanceTaskState()(map[string]DispatcherState, error){ + query := util.MapStr{ + "size": 0, + "aggs": util.MapStr{ + "grp": util.MapStr{ + "terms": util.MapStr{ + "field": "metadata.labels.execution_instance_id", + "size": 1000, + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": task2.StatusRunning, + }, + }, + }, + }, + }, + }, + } + esClient := elastic.GetClient(p.config.Elasticsearch) + res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query)) + if err != nil { + return nil, err + } + state := map[string]DispatcherState{} + for _, bk := range res.Aggregations["grp"].Buckets { + if key, ok := bk["key"].(string); ok { + if v, ok := bk["doc_count"].(float64); ok { + state[key] = DispatcherState{ + Total: int(v), + } + } + } + } + return state, nil } \ No newline at end of file