diff --git a/plugin/setup/setup.go b/plugin/setup/setup.go index 2c04eda7..dcc4cf20 100644 --- a/plugin/setup/setup.go +++ b/plugin/setup/setup.go @@ -5,6 +5,14 @@ import ( "crypto/md5" "encoding/hex" "fmt" + "io" + "net/http" + uri2 "net/url" + "path" + "path/filepath" + "runtime" + "time" + log "github.com/cihub/seelog" "github.com/valyala/fasttemplate" "golang.org/x/crypto/bcrypt" @@ -27,13 +35,6 @@ import ( elastic1 "infini.sh/framework/modules/elastic/common" "infini.sh/framework/modules/security" "infini.sh/framework/plugins/replay" - "io" - "net/http" - uri2 "net/url" - "path" - "path/filepath" - "runtime" - "time" ) type Module struct { @@ -44,13 +45,13 @@ func (module *Module) Name() string { return "setup" } -func init() { +func init() { module.RegisterSystemModule(&Module{}) } func (module *Module) Setup() { - if !global.Env().SetupRequired(){ + if !global.Env().SetupRequired() { return } @@ -59,13 +60,14 @@ func (module *Module) Setup() { elastic3.InitTestAPI() } -var setupFinishedCallback= []func() {} +var setupFinishedCallback = []func(){} + func RegisterSetupCallback(f func()) { - setupFinishedCallback=append(setupFinishedCallback,f) + setupFinishedCallback = append(setupFinishedCallback, f) } -func InvokeSetupCallback() { - for _,v:=range setupFinishedCallback{ +func InvokeSetupCallback() { + for _, v := range setupFinishedCallback { v() } } @@ -100,47 +102,47 @@ func (module *Module) Stop() error { type SetupRequest struct { Cluster struct { - Host string `json:"host"` - Schema string `json:"schema"` + Host string `json:"host"` + Schema string `json:"schema"` Endpoint string `json:"endpoint"` Username string `json:"username"` Password string `json:"password"` } `json:"cluster"` - Skip bool `json:"skip"` + Skip bool `json:"skip"` BootstrapUsername string `json:"bootstrap_username"` BootstrapPassword string `json:"bootstrap_password"` - CredentialSecret string `json:"credential_secret"` + CredentialSecret string `json:"credential_secret"` } -var GlobalSystemElasticsearchID="infini_default_system_cluster" +var GlobalSystemElasticsearchID = "infini_default_system_cluster" -const VersionTooOld ="elasticsearch_version_too_old" -const IndicesExists ="elasticsearch_indices_exists" -const TemplateExists ="elasticsearch_template_exists" -const VersionNotSupport ="unknown_cluster_version" +const VersionTooOld = "elasticsearch_version_too_old" +const IndicesExists = "elasticsearch_indices_exists" +const TemplateExists = "elasticsearch_template_exists" +const VersionNotSupport = "unknown_cluster_version" var cfg1 elastic1.ORMConfig func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { - if !global.Env().SetupRequired(){ + if !global.Env().SetupRequired() { module.WriteError(w, "setup not permitted", 500) return } - success:=false + success := false var err error var errType string var fixTips string var code int - code=200 + code = 200 defer func() { global.Env().CheckSetup() result := util.MapStr{} - result["success"]=success + result["success"] = success if r := recover(); r != nil { var v string @@ -152,32 +154,31 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro case string: v = r.(string) } - if v!=""{ - success=false - result["error"]=util.MapStr{ - "reason":v, + if v != "" { + success = false + result["error"] = util.MapStr{ + "reason": v, } - if errType!=""{ - result["type"]=errType + if errType != "" { + result["type"] = errType } - if fixTips!=""{ - result["fix_tips"]=fixTips + if fixTips != "" { + result["fix_tips"] = fixTips } - code=500 + code = 500 } } module.WriteJSON(w, result, code) }() - - err, client,_ := module.initTempClient(r) - if err!=nil{ + err, client, _ := module.initTempClient(r) + if err != nil { panic(err) } //validate version verInfo, err := adapter.ClusterVersion(elastic.GetMetadata(cfg.ID)) - if verInfo.Version.Distribution == elastic.Elasticsarch { + if verInfo.Version.Distribution == elastic.Elasticsearch { if verInfo.Version.Number != "" { ver := &util.Version{} ver, err = util.ParseSemantic(verInfo.Version.Number) @@ -192,7 +193,7 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro panic(errors.Errorf("elasticsearch version(%v) should greater than v5.3", verInfo.Version.Number)) } } - }else if verInfo.Version.Distribution != elastic.Easysearch && verInfo.Version.Distribution != elastic.Opensearch { + } else if verInfo.Version.Distribution != elastic.Easysearch && verInfo.Version.Distribution != elastic.Opensearch { errType = VersionNotSupport panic(errors.Errorf("unknown distribution (%v)", verInfo.Version.Distribution)) } @@ -202,11 +203,11 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro panic(err) } - if cfg1.IndexPrefix==""{ - cfg1.IndexPrefix=".infini_" + if cfg1.IndexPrefix == "" { + cfg1.IndexPrefix = ".infini_" } - if cfg1.TemplateName==""{ - cfg1.TemplateName=".infini" + if cfg1.TemplateName == "" { + cfg1.TemplateName = ".infini" } //validate indices @@ -227,7 +228,7 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro tipBuff.WriteString("\n") } errType = IndicesExists - fixTips=tipBuff.String() + fixTips = tipBuff.String() panic(errors.Errorf("there are following indices exists in target elasticsearch: \n%v", buff.String())) } @@ -237,27 +238,29 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro } if ok { errType = TemplateExists - fixTips="DELETE /_template/"+util.TrimSpaces(cfg1.TemplateName) + fixTips = "DELETE /_template/" + util.TrimSpaces(cfg1.TemplateName) panic(errors.Errorf("there are following template already exists in target elasticsearch: \n%v", cfg1.TemplateName)) } success = true } + var cfg elastic.ElasticsearchConfig -func (module *Module) initTempClient(r *http.Request) (error, elastic.API,SetupRequest) { + +func (module *Module) initTempClient(r *http.Request) (error, elastic.API, SetupRequest) { request := SetupRequest{} err := module.DecodeJSON(r, &request) if err != nil { - return err,nil,request + return err, nil, request } - if request.Cluster.Endpoint==""&&request.Cluster.Host==""{ + if request.Cluster.Endpoint == "" && request.Cluster.Host == "" { panic("invalid configuration") } - if request.Cluster.Endpoint==""{ - if request.Cluster.Host!=""&&request.Cluster.Schema!=""{ - request.Cluster.Endpoint=fmt.Sprintf("%v://%v",request.Cluster.Schema,request.Cluster.Host) + if request.Cluster.Endpoint == "" { + if request.Cluster.Host != "" && request.Cluster.Schema != "" { + request.Cluster.Endpoint = fmt.Sprintf("%v://%v", request.Cluster.Schema, request.Cluster.Host) } } @@ -271,31 +274,30 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API,SetupR }, } - if cfg.Endpoint!=""&&cfg.Host==""{ - uri,err:=uri2.Parse(cfg.Endpoint) - if err!=nil{ + if cfg.Endpoint != "" && cfg.Host == "" { + uri, err := uri2.Parse(cfg.Endpoint) + if err != nil { panic(err) } - cfg.Host=uri.Host - cfg.Schema=uri.Scheme + cfg.Host = uri.Host + cfg.Schema = uri.Scheme } - cfg.ID = GlobalSystemElasticsearchID - cfg.Name = "INFINI_SYSTEM ("+util.PickRandomName()+")" + cfg.Name = "INFINI_SYSTEM (" + util.PickRandomName() + ")" elastic.InitMetadata(&cfg, true) client, err := elastic1.InitClientWithConfig(cfg) if err != nil { - return err,nil,request + return err, nil, request } - global.Register(elastic.GlobalSystemElasticsearchID,GlobalSystemElasticsearchID) + global.Register(elastic.GlobalSystemElasticsearchID, GlobalSystemElasticsearchID) elastic.UpdateConfig(cfg) elastic.UpdateClient(cfg, client) health, err := client.ClusterHealth() if err != nil { - return err,nil,request + return err, nil, request } if health != nil { cfg.RawName = health.Name @@ -304,27 +306,27 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API,SetupR cfg.Version = ver.Number cfg.Distribution = ver.Distribution - return err, client,request + return err, client, request } func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { - if !global.Env().SetupRequired(){ + if !global.Env().SetupRequired() { module.WriteError(w, "setup not permitted", 500) return } - success:=false + success := false var err error var errType string var fixTips string var code int - code=200 + code = 200 defer func() { global.Env().CheckSetup() result := util.MapStr{} - result["success"]=success + result["success"] = success if r := recover(); r != nil { var v string @@ -336,51 +338,51 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http case string: v = r.(string) } - if v!=""{ - success=false - result["error"]=util.MapStr{ - "reason":v, + if v != "" { + success = false + result["error"] = util.MapStr{ + "reason": v, } - if errType!=""{ - result["type"]=errType + if errType != "" { + result["type"] = errType } - if fixTips!=""{ - result["fix_tips"]=fixTips + if fixTips != "" { + result["fix_tips"] = fixTips } - code=500 + code = 500 } } module.WriteJSON(w, result, code) }() - err, client,request := module.initTempClient(r) - if err!=nil{ + err, client, request := module.initTempClient(r) + if err != nil { panic(err) } if request.CredentialSecret == "" { panic("invalid credential secret") } - if cfg1.IndexPrefix==""{ - cfg1.IndexPrefix=".infini_" + if cfg1.IndexPrefix == "" { + cfg1.IndexPrefix = ".infini_" } - if cfg1.TemplateName==""{ - cfg1.TemplateName=".infini" + if cfg1.TemplateName == "" { + cfg1.TemplateName = ".infini" } - if !cfg1.Enabled{ - cfg1.Enabled=true + if !cfg1.Enabled { + cfg1.Enabled = true } - if !cfg1.InitTemplate{ - cfg1.InitTemplate=true + if !cfg1.InitTemplate { + cfg1.InitTemplate = true } - cfg.Reserved=true - cfg.Monitored=true + cfg.Reserved = true + cfg.Monitored = true //处理ORM - handler := elastic2.ElasticORM{Client: client, Config:cfg1 } + handler := elastic2.ElasticORM{Client: client, Config: cfg1} orm.Register("elastic_setup_"+util.GetUUID(), handler) //生成凭据并保存 h := md5.New() @@ -393,7 +395,7 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http panic(err) } - if !request.Skip{ + if !request.Skip { //处理模版 elastic2.InitTemplate(true) @@ -402,39 +404,38 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http //INDEX_PREFIX ver := elastic.GetClient(GlobalSystemElasticsearchID).GetVersion() dslTplFileName := "initialization.tpl" - if ver.Distribution == "" || ver.Distribution == elastic.Elasticsarch { //elasticsearch distribution + if ver.Distribution == "" || ver.Distribution == elastic.Elasticsearch { //elasticsearch distribution majorVersion := elastic.GetClient(GlobalSystemElasticsearchID).GetMajorVersion() if majorVersion == 6 { dslTplFileName = "initialization_v6.tpl" - }else if majorVersion <= 5 { + } else if majorVersion <= 5 { dslTplFileName = "initialization_v5.tpl" } } + dslTplFile := path.Join(global.Env().GetConfigDir(), dslTplFileName) + dslFile := path.Join(global.Env().GetConfigDir(), "initialization.dsl") - dslTplFile:=path.Join(global.Env().GetConfigDir(), dslTplFileName) - dslFile:=path.Join(global.Env().GetConfigDir(),"initialization.dsl") - - if !util.FileExists(dslTplFile){ + if !util.FileExists(dslTplFile) { log.Error(filepath.Abs(dslTplFile)) panic("template file for setup was missing") } var dsl []byte - dsl,err=util.FileGetContent(dslTplFile) - if err!=nil{ + dsl, err = util.FileGetContent(dslTplFile) + if err != nil { panic(err) } - var dslWriteSuccess=false - if len(dsl)>0{ + var dslWriteSuccess = false + if len(dsl) > 0 { var tpl *fasttemplate.Template - tpl,err=fasttemplate.NewTemplate(string(dsl), "$[[", "]]") - if err!=nil{ + tpl, err = fasttemplate.NewTemplate(string(dsl), "$[[", "]]") + if err != nil { panic(err) } - if tpl!=nil{ - output:=tpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) { + if tpl != nil { + output := tpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) { switch tag { case "TEMPLATE_NAME": return w.Write([]byte(cfg1.TemplateName)) @@ -445,17 +446,17 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http case "RESOURCE_NAME": return w.Write([]byte(cfg.Name)) } - panic(errors.Errorf("unknown tag: %v",tag)) + panic(errors.Errorf("unknown tag: %v", tag)) }) - _,err=util.FilePutContent(dslFile,output) - if err!=nil{ + _, err = util.FilePutContent(dslFile, output) + if err != nil { panic(err) } - dslWriteSuccess=true + dslWriteSuccess = true } } - if dslWriteSuccess{ + if dslWriteSuccess { lines := util.FileGetLines(dslFile) var ( username string @@ -465,8 +466,8 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http username = cfg.BasicAuth.Username password = cfg.BasicAuth.Password } - _,err,_:=replay.ReplayLines(pipeline.AcquireContext(pipeline.PipelineConfigV2{}),lines,cfg.Schema,cfg.Host,username,password) - if err!=nil{ + _, err, _ := replay.ReplayLines(pipeline.AcquireContext(pipeline.PipelineConfigV2{}), lines, cfg.Schema, cfg.Host, username, password) + if err != nil { log.Error(err) } } @@ -480,7 +481,6 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http //init security security.InitSecurity() - toSaveCfg := cfg if request.Cluster.Username != "" || request.Cluster.Password != "" { cred := credential.Credential{ @@ -502,57 +502,56 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http toSaveCfg.CredentialID = cred.ID cfg.CredentialID = cred.ID err = orm.Save(nil, &cred) - if err!=nil{ + if err != nil { panic(err) } toSaveCfg.BasicAuth = nil } //保存默认集群 - err=orm.Create(nil, &toSaveCfg) - if err!=nil{ + err = orm.Create(nil, &toSaveCfg) + if err != nil { panic(err) } - if request.BootstrapUsername!=""&&request.BootstrapPassword!=""{ + if request.BootstrapUsername != "" && request.BootstrapPassword != "" { //Save bootstrap user - user:=rbac.User{} - user.ID="default_user_"+request.BootstrapUsername - user.Name=request.BootstrapUsername - user.NickName=request.BootstrapUsername + user := rbac.User{} + user.ID = "default_user_" + request.BootstrapUsername + user.Name = request.BootstrapUsername + user.NickName = request.BootstrapUsername var hash []byte hash, err = bcrypt.GenerateFromPassword([]byte(request.BootstrapPassword), bcrypt.DefaultCost) - if err!=nil{ + if err != nil { panic(err) } - user.Password=string(hash) - role:=[]rbac.UserRole{} - role=append(role,rbac.UserRole{ - ID: rbac.RoleAdminName, + user.Password = string(hash) + role := []rbac.UserRole{} + role = append(role, rbac.UserRole{ + ID: rbac.RoleAdminName, Name: rbac.RoleAdminName, }) - user.Roles=role - err=orm.Create(nil, &user) - if err!=nil{ + user.Roles = role + err = orm.Create(nil, &user) + if err != nil { panic(err) } } } err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(cfg.BasicAuth.Password)) - if err!=nil{ + if err != nil { panic(err) } - //save to local file - file:=path.Join(global.Env().GetConfigDir(),"system_config.yml") - _,err=util.FilePutContent(file,fmt.Sprintf("configs.template:\n - name: \"system\"\n path: ./config/system_config.tpl\n variable:\n " + - "CLUSTER_ID: %v\n CLUSTER_ENDPINT: \"%v\"\n " + + file := path.Join(global.Env().GetConfigDir(), "system_config.yml") + _, err = util.FilePutContent(file, fmt.Sprintf("configs.template:\n - name: \"system\"\n path: ./config/system_config.tpl\n variable:\n "+ + "CLUSTER_ID: %v\n CLUSTER_ENDPINT: \"%v\"\n "+ "CLUSTER_USER: \"%v\"\n CLUSTER_VER: \"%v\"\n CLUSTER_DISTRIBUTION: \"%v\"\n INDEX_PREFIX: \"%v\"", - GlobalSystemElasticsearchID,cfg.Endpoint,cfg.BasicAuth.Username,cfg.Version,cfg.Distribution,cfg1.IndexPrefix )) - if err!=nil{ + GlobalSystemElasticsearchID, cfg.Endpoint, cfg.BasicAuth.Username, cfg.Version, cfg.Distribution, cfg1.IndexPrefix)) + if err != nil { panic(err) } @@ -560,12 +559,12 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http InvokeSetupCallback() //place setup lock file - setupLock:=path.Join(global.Env().GetDataDir(),".setup_lock") - _,err=util.FilePutContent(setupLock,time.Now().String()) - if err!=nil{ + setupLock := path.Join(global.Env().GetDataDir(), ".setup_lock") + _, err = util.FilePutContent(setupLock, time.Now().String()) + if err != nil { panic(err) } - success=true + success = true }