diff --git a/console.yml b/console.yml index 62070ed8..f29f1b89 100644 --- a/console.yml +++ b/console.yml @@ -48,4 +48,7 @@ metrics: enabled: true cluster_stats: true node_stats: true - index_stats: true \ No newline at end of file + index_stats: true +badger: + value_log_max_entries: 1000000 + value_log_file_size: 104857600 diff --git a/main.go b/main.go index 50ca3b66..88be737e 100644 --- a/main.go +++ b/main.go @@ -80,15 +80,9 @@ func main() { module.RegisterSystemModule(uiModule) if !global.Env().SetupRequired(){ - module.RegisterSystemModule(&stats.SimpleStatsModule{}) - module.RegisterSystemModule(&elastic2.ElasticModule{}) - module.RegisterSystemModule(&queue2.DiskQueue{}) - module.RegisterSystemModule(&redis.RedisModule{}) - module.RegisterSystemModule(&pipeline.PipeModule{}) - module.RegisterSystemModule(&task.TaskModule{}) - module.RegisterSystemModule(&agent.AgentModule{}) - module.RegisterSystemModule(&metrics.MetricsModule{}) - module.RegisterSystemModule(&security.Module{}) + for _, v := range modules { + module.RegisterSystemModule(v) + } }else{ for _, v := range modules { v.Setup() diff --git a/plugin/migration/api.go b/plugin/migration/api.go index 40af3b91..7a69b69b 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -13,6 +13,7 @@ import ( "infini.sh/framework/core/api/rbac" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "infini.sh/framework/core/proxy" task2 "infini.sh/framework/core/task" @@ -57,14 +58,15 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re h.WriteError(w, "indices must not be empty", http.StatusInternalServerError) return } - clusterTaskConfig.Creator = struct { - Name string `json:"name"` - Id string `json:"id"` - }{} - claims, ok := req.Context().Value("user").(*rbac.UserClaims) - if ok { - clusterTaskConfig.Creator.Name = claims.Username - clusterTaskConfig.Creator.Id = claims.ID + user, err := rbac.FromUserContext(req.Context()) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if user != nil { + clusterTaskConfig.Creator.Name = user.Username + clusterTaskConfig.Creator.Id = user.UserId } var totalDocs int64 @@ -87,23 +89,18 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re Status: task2.StatusInit, Parameters: map[string]interface{}{ "pipeline": util.MapStr{ - "id": "cluster_migration", "config": clusterTaskConfig, }, }, } + t.ID = util.GetUUID() err = orm.Create(nil, &t) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) return } - - h.WriteJSON(w, util.MapStr{ - "_id": t.ID, - "result": "created", - }, 200) - + h.WriteCreatedOKJSON(w, t.ID) } func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -172,7 +169,7 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - mainLoop: +mainLoop: for _, hit := range searchRes.Hits.Hits { sourceM := util.MapStr(hit.Source) config, err := sourceM.GetValue("parameters.pipeline.config") @@ -188,8 +185,7 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re continue } var targetTotalDocs int64 - targetTotal, _ := sourceM.GetValue("metadata.labels.target_total_docs") - if _, ok := targetTotal.(float64); !ok || hit.Source["status"] != task2.StatusComplete { + if hit.Source["status"] == task2.StatusRunning { esClient := elastic.GetClientNoPanic(dataConfig.Cluster.Target.Id) if esClient == nil { log.Warnf("cluster [%s] was not found", dataConfig.Cluster.Target.Id) @@ -255,7 +251,7 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request //root task obj.Status = task2.StatusReady }else if obj.Status == task2.StatusStopped { - if obj.Metadata.Labels["level"] == "partition" { + if obj.Metadata.Labels != nil && obj.Metadata.Labels["level"] == "partition" { obj.Status = task2.StatusReady //update parent task status if len(obj.ParentId) == 0 { @@ -386,34 +382,6 @@ func getNodeEndpoint(nodeID string) (string, error){ return "", fmt.Errorf("got unexpect node info: %s", util.MustToJSON(result.Result[0])) } -func stopTask(nodeID, taskID string) error { - endpoint, err := getNodeEndpoint(nodeID) - if err != nil { - return err - } - res, err := proxy.DoProxyRequest(&proxy.Request{ - Method: http.MethodPost, - Endpoint: endpoint, - Path: fmt.Sprintf("/pipeline/task/%s/_stop", taskID), - }) - - if err != nil { - return fmt.Errorf("call stop task api error: %w", err) - } - resBody := struct { - Acknowledged bool `json:"acknowledged"` - Error string `json:"error"` - }{} - err = util.FromJSONBytes(res.Body, &resBody) - if err != nil { - return err - } - if resBody.Acknowledged { - return nil - } - return fmt.Errorf(resBody.Error) -} - func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("task_id") obj := task2.Task{} @@ -451,37 +419,30 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ return } - err = stopTask(executionConfig.Nodes.Permit[0].ID, id) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": id, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "index_migration", - }, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task2.StatusRunning, task2.StatusInit}, + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": id, }, }, }, + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task2.StatusRunning, task2.StatusInit}, + }, + }, }, + }, } //todo reset stat_time? queryDsl := util.MapStr{ @@ -530,6 +491,82 @@ func getTaskConfig(task *task2.Task, config interface{}) error{ return util.FromJSONBytes(configBytes, config) } +func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API)(map[string]string, error){ + const step = 50 + var ( + length = len(indexNames) + end int + ) + refreshIntervals := map[string]string{} + for i := 0; i < length; i += step { + end = i + step + if end > length - 1 { + end = length + } + tempNames := indexNames[i:end] + strNames := strings.Join(tempNames, ",") + resultM, err := targetESClient.GetIndexSettings(strNames) + if err != nil { + return refreshIntervals, nil + } + for indexName, v := range *resultM { + if m, ok := v.(map[string]interface{}); ok { + refreshInterval, _ := util.GetMapValueByKeys([]string{"settings", "index", "refresh_interval"}, m) + if ri, ok := refreshInterval.(string); ok { + refreshIntervals[indexName] = ri + continue + } + refreshInterval, _ = util.GetMapValueByKeys([]string{"defaults", "index", "refresh_interval"}, m) + if ri, ok := refreshInterval.(string); ok { + refreshIntervals[indexName] = ri + continue + } + } + + } + } + return refreshIntervals, nil + +} + +func (h *APIHandler) getIndexRefreshIntervals(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + id := ps.MustGetParameter("task_id") + + obj := task2.Task{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + taskConfig := &ElasticDataConfig{} + err = getTaskConfig(&obj, taskConfig) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var indexNames []string + for _, index := range taskConfig.Indices { + indexNames = append(indexNames, index.Target.Name) + } + targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id) + if targetESClient == nil { + h.WriteJSON(w, util.MapStr{}, http.StatusOK) + } + vals, err := getIndexRefreshInterval(indexNames, targetESClient) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteJSON(w, vals, http.StatusOK) +} + func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ id := ps.MustGetParameter("task_id") @@ -559,6 +596,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R return } //get status of sub task + //todo size config? query := util.MapStr{ "size": 1000, "query": util.MapStr{ @@ -606,18 +644,17 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R } } - var completedIndices int + targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id) for i, index := range taskConfig.Indices { if st, ok := statusM[index.TaskID]; ok { taskConfig.Indices[i].Status = st.(string) } var count = index.Target.Docs if taskConfig.Indices[i].Status != task2.StatusComplete || count == 0 { - targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id) if targetESClient == nil { log.Warnf("cluster [%s] was not found", taskConfig.Cluster.Target.Id) - continue + break } count, err = getIndexTaskDocCount(&index, targetESClient) if err != nil { @@ -627,6 +664,9 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R taskConfig.Indices[i].Target.Docs = count } percent := float64(count * 100) / float64(index.Source.Docs) + if percent > 100 { + percent = 100 + } taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) taskConfig.Indices[i].ErrorPartitions = taskErrors[index.TaskID] if count == index.Source.Docs { @@ -634,6 +674,16 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R taskConfig.Indices[i].Status = task2.StatusComplete } } + cfg := global.MustLookup("cluster_migration_config") + if migrationConfig, ok := cfg.(*ClusterMigrationConfig); ok { + if obj.Metadata.Labels == nil { + obj.Metadata.Labels = util.MapStr{} + } + obj.Metadata.Labels["log_info"] = util.MapStr{ + "cluster_id": migrationConfig.Elasticsearch, + "index_name": migrationConfig.LogIndexName, + } + } util.MapStr(obj.Parameters).Put("pipeline.config", taskConfig) obj.Metadata.Labels["completed_indices"] = completedIndices h.WriteJSON(w, obj, http.StatusOK) @@ -790,20 +840,14 @@ func getTaskStats(nodeID string) (map[string]interface{}, error){ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ id := ps.MustGetParameter("task_id") indexTask := task2.Task{} - indexTask.ID=id + indexTask.ID = id exists, err := orm.Get(&indexTask) if !exists || err != nil { h.WriteError(w, fmt.Sprintf("task [%s] not found", id), http.StatusInternalServerError) return } - var durationInMS int64 - if indexTask.StartTimeInMillis > 0 { - durationInMS = time.Now().UnixMilli() - indexTask.StartTimeInMillis - if indexTask.CompletedTime != nil && indexTask.Status == task2.StatusComplete { - durationInMS = indexTask.CompletedTime.UnixMilli() - indexTask.StartTimeInMillis - } - } + var durationInMS = indexTask.GetDurationInMS() var completedTime int64 if indexTask.CompletedTime != nil { completedTime = indexTask.CompletedTime.UnixMilli() @@ -892,13 +936,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt step, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.step") taskInfo["step"] = step } - durationInMS = 0 - if ptask.StartTimeInMillis > 0 { - durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis - if ptask.CompletedTime != nil && (ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError) { - durationInMS = ptask.CompletedTime.UnixMilli() - ptask.StartTimeInMillis - } - } + durationInMS = ptask.GetDurationInMS() var ( scrollDocs float64 indexDocs float64 diff --git a/plugin/migration/model.go b/plugin/migration/model.go index ffe5428d..74a4139f 100644 --- a/plugin/migration/model.go +++ b/plugin/migration/model.go @@ -13,14 +13,15 @@ type ElasticDataConfig struct { Settings struct { ParallelIndices int `json:"parallel_indices"` ParallelTaskPerIndex int `json:"parallel_task_per_index"` - ScrollSize struct { + Scroll struct { + SliceSize int `json:"slice_size"` Docs int `json:"docs"` Timeout string `json:"timeout"` - } `json:"scroll_size"` - BulkSize struct { + } `json:"scroll"` + Bulk struct { Docs int `json:"docs"` StoreSizeInMB int `json:"store_size_in_mb"` - } `json:"bulk_size"` + } `json:"bulk"` Execution ExecutionConfig `json:"execution"` } `json:"settings"` Creator struct { diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 039e9ce7..7730ea55 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -11,6 +11,7 @@ import ( "infini.sh/framework/core/elastic" "infini.sh/framework/core/env" "infini.sh/framework/core/global" + "infini.sh/framework/core/orm" "infini.sh/framework/core/pipeline" task2 "infini.sh/framework/core/task" "infini.sh/framework/core/util" @@ -60,6 +61,7 @@ func newClusterMigrationProcessor(c *config.Config) (pipeline.Processor, error) return nil, err } } + global.Register("cluster_migration_config", &cfg) processor := ClusterMigrationProcessor{ id: util.GetUUID(), @@ -120,7 +122,7 @@ func (p *ClusterMigrationProcessor) Process(ctx *pipeline.Context) error { Action: task2.LogAction{ Parameters: t.Parameters, }, - Content: fmt.Sprintf("starting to execute task [%s]", t.ID), + Message: fmt.Sprintf("starting to execute task [%s]", t.ID), Timestamp: time.Now().UTC(), }) err = p.SplitMigrationTask(&t) @@ -135,12 +137,12 @@ func (p *ClusterMigrationProcessor) Process(ctx *pipeline.Context) error { Success: true, }, }, - Content: fmt.Sprintf("success to split task [%s]", t.ID), + Message: fmt.Sprintf("success to split task [%s]", t.ID), Timestamp: time.Now().UTC(), } if err != nil { taskLog.Status = task2.StatusError - taskLog.Content = fmt.Sprintf("failed to split task [%s]: %v", t.ID, err) + taskLog.Message = fmt.Sprintf("failed to split task [%s]: %v", t.ID, err) taskLog.Action.Result = &task2.LogResult{ Success: false, Error: err.Error(), @@ -182,15 +184,15 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err }() esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id) esTargetClient := elastic.GetClient(clusterMigrationTask.Cluster.Target.Id) - esClient := elastic.GetClient(p.config.Elasticsearch) + //esClient := elastic.GetClient(p.config.Elasticsearch) for i, index := range clusterMigrationTask.Indices { source := util.MapStr{ "cluster_id": clusterMigrationTask.Cluster.Source.Id, "indices": index.Source.Name, - //"slice_size": 10, - "batch_size": clusterMigrationTask.Settings.ScrollSize.Docs, - "scroll_time": clusterMigrationTask.Settings.ScrollSize.Timeout, + "slice_size": clusterMigrationTask.Settings.Scroll.SliceSize, + "batch_size": clusterMigrationTask.Settings.Scroll.Docs, + "scroll_time": clusterMigrationTask.Settings.Scroll.Timeout, } if index.IndexRename != nil { source["index_rename"] = index.IndexRename @@ -251,11 +253,9 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err target := util.MapStr{ "cluster_id": clusterMigrationTask.Cluster.Target.Id, - //"max_worker_size": 10, - //"detect_interval": 100, "bulk": util.MapStr{ - "batch_size_in_mb": clusterMigrationTask.Settings.BulkSize.StoreSizeInMB, - "batch_size_in_docs": clusterMigrationTask.Settings.BulkSize.Docs, + "batch_size_in_mb": clusterMigrationTask.Settings.Bulk.StoreSizeInMB, + "batch_size_in_docs": clusterMigrationTask.Settings.Bulk.Docs, }, } indexParameters := map[string]interface{}{ @@ -287,7 +287,7 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err Parameters: indexParameters, } - indexMigrationTask.ID=util.GetUUID() + indexMigrationTask.ID = util.GetUUID() clusterMigrationTask.Indices[i].TaskID = indexMigrationTask.ID if index.Partition != nil { @@ -386,9 +386,9 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err }, }, } - partitionMigrationTask.ID=util.GetUUID() - - _, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "") + partitionMigrationTask.ID = util.GetUUID() + err = orm.Create(nil, &partitionMigrationTask) + //_, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "") delete(target, "query_dsl") if err != nil { return fmt.Errorf("store index migration task(partition) error: %w", err) @@ -427,15 +427,15 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err }, Parameters: indexParameters, } - partitionMigrationTask.ID=util.GetUUID() - - _, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "") + orm.Create(nil, &partitionMigrationTask) + //_, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "") delete(target, "query_dsl") if err != nil { return fmt.Errorf("store index migration task(partition) error: %w", err) } } - _, err = esClient.Index(p.config.IndexName, "", indexMigrationTask.ID, indexMigrationTask, "") + err = orm.Create(nil, &indexMigrationTask) + //_, err = esClient.Index(p.config.IndexName, "", indexMigrationTask.ID, indexMigrationTask, "") if err != nil { return fmt.Errorf("store index migration task error: %w", err) } @@ -504,4 +504,4 @@ func (p *ClusterMigrationProcessor) writeTaskLog(taskItem *task2.Task, logItem * if err != nil{ log.Error(err) } -} +} \ No newline at end of file