From 8977808d30bee48c72c1f33d2e444c969b7f0f67 Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 14 Oct 2022 16:55:48 +0800 Subject: [PATCH 1/3] init migration --- main.go | 7 ++- model/gateway/instance.go | 2 +- plugin/api/gateway/api.go | 2 + plugin/api/gateway/instance.go | 88 ++++++++++++++++++++++++++++++++++ 4 files changed, 96 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index 3145e2bf..8c31eadb 100644 --- a/main.go +++ b/main.go @@ -16,11 +16,12 @@ import ( _ "infini.sh/framework/core/log" "infini.sh/framework/core/module" "infini.sh/framework/core/orm" + task2 "infini.sh/framework/core/task" "infini.sh/framework/modules/agent" _ "infini.sh/framework/modules/api" elastic2 "infini.sh/framework/modules/elastic" - "infini.sh/framework/modules/filter" "infini.sh/framework/modules/metrics" + "infini.sh/framework/modules/migration" "infini.sh/framework/modules/pipeline" queue2 "infini.sh/framework/modules/queue/disk_queue" "infini.sh/framework/modules/redis" @@ -67,7 +68,6 @@ func main() { } //load core modules first - module.RegisterSystemModule(&filter.FilterModule{}) module.RegisterSystemModule(&elastic2.ElasticModule{}) module.RegisterSystemModule(&stats.SimpleStatsModule{}) module.RegisterSystemModule(&queue2.DiskQueue{}) @@ -76,6 +76,7 @@ func main() { module.RegisterSystemModule(&pipeline.PipeModule{}) module.RegisterSystemModule(&task.TaskModule{}) module.RegisterSystemModule(&agent.AgentModule{}) + module.RegisterSystemModule(&migration.MigrationModule{}) module.RegisterUserPlugin(&metrics.MetricsModule{}) @@ -118,6 +119,8 @@ func main() { orm.RegisterSchemaWithIndexName(alerting.Channel{}, "channel") orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") + orm.RegisterSchemaWithIndexName(task2.Task{}, "task") + orm.RegisterSchemaWithIndexName(task2.Log{}, "task-log") api.RegisterSchema() go func() { diff --git a/model/gateway/instance.go b/model/gateway/instance.go index 73977adb..59c32005 100644 --- a/model/gateway/instance.go +++ b/model/gateway/instance.go @@ -13,7 +13,7 @@ import ( type Instance struct { orm.ORMObjectBase - InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"` + //InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"` Name string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"` Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"` Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"` diff --git a/plugin/api/gateway/api.go b/plugin/api/gateway/api.go index d41a9524..23c6929c 100644 --- a/plugin/api/gateway/api.go +++ b/plugin/api/gateway/api.go @@ -24,4 +24,6 @@ func init() { api.HandleAPIMethod(api.POST, "/gateway/instance/status", gateway.RequirePermission(gateway.getInstanceStatus, enum.PermissionGatewayInstanceRead)) api.HandleAPIMethod(api.POST, "/gateway/instance/:instance_id/_proxy", gateway.RequirePermission(gateway.proxy, enum.PermissionGatewayInstanceRead)) + + api.HandleAPIMethod(api.GET, "/_platform/nodes", gateway.getExecutionNodes) } diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index 9e09f7f1..0c1423a2 100644 --- a/plugin/api/gateway/instance.go +++ b/plugin/api/gateway/instance.go @@ -14,6 +14,7 @@ import ( "infini.sh/framework/core/orm" "infini.sh/framework/core/proxy" "infini.sh/framework/core/util" + "infini.sh/framework/modules/elastic" "net/http" "strconv" "strings" @@ -28,6 +29,24 @@ func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps return } + res, err := h.doConnect(obj.Endpoint, obj.BasicAuth) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + obj.ID = res.ID + + exists, err := orm.Get(obj) + if err != nil && err != elastic.ErrNotFound { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if exists { + h.WriteError(w, "gateway instance already registered", http.StatusInternalServerError) + return + } err = orm.Create(obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -329,4 +348,73 @@ func (h *GatewayAPI) tryConnect(w http.ResponseWriter, req *http.Request, ps htt return } h.WriteJSON(w, connectRes, http.StatusOK) +} + +func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + query := util.MapStr{ + "size": 1000, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "enrolled": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": "online", + }, + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(agent.Instance{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + var nodes []util.MapStr + //nodes from agent + for _, row := range result.Result { + if rowM, ok := row.(map[string]interface{}); ok { + nodes = append(nodes, util.MapStr{ + "id": rowM["id"], + "name": rowM["name"], + "type": "agent", + }) + } + } + + q = orm.Query{ + Size: 1000, + } + err, result = orm.Search(gateway.Instance{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + //nodes from gateway + for _, row := range result.Result { + if rowM, ok := row.(map[string]interface{}); ok { + nodes = append(nodes, util.MapStr{ + "id": rowM["id"], + "name": rowM["name"], + "type": "gateway", + }) + } + } + h.WriteJSON(w, nodes, http.StatusOK) } \ No newline at end of file From 516e7462ac19a45ee9323148458a4ef4c7b29a55 Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 25 Oct 2022 16:07:43 +0800 Subject: [PATCH 2/3] fixed merge master conflicts --- config/system_config.tpl | 1 + console.yml | 1 + main.go | 19 ++- plugin/api/gateway/instance.go | 50 +++++++ plugin/api/insight/metric_util.go | 5 + plugin/setup/setup.go | 164 ++++++++++++----------- service/alerting/elasticsearch/engine.go | 5 + 7 files changed, 154 insertions(+), 91 deletions(-) diff --git a/config/system_config.tpl b/config/system_config.tpl index e4ae273e..694ccdbc 100644 --- a/config/system_config.tpl +++ b/config/system_config.tpl @@ -2,6 +2,7 @@ elasticsearch: - id: $[[CLUSTER_ID]] name: $[[CLUSTER_ID]] + version: $[[CLUSTER_VER]] enabled: true monitored: true reserved: true diff --git a/console.yml b/console.yml index 23116695..62070ed8 100644 --- a/console.yml +++ b/console.yml @@ -1,4 +1,5 @@ path.configs: "config" +configs.auto_reload: true web: enabled: true diff --git a/main.go b/main.go index 6611bae3..d5b0fc09 100644 --- a/main.go +++ b/main.go @@ -81,7 +81,7 @@ func main() { module.RegisterSystemModule(&setup1.Module{}) module.RegisterSystemModule(uiModule) - var initFunc= func() { + if !global.Env().SetupRequired(){ module.RegisterSystemModule(&stats.SimpleStatsModule{}) module.RegisterSystemModule(&elastic2.ElasticModule{}) module.RegisterSystemModule(&queue2.DiskQueue{}) @@ -92,15 +92,10 @@ func main() { module.RegisterSystemModule(&metrics.MetricsModule{}) module.RegisterSystemModule(&security.Module{}) module.RegisterSystemModule(&migration.MigrationModule{}) - } - - if !global.Env().SetupRequired(){ - initFunc() }else{ for _, v := range modules { v.Setup() } - setup1.RegisterSetupCallback(initFunc) } api.RegisterAPI("") @@ -130,11 +125,7 @@ func main() { module.Start() var initFunc= func() { - if global.Env().SetupRequired() { - for _, v := range modules { - v.Start() - } - } + elastic2.InitTemplate(false) @@ -153,6 +144,12 @@ func main() { orm.RegisterSchemaWithIndexName(task1.Log{}, "task-log") api.RegisterSchema() + if global.Env().SetupRequired() { + for _, v := range modules { + v.Start() + } + } + task1.RunWithinGroup("initialize_alerting",func(ctx context.Context) error { err := alerting2.InitTasks() if err != nil { diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index 0c1423a2..b12585ac 100644 --- a/plugin/api/gateway/instance.go +++ b/plugin/api/gateway/instance.go @@ -417,4 +417,54 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, } } h.WriteJSON(w, nodes, http.StatusOK) +} + +func (h *GatewayAPI) getExecutionNodesNew(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + var ( + keyword = h.GetParameterOrDefault(req, "keyword", "") + strSize = h.GetParameterOrDefault(req, "size", "10") + strFrom = h.GetParameterOrDefault(req, "from", "0") + ) + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 10 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + query := util.MapStr{ + "size": size, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "enrolled": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": "online", + }, + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + _ = q } \ No newline at end of file diff --git a/plugin/api/insight/metric_util.go b/plugin/api/insight/metric_util.go index aa26e5d9..cf2b63d7 100644 --- a/plugin/api/insight/metric_util.go +++ b/plugin/api/insight/metric_util.go @@ -76,6 +76,11 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) { } } targetESVersion := elastic.GetMetadata(metric.ClusterId).Config.Version + + if targetESVersion==""{ + panic("invalid version") + } + intervalField, err := elastic.GetDateHistogramIntervalField(targetESVersion, metric.BucketSize) if err != nil { return nil, fmt.Errorf("get interval field error: %w", err) diff --git a/plugin/setup/setup.go b/plugin/setup/setup.go index b4d7984c..a4885128 100644 --- a/plugin/setup/setup.go +++ b/plugin/setup/setup.go @@ -80,6 +80,7 @@ type SetupRequest struct { Password string `json:"password"` } `json:"cluster"` + Skip bool `json:"skip"` BootstrapUsername string `json:"bootstrap_username"` BootstrapPassword string `json:"bootstrap_password"` } @@ -249,15 +250,17 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API,SetupR cfg.ID = tempID cfg.Name = "INFINI_SYSTEM ("+util.PickRandomName()+")" - elastic.InitMetadata(&cfg, true) + client, err := elastic1.InitClientWithConfig(cfg) if err != nil { return err,nil,request } - + cfg.Version=client.GetVersion() + meta:=elastic.InitMetadata(&cfg, true) + meta.Config.Version=cfg.Version + elastic.SetMetadata(tempID,meta) elastic.UpdateConfig(cfg) elastic.UpdateClient(cfg, client) - cfg.Version=client.GetVersion() global.Register(elastic.GlobalSystemElasticsearchID,tempID) return err, client,request @@ -336,93 +339,102 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http handler := elastic2.ElasticORM{Client: client, Config:cfg1 } orm.Register("elastic_setup_"+util.GetUUID(), handler) - //处理模版 - elastic2.InitTemplate(true) + if !request.Skip{ + //处理模版 + elastic2.InitTemplate(true) - //处理生命周期 - //TEMPLATE_NAME - //INDEX_PREFIX - dslTplFile:=path.Join(global.Env().GetConfigDir(),"initialization.tpl") - dslFile:=path.Join(global.Env().GetConfigDir(),"initialization.dsl") + //处理生命周期 + //TEMPLATE_NAME + //INDEX_PREFIX + dslTplFile:=path.Join(global.Env().GetConfigDir(),"initialization.tpl") + dslFile:=path.Join(global.Env().GetConfigDir(),"initialization.dsl") - var dsl []byte - dsl,err=util.FileGetContent(dslTplFile) - if err!=nil{ - panic(err) - } - - var dslWriteSuccess=false - if len(dsl)>0{ - var tpl *fasttemplate.Template - tpl,err=fasttemplate.NewTemplate(string(dsl), "$[[", "]]") + var dsl []byte + dsl,err=util.FileGetContent(dslTplFile) if err!=nil{ panic(err) } - if tpl!=nil{ - output:=tpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) { - switch tag { - case "TEMPLATE_NAME": - return w.Write([]byte(cfg1.TemplateName)) - case "INDEX_PREFIX": - return w.Write([]byte(cfg1.IndexPrefix)) - case "RESOURCE_ID": - return w.Write([]byte(cfg.ID)) - case "RESOURCE_NAME": - return w.Write([]byte(cfg.Name)) - } - panic(errors.Errorf("unknown tag: %v",tag)) - }) - _,err=util.FilePutContent(dslFile,output) + + var dslWriteSuccess=false + if len(dsl)>0{ + var tpl *fasttemplate.Template + tpl,err=fasttemplate.NewTemplate(string(dsl), "$[[", "]]") if err!=nil{ panic(err) } - dslWriteSuccess=true + if tpl!=nil{ + output:=tpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) { + switch tag { + case "TEMPLATE_NAME": + return w.Write([]byte(cfg1.TemplateName)) + case "INDEX_PREFIX": + return w.Write([]byte(cfg1.IndexPrefix)) + case "RESOURCE_ID": + return w.Write([]byte(cfg.ID)) + case "RESOURCE_NAME": + return w.Write([]byte(cfg.Name)) + } + panic(errors.Errorf("unknown tag: %v",tag)) + }) + _,err=util.FilePutContent(dslFile,output) + if err!=nil{ + panic(err) + } + dslWriteSuccess=true + } } - } - if dslWriteSuccess{ - lines := util.FileGetLines(dslFile) - _,err,_:=replay.ReplayLines(pipeline.AcquireContext(),lines,cfg.Schema,cfg.Host) - if err!=nil{ - log.Error(err) + if dslWriteSuccess{ + lines := util.FileGetLines(dslFile) + _,err,_:=replay.ReplayLines(pipeline.AcquireContext(),lines,cfg.Schema,cfg.Host) + if err!=nil{ + log.Error(err) + } } - } + //处理索引 + elastic2.InitSchema() + //init security + security.InitSecurity() - //处理索引 - elastic2.InitSchema() - //init security - security.InitSecurity() - - //保存默认集群 - err=orm.Save(&cfg) - if err!=nil{ - panic(err) - } - - if request.BootstrapUsername!=""&&request.BootstrapPassword!=""{ - //Save bootstrap user - user:=rbac.User{} - user.ID="default_user_"+request.BootstrapUsername - user.Name=request.BootstrapUsername - user.NickName=request.BootstrapUsername - var hash []byte - hash, err = bcrypt.GenerateFromPassword([]byte(request.BootstrapPassword), bcrypt.DefaultCost) + //保存默认集群 + err=orm.Save(&cfg) if err!=nil{ panic(err) } - user.Password=string(hash) - role:=[]rbac.UserRole{} - role=append(role,rbac.UserRole{ - ID: rbac.RoleAdminName, - Name: rbac.RoleAdminName, - }) - user.Roles=role - err=orm.Save(&user) + if request.BootstrapUsername!=""&&request.BootstrapPassword!=""{ + //Save bootstrap user + user:=rbac.User{} + user.ID="default_user_"+request.BootstrapUsername + user.Name=request.BootstrapUsername + user.NickName=request.BootstrapUsername + var hash []byte + hash, err = bcrypt.GenerateFromPassword([]byte(request.BootstrapPassword), bcrypt.DefaultCost) + if err!=nil{ + panic(err) + } + + user.Password=string(hash) + role:=[]rbac.UserRole{} + role=append(role,rbac.UserRole{ + ID: rbac.RoleAdminName, + Name: rbac.RoleAdminName, + }) + user.Roles=role + err=orm.Save(&user) + if err!=nil{ + panic(err) + } + } + + + //disable builtin auth + err=api.DisableBuiltinUserAdmin() if err!=nil{ panic(err) } + } @@ -430,23 +442,15 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http file:=path.Join(global.Env().GetConfigDir(),"system_config.yml") _,err=util.FilePutContent(file,fmt.Sprintf("configs.template:\n - name: \"system\"\n path: ./config/system_config.tpl\n variable:\n " + "CLUSTER_ID: %v\n CLUSTER_ENDPINT: \"%v\"\n " + - "CLUSTER_USER: \"%v\"\n CLUSTER_PASS: \"%v\"\n INDEX_PREFIX: \"%v\"", - tempID,cfg.Endpoint,cfg.BasicAuth.Username,cfg.BasicAuth.Password,cfg1.IndexPrefix )) + "CLUSTER_USER: \"%v\"\n CLUSTER_PASS: \"%v\"\n CLUSTER_VER: \"%v\"\n INDEX_PREFIX: \"%v\"", + tempID,cfg.Endpoint,cfg.BasicAuth.Username,cfg.BasicAuth.Password,cfg.Version,cfg1.IndexPrefix )) if err!=nil{ panic(err) } - //处理 ILM - //callback InvokeSetupCallback() - //disable builtin auth - err=api.DisableBuiltinUserAdmin() - if err!=nil{ - panic(err) - } - //place setup lock file setupLock:=path.Join(global.Env().GetDataDir(),".setup_lock") _,err=util.FilePutContent(setupLock,time.Now().String()) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 8aeb8363..81dfd1cf 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -62,6 +62,11 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F if filterParam != nil && filterParam.BucketSize != "" { periodInterval = filterParam.BucketSize } + + if targetESVersion==""{ + panic("invalid version") + } + intervalField, err := elastic.GetDateHistogramIntervalField(targetESVersion, periodInterval ) if err != nil { return nil, fmt.Errorf("get interval field error: %w", err) From 362129d9f223a5eb350687af190d09b94266d0f2 Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 25 Oct 2022 17:36:01 +0800 Subject: [PATCH 3/3] fixed setup conflicts --- plugin/setup/setup.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/plugin/setup/setup.go b/plugin/setup/setup.go index a4885128..5671589e 100644 --- a/plugin/setup/setup.go +++ b/plugin/setup/setup.go @@ -250,17 +250,15 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API,SetupR cfg.ID = tempID cfg.Name = "INFINI_SYSTEM ("+util.PickRandomName()+")" - + elastic.InitMetadata(&cfg, true) client, err := elastic1.InitClientWithConfig(cfg) if err != nil { return err,nil,request } - cfg.Version=client.GetVersion() - meta:=elastic.InitMetadata(&cfg, true) - meta.Config.Version=cfg.Version - elastic.SetMetadata(tempID,meta) + elastic.UpdateConfig(cfg) elastic.UpdateClient(cfg, client) + cfg.Version=client.GetVersion() global.Register(elastic.GlobalSystemElasticsearchID,tempID) return err, client,request