Merge branch 'master' of ssh://git.infini.ltd:64221/infini/console

This commit is contained in:
medcl 2023-02-08 10:18:43 +08:00
commit f9245a5c3b
5 changed files with 161 additions and 125 deletions

View File

@@ -48,4 +48,7 @@ metrics:
enabled: true
cluster_stats: true
node_stats: true
index_stats: true
badger:
value_log_max_entries: 1000000
value_log_file_size: 104857600
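Note: a minimal sketch of how these two values typically map onto badger's builder-style options, assuming dgraph-io/badger v3 (the framework's actual wiring of this config section may differ):

package main

import badger "github.com/dgraph-io/badger/v3"

func openStore(path string) (*badger.DB, error) {
	// Apply the two limits added to the config above.
	opts := badger.DefaultOptions(path).
		WithValueLogMaxEntries(1000000). // value_log_max_entries
		WithValueLogFileSize(104857600)  // value_log_file_size: 100 MiB
	return badger.Open(opts)
}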

12
main.go
View File

@@ -80,15 +80,9 @@ func main() {
module.RegisterSystemModule(uiModule)
if !global.Env().SetupRequired(){
module.RegisterSystemModule(&stats.SimpleStatsModule{})
module.RegisterSystemModule(&elastic2.ElasticModule{})
module.RegisterSystemModule(&queue2.DiskQueue{})
module.RegisterSystemModule(&redis.RedisModule{})
module.RegisterSystemModule(&pipeline.PipeModule{})
module.RegisterSystemModule(&task.TaskModule{})
module.RegisterSystemModule(&agent.AgentModule{})
module.RegisterSystemModule(&metrics.MetricsModule{})
module.RegisterSystemModule(&security.Module{})
for _, v := range modules {
module.RegisterSystemModule(v)
}
}else{
for _, v := range modules {
v.Setup()
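The hunk above swaps eight explicit RegisterSystemModule calls for a loop over a shared modules slice, keeping the setup and non-setup branches in sync. A sketch of the assumed pattern (the slice is presumably declared earlier in main(); its element type is an assumption):

// Assumed: one slice holds every optional system module.
modules := []module.Module{
	&elastic2.ElasticModule{},
	&queue2.DiskQueue{},
	&redis.RedisModule{},
	&pipeline.PipeModule{},
	&task.TaskModule{},
	&agent.AgentModule{},
	&metrics.MetricsModule{},
	&security.Module{},
}
if !global.Env().SetupRequired() {
	module.RegisterSystemModule(&stats.SimpleStatsModule{})
	for _, v := range modules {
		module.RegisterSystemModule(v)
	}
} else {
	// During initial setup, modules are only prepared, not registered.
	for _, v := range modules {
		v.Setup()
	}
}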

View File

@@ -13,6 +13,7 @@ import (
"infini.sh/framework/core/api/rbac"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/global"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/proxy"
task2 "infini.sh/framework/core/task"
@@ -57,14 +58,15 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re
h.WriteError(w, "indices must not be empty", http.StatusInternalServerError)
return
}
clusterTaskConfig.Creator = struct {
Name string `json:"name"`
Id string `json:"id"`
}{}
claims, ok := req.Context().Value("user").(*rbac.UserClaims)
if ok {
clusterTaskConfig.Creator.Name = claims.Username
clusterTaskConfig.Creator.Id = claims.ID
user, err := rbac.FromUserContext(req.Context())
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if user != nil {
clusterTaskConfig.Creator.Name = user.Username
clusterTaskConfig.Creator.Id = user.UserId
}
var totalDocs int64
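The handler now delegates claim extraction to rbac.FromUserContext instead of type-asserting the "user" context value inline. A hypothetical minimal shape, inferred purely from the call site above (the framework's concrete types may differ):

import (
	"context"
	"fmt"
)

// Hypothetical: name and fields inferred from user.Username / user.UserId above.
type ShortUser struct {
	Username string
	UserId   string
}

func FromUserContext(ctx context.Context) (*ShortUser, error) {
	v := ctx.Value("user") // same key the removed inline assertion used
	if v == nil {
		return nil, nil // unauthenticated: caller checks for a nil user
	}
	user, ok := v.(*ShortUser)
	if !ok {
		return nil, fmt.Errorf("unexpected user claims type %T in context", v)
	}
	return user, nil
}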
@@ -87,23 +89,18 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re
Status: task2.StatusInit,
Parameters: map[string]interface{}{
"pipeline": util.MapStr{
"id": "cluster_migration",
"config": clusterTaskConfig,
},
},
}
t.ID = util.GetUUID()
err = orm.Create(nil, &t)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
h.WriteJSON(w, util.MapStr{
"_id": t.ID,
"result": "created",
}, 200)
h.WriteCreatedOKJSON(w, t.ID)
}
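WriteCreatedOKJSON replaces the hand-rolled response; judging by the removed WriteJSON call, it is presumably a thin helper along these lines (a sketch, not the framework's actual source):

// Fixed "created" envelope, equivalent to the removed WriteJSON call.
func (h *APIHandler) WriteCreatedOKJSON(w http.ResponseWriter, id interface{}) {
	h.WriteJSON(w, util.MapStr{
		"_id":    id,
		"result": "created",
	}, http.StatusOK)
}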
func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@@ -172,7 +169,7 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
mainLoop:
for _, hit := range searchRes.Hits.Hits {
sourceM := util.MapStr(hit.Source)
config, err := sourceM.GetValue("parameters.pipeline.config")
@@ -188,8 +185,7 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re
continue
}
var targetTotalDocs int64
targetTotal, _ := sourceM.GetValue("metadata.labels.target_total_docs")
if _, ok := targetTotal.(float64); !ok || hit.Source["status"] != task2.StatusComplete {
if hit.Source["status"] == task2.StatusRunning {
esClient := elastic.GetClientNoPanic(dataConfig.Cluster.Target.Id)
if esClient == nil {
log.Warnf("cluster [%s] was not found", dataConfig.Cluster.Target.Id)
@@ -255,7 +251,7 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request
//root task
obj.Status = task2.StatusReady
}else if obj.Status == task2.StatusStopped {
if obj.Metadata.Labels["level"] == "partition" {
if obj.Metadata.Labels != nil && obj.Metadata.Labels["level"] == "partition" {
obj.Status = task2.StatusReady
//update parent task status
if len(obj.ParentId) == 0 {
@@ -386,34 +382,6 @@ func getNodeEndpoint(nodeID string) (string, error){
return "", fmt.Errorf("got unexpect node info: %s", util.MustToJSON(result.Result[0]))
}
func stopTask(nodeID, taskID string) error {
endpoint, err := getNodeEndpoint(nodeID)
if err != nil {
return err
}
res, err := proxy.DoProxyRequest(&proxy.Request{
Method: http.MethodPost,
Endpoint: endpoint,
Path: fmt.Sprintf("/pipeline/task/%s/_stop", taskID),
})
if err != nil {
return fmt.Errorf("call stop task api error: %w", err)
}
resBody := struct {
Acknowledged bool `json:"acknowledged"`
Error string `json:"error"`
}{}
err = util.FromJSONBytes(res.Body, &resBody)
if err != nil {
return err
}
if resBody.Acknowledged {
return nil
}
return fmt.Errorf(resBody.Error)
}
func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id")
obj := task2.Task{}
@@ -451,37 +419,30 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ
return
}
err = stopTask(executionConfig.Nodes.Permit[0].ID, id)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
query := util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": id,
},
},
},
{
"term": util.MapStr{
"metadata.labels.pipeline_id": util.MapStr{
"value": "index_migration",
},
},
},
{
"terms": util.MapStr{
"status": []string{task2.StatusRunning, task2.StatusInit},
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": id,
},
},
},
{
"term": util.MapStr{
"metadata.labels.pipeline_id": util.MapStr{
"value": "index_migration",
},
},
},
{
"terms": util.MapStr{
"status": []string{task2.StatusRunning, task2.StatusInit},
},
},
},
},
}
//todo reset stat_time?
queryDsl := util.MapStr{
@@ -530,6 +491,82 @@ func getTaskConfig(task *task2.Task, config interface{}) error{
return util.FromJSONBytes(configBytes, config)
}
func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API)(map[string]string, error){
const step = 50
var (
length = len(indexNames)
end int
)
refreshIntervals := map[string]string{}
for i := 0; i < length; i += step {
end = i + step
if end > length - 1 {
end = length
}
tempNames := indexNames[i:end]
strNames := strings.Join(tempNames, ",")
resultM, err := targetESClient.GetIndexSettings(strNames)
if err != nil {
return refreshIntervals, err
}
for indexName, v := range *resultM {
if m, ok := v.(map[string]interface{}); ok {
refreshInterval, _ := util.GetMapValueByKeys([]string{"settings", "index", "refresh_interval"}, m)
if ri, ok := refreshInterval.(string); ok {
refreshIntervals[indexName] = ri
continue
}
refreshInterval, _ = util.GetMapValueByKeys([]string{"defaults", "index", "refresh_interval"}, m)
if ri, ok := refreshInterval.(string); ok {
refreshIntervals[indexName] = ri
continue
}
}
}
}
return refreshIntervals, nil
}
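getIndexRefreshInterval batches GetIndexSettings calls 50 index names at a time and prefers the explicit per-index refresh_interval over the cluster default. A hypothetical call site (cluster id and index names invented for illustration):

client := elastic.GetClientNoPanic("target-cluster-id") // hypothetical id
if client != nil {
	intervals, err := getIndexRefreshInterval([]string{"logs-2023.02", "metrics-2023.02"}, client)
	if err != nil {
		log.Error(err)
	}
	_ = intervals // e.g. intervals["logs-2023.02"] == "1s"
}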
func (h *APIHandler) getIndexRefreshIntervals(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
id := ps.MustGetParameter("task_id")
obj := task2.Task{}
obj.ID = id
exists, err := orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": id,
"found": false,
}, http.StatusNotFound)
return
}
taskConfig := &ElasticDataConfig{}
err = getTaskConfig(&obj, taskConfig)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
var indexNames []string
for _, index := range taskConfig.Indices {
indexNames = append(indexNames, index.Target.Name)
}
targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id)
if targetESClient == nil {
h.WriteJSON(w, util.MapStr{}, http.StatusOK)
return
}
vals, err := getIndexRefreshInterval(indexNames, targetESClient)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
h.WriteJSON(w, vals, http.StatusOK)
}
func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
id := ps.MustGetParameter("task_id")
@@ -559,6 +596,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
return
}
//get status of sub task
//todo size config?
query := util.MapStr{
"size": 1000,
"query": util.MapStr{
@@ -606,18 +644,17 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
}
}
var completedIndices int
targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id)
for i, index := range taskConfig.Indices {
if st, ok := statusM[index.TaskID]; ok {
taskConfig.Indices[i].Status = st.(string)
}
var count = index.Target.Docs
if taskConfig.Indices[i].Status != task2.StatusComplete || count == 0 {
targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id)
if targetESClient == nil {
log.Warnf("cluster [%s] was not found", taskConfig.Cluster.Target.Id)
continue
break
}
count, err = getIndexTaskDocCount(&index, targetESClient)
if err != nil {
@@ -627,6 +664,9 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
taskConfig.Indices[i].Target.Docs = count
}
percent := float64(count * 100) / float64(index.Source.Docs)
if percent > 100 {
percent = 100
}
taskConfig.Indices[i].Percent = util.ToFixed(percent, 2)
taskConfig.Indices[i].ErrorPartitions = taskErrors[index.TaskID]
if count == index.Source.Docs {
@@ -634,6 +674,16 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
taskConfig.Indices[i].Status = task2.StatusComplete
}
}
cfg := global.MustLookup("cluster_migration_config")
if migrationConfig, ok := cfg.(*ClusterMigrationConfig); ok {
if obj.Metadata.Labels == nil {
obj.Metadata.Labels = util.MapStr{}
}
obj.Metadata.Labels["log_info"] = util.MapStr{
"cluster_id": migrationConfig.Elasticsearch,
"index_name": migrationConfig.LogIndexName,
}
}
util.MapStr(obj.Parameters).Put("pipeline.config", taskConfig)
obj.Metadata.Labels["completed_indices"] = completedIndices
h.WriteJSON(w, obj, http.StatusOK)
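The log_info labels come from a *ClusterMigrationConfig that the migration processor publishes at startup via global.Register (see the pipeline file below). The handshake is a plain typed-registry pattern, roughly:

// Producer side (processor init): publish the parsed config under a fixed key.
global.Register("cluster_migration_config", &cfg)

// Consumer side (this handler): look it up and type-assert it back.
if migrationConfig, ok := global.MustLookup("cluster_migration_config").(*ClusterMigrationConfig); ok {
	_ = migrationConfig.Elasticsearch // cluster holding the task logs
	_ = migrationConfig.LogIndexName  // index the logs are written to
}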
@@ -790,20 +840,14 @@ func getTaskStats(nodeID string) (map[string]interface{}, error){
func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
id := ps.MustGetParameter("task_id")
indexTask := task2.Task{}
indexTask.ID=id
indexTask.ID = id
exists, err := orm.Get(&indexTask)
if !exists || err != nil {
h.WriteError(w, fmt.Sprintf("task [%s] not found", id), http.StatusInternalServerError)
return
}
var durationInMS int64
if indexTask.StartTimeInMillis > 0 {
durationInMS = time.Now().UnixMilli() - indexTask.StartTimeInMillis
if indexTask.CompletedTime != nil && indexTask.Status == task2.StatusComplete {
durationInMS = indexTask.CompletedTime.UnixMilli() - indexTask.StartTimeInMillis
}
}
var durationInMS = indexTask.GetDurationInMS()
var completedTime int64
if indexTask.CompletedTime != nil {
completedTime = indexTask.CompletedTime.UnixMilli()
@@ -892,13 +936,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
step, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.step")
taskInfo["step"] = step
}
durationInMS = 0
if ptask.StartTimeInMillis > 0 {
durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis
if ptask.CompletedTime != nil && (ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError) {
durationInMS = ptask.CompletedTime.UnixMilli() - ptask.StartTimeInMillis
}
}
durationInMS = ptask.GetDurationInMS()
var (
scrollDocs float64
indexDocs float64
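Both inlined duration computations above collapse into task2.Task's GetDurationInMS. Reconstructed from the deleted code, the method presumably looks roughly like this (status constant values are placeholders, not the framework's actual definitions):

package task

import "time"

type Task struct {
	StartTimeInMillis int64
	CompletedTime     *time.Time
	Status            string
}

const (
	StatusComplete = "complete" // assumed value
	StatusError    = "error"    // assumed value
)

func (t *Task) GetDurationInMS() int64 {
	if t.StartTimeInMillis <= 0 {
		return 0
	}
	// Finished tasks report the fixed start-to-completion span...
	if t.CompletedTime != nil && (t.Status == StatusComplete || t.Status == StatusError) {
		return t.CompletedTime.UnixMilli() - t.StartTimeInMillis
	}
	// ...running tasks report elapsed time so far.
	return time.Now().UnixMilli() - t.StartTimeInMillis
}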

View File

@@ -13,14 +13,15 @@ type ElasticDataConfig struct {
Settings struct {
ParallelIndices int `json:"parallel_indices"`
ParallelTaskPerIndex int `json:"parallel_task_per_index"`
ScrollSize struct {
Scroll struct {
SliceSize int `json:"slice_size"`
Docs int `json:"docs"`
Timeout string `json:"timeout"`
} `json:"scroll_size"`
BulkSize struct {
} `json:"scroll"`
Bulk struct {
Docs int `json:"docs"`
StoreSizeInMB int `json:"store_size_in_mb"`
} `json:"bulk_size"`
} `json:"bulk"`
Execution ExecutionConfig `json:"execution"`
} `json:"settings"`
Creator struct {
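The settings keys change from scroll_size/bulk_size to scroll/bulk, so request payloads must follow. A quick round-trip check of the new shape (payload invented for illustration; encoding/json used directly here):

var cfg ElasticDataConfig
payload := []byte(`{
  "settings": {
    "parallel_indices": 1,
    "parallel_task_per_index": 1,
    "scroll": {"slice_size": 5, "docs": 1000, "timeout": "5m"},
    "bulk": {"docs": 1000, "store_size_in_mb": 10}
  }
}`)
if err := json.Unmarshal(payload, &cfg); err != nil {
	log.Error(err)
}
// cfg.Settings.Scroll.Docs == 1000, cfg.Settings.Bulk.StoreSizeInMB == 10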

View File

@@ -11,6 +11,7 @@ import (
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/env"
"infini.sh/framework/core/global"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/pipeline"
task2 "infini.sh/framework/core/task"
"infini.sh/framework/core/util"
@@ -60,6 +61,7 @@ func newClusterMigrationProcessor(c *config.Config) (pipeline.Processor, error)
return nil, err
}
}
global.Register("cluster_migration_config", &cfg)
processor := ClusterMigrationProcessor{
id: util.GetUUID(),
@@ -120,7 +122,7 @@ func (p *ClusterMigrationProcessor) Process(ctx *pipeline.Context) error {
Action: task2.LogAction{
Parameters: t.Parameters,
},
Content: fmt.Sprintf("starting to execute task [%s]", t.ID),
Message: fmt.Sprintf("starting to execute task [%s]", t.ID),
Timestamp: time.Now().UTC(),
})
err = p.SplitMigrationTask(&t)
@@ -135,12 +137,12 @@ func (p *ClusterMigrationProcessor) Process(ctx *pipeline.Context) error {
Success: true,
},
},
Content: fmt.Sprintf("success to split task [%s]", t.ID),
Message: fmt.Sprintf("success to split task [%s]", t.ID),
Timestamp: time.Now().UTC(),
}
if err != nil {
taskLog.Status = task2.StatusError
taskLog.Content = fmt.Sprintf("failed to split task [%s]: %v", t.ID, err)
taskLog.Message = fmt.Sprintf("failed to split task [%s]: %v", t.ID, err)
taskLog.Action.Result = &task2.LogResult{
Success: false,
Error: err.Error(),
@@ -182,15 +184,15 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err
}()
esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id)
esTargetClient := elastic.GetClient(clusterMigrationTask.Cluster.Target.Id)
esClient := elastic.GetClient(p.config.Elasticsearch)
//esClient := elastic.GetClient(p.config.Elasticsearch)
for i, index := range clusterMigrationTask.Indices {
source := util.MapStr{
"cluster_id": clusterMigrationTask.Cluster.Source.Id,
"indices": index.Source.Name,
//"slice_size": 10,
"batch_size": clusterMigrationTask.Settings.ScrollSize.Docs,
"scroll_time": clusterMigrationTask.Settings.ScrollSize.Timeout,
"slice_size": clusterMigrationTask.Settings.Scroll.SliceSize,
"batch_size": clusterMigrationTask.Settings.Scroll.Docs,
"scroll_time": clusterMigrationTask.Settings.Scroll.Timeout,
}
if index.IndexRename != nil {
source["index_rename"] = index.IndexRename
@@ -251,11 +253,9 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err
target := util.MapStr{
"cluster_id": clusterMigrationTask.Cluster.Target.Id,
//"max_worker_size": 10,
//"detect_interval": 100,
"bulk": util.MapStr{
"batch_size_in_mb": clusterMigrationTask.Settings.BulkSize.StoreSizeInMB,
"batch_size_in_docs": clusterMigrationTask.Settings.BulkSize.Docs,
"batch_size_in_mb": clusterMigrationTask.Settings.Bulk.StoreSizeInMB,
"batch_size_in_docs": clusterMigrationTask.Settings.Bulk.Docs,
},
}
indexParameters := map[string]interface{}{
@@ -287,7 +287,7 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err
Parameters: indexParameters,
}
indexMigrationTask.ID=util.GetUUID()
indexMigrationTask.ID = util.GetUUID()
clusterMigrationTask.Indices[i].TaskID = indexMigrationTask.ID
if index.Partition != nil {
@@ -386,9 +386,9 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err
},
},
}
partitionMigrationTask.ID=util.GetUUID()
_, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "")
partitionMigrationTask.ID = util.GetUUID()
err = orm.Create(nil, &partitionMigrationTask)
//_, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "")
delete(target, "query_dsl")
if err != nil {
return fmt.Errorf("store index migration task(partition) error: %w", err)
@@ -427,15 +427,15 @@ func (p *ClusterMigrationProcessor) SplitMigrationTask(taskItem *task2.Task) err
},
Parameters: indexParameters,
}
partitionMigrationTask.ID=util.GetUUID()
_, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "")
err = orm.Create(nil, &partitionMigrationTask)
//_, err = esClient.Index(p.config.IndexName, "", partitionMigrationTask.ID, partitionMigrationTask, "")
delete(target, "query_dsl")
if err != nil {
return fmt.Errorf("store index migration task(partition) error: %w", err)
}
}
_, err = esClient.Index(p.config.IndexName, "", indexMigrationTask.ID, indexMigrationTask, "")
err = orm.Create(nil, &indexMigrationTask)
//_, err = esClient.Index(p.config.IndexName, "", indexMigrationTask.ID, indexMigrationTask, "")
if err != nil {
return fmt.Errorf("store index migration task error: %w", err)
}
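Persisting the generated subtasks moves from a direct esClient.Index call to orm.Create, matching how the root task is stored in the API handler; the ORM resolves the backing index, so p.config.IndexName no longer needs threading through. The pattern, as used elsewhere in this commit:

partitionMigrationTask.ID = util.GetUUID()
if err := orm.Create(nil, &partitionMigrationTask); err != nil {
	return fmt.Errorf("store index migration task(partition) error: %w", err)
}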
@@ -504,4 +504,4 @@ func (p *ClusterMigrationProcessor) writeTaskLog(taskItem *task2.Task, logItem *
if err != nil{
log.Error(err)
}
}