[plugin] fix naming (#42)

[plugin] fix naming

Co-authored-by: Kassian Sun <kassiansun@outlook.com>
This commit is contained in:
sunjiacheng 2023-03-29 13:02:55 +08:00 committed by medcl
parent 24ca0c91a5
commit 08a1673c1a
1 changed files with 138 additions and 139 deletions

View File

@ -5,6 +5,14 @@ import (
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"io"
"net/http"
uri2 "net/url"
"path"
"path/filepath"
"runtime"
"time"
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
"github.com/valyala/fasttemplate" "github.com/valyala/fasttemplate"
"golang.org/x/crypto/bcrypt" "golang.org/x/crypto/bcrypt"
@ -27,13 +35,6 @@ import (
elastic1 "infini.sh/framework/modules/elastic/common" elastic1 "infini.sh/framework/modules/elastic/common"
"infini.sh/framework/modules/security" "infini.sh/framework/modules/security"
"infini.sh/framework/plugins/replay" "infini.sh/framework/plugins/replay"
"io"
"net/http"
uri2 "net/url"
"path"
"path/filepath"
"runtime"
"time"
) )
type Module struct { type Module struct {
@ -50,7 +51,7 @@ func init() {
func (module *Module) Setup() { func (module *Module) Setup() {
if !global.Env().SetupRequired(){ if !global.Env().SetupRequired() {
return return
} }
@ -59,13 +60,14 @@ func (module *Module) Setup() {
elastic3.InitTestAPI() elastic3.InitTestAPI()
} }
var setupFinishedCallback= []func() {} var setupFinishedCallback = []func(){}
func RegisterSetupCallback(f func()) { func RegisterSetupCallback(f func()) {
setupFinishedCallback=append(setupFinishedCallback,f) setupFinishedCallback = append(setupFinishedCallback, f)
} }
func InvokeSetupCallback() { func InvokeSetupCallback() {
for _,v:=range setupFinishedCallback{ for _, v := range setupFinishedCallback {
v() v()
} }
} }
@ -113,34 +115,34 @@ type SetupRequest struct {
CredentialSecret string `json:"credential_secret"` CredentialSecret string `json:"credential_secret"`
} }
var GlobalSystemElasticsearchID="infini_default_system_cluster" var GlobalSystemElasticsearchID = "infini_default_system_cluster"
const VersionTooOld ="elasticsearch_version_too_old" const VersionTooOld = "elasticsearch_version_too_old"
const IndicesExists ="elasticsearch_indices_exists" const IndicesExists = "elasticsearch_indices_exists"
const TemplateExists ="elasticsearch_template_exists" const TemplateExists = "elasticsearch_template_exists"
const VersionNotSupport ="unknown_cluster_version" const VersionNotSupport = "unknown_cluster_version"
var cfg1 elastic1.ORMConfig var cfg1 elastic1.ORMConfig
func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
if !global.Env().SetupRequired(){ if !global.Env().SetupRequired() {
module.WriteError(w, "setup not permitted", 500) module.WriteError(w, "setup not permitted", 500)
return return
} }
success:=false success := false
var err error var err error
var errType string var errType string
var fixTips string var fixTips string
var code int var code int
code=200 code = 200
defer func() { defer func() {
global.Env().CheckSetup() global.Env().CheckSetup()
result := util.MapStr{} result := util.MapStr{}
result["success"]=success result["success"] = success
if r := recover(); r != nil { if r := recover(); r != nil {
var v string var v string
@ -152,32 +154,31 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro
case string: case string:
v = r.(string) v = r.(string)
} }
if v!=""{ if v != "" {
success=false success = false
result["error"]=util.MapStr{ result["error"] = util.MapStr{
"reason":v, "reason": v,
} }
if errType!=""{ if errType != "" {
result["type"]=errType result["type"] = errType
} }
if fixTips!=""{ if fixTips != "" {
result["fix_tips"]=fixTips result["fix_tips"] = fixTips
} }
code=500 code = 500
} }
} }
module.WriteJSON(w, result, code) module.WriteJSON(w, result, code)
}() }()
err, client, _ := module.initTempClient(r)
err, client,_ := module.initTempClient(r) if err != nil {
if err!=nil{
panic(err) panic(err)
} }
//validate version //validate version
verInfo, err := adapter.ClusterVersion(elastic.GetMetadata(cfg.ID)) verInfo, err := adapter.ClusterVersion(elastic.GetMetadata(cfg.ID))
if verInfo.Version.Distribution == elastic.Elasticsarch { if verInfo.Version.Distribution == elastic.Elasticsearch {
if verInfo.Version.Number != "" { if verInfo.Version.Number != "" {
ver := &util.Version{} ver := &util.Version{}
ver, err = util.ParseSemantic(verInfo.Version.Number) ver, err = util.ParseSemantic(verInfo.Version.Number)
@ -192,7 +193,7 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro
panic(errors.Errorf("elasticsearch version(%v) should greater than v5.3", verInfo.Version.Number)) panic(errors.Errorf("elasticsearch version(%v) should greater than v5.3", verInfo.Version.Number))
} }
} }
}else if verInfo.Version.Distribution != elastic.Easysearch && verInfo.Version.Distribution != elastic.Opensearch { } else if verInfo.Version.Distribution != elastic.Easysearch && verInfo.Version.Distribution != elastic.Opensearch {
errType = VersionNotSupport errType = VersionNotSupport
panic(errors.Errorf("unknown distribution (%v)", verInfo.Version.Distribution)) panic(errors.Errorf("unknown distribution (%v)", verInfo.Version.Distribution))
} }
@ -202,11 +203,11 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro
panic(err) panic(err)
} }
if cfg1.IndexPrefix==""{ if cfg1.IndexPrefix == "" {
cfg1.IndexPrefix=".infini_" cfg1.IndexPrefix = ".infini_"
} }
if cfg1.TemplateName==""{ if cfg1.TemplateName == "" {
cfg1.TemplateName=".infini" cfg1.TemplateName = ".infini"
} }
//validate indices //validate indices
@ -227,7 +228,7 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro
tipBuff.WriteString("\n") tipBuff.WriteString("\n")
} }
errType = IndicesExists errType = IndicesExists
fixTips=tipBuff.String() fixTips = tipBuff.String()
panic(errors.Errorf("there are following indices exists in target elasticsearch: \n%v", buff.String())) panic(errors.Errorf("there are following indices exists in target elasticsearch: \n%v", buff.String()))
} }
@ -237,27 +238,29 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro
} }
if ok { if ok {
errType = TemplateExists errType = TemplateExists
fixTips="DELETE /_template/"+util.TrimSpaces(cfg1.TemplateName) fixTips = "DELETE /_template/" + util.TrimSpaces(cfg1.TemplateName)
panic(errors.Errorf("there are following template already exists in target elasticsearch: \n%v", cfg1.TemplateName)) panic(errors.Errorf("there are following template already exists in target elasticsearch: \n%v", cfg1.TemplateName))
} }
success = true success = true
} }
var cfg elastic.ElasticsearchConfig var cfg elastic.ElasticsearchConfig
func (module *Module) initTempClient(r *http.Request) (error, elastic.API,SetupRequest) {
func (module *Module) initTempClient(r *http.Request) (error, elastic.API, SetupRequest) {
request := SetupRequest{} request := SetupRequest{}
err := module.DecodeJSON(r, &request) err := module.DecodeJSON(r, &request)
if err != nil { if err != nil {
return err,nil,request return err, nil, request
} }
if request.Cluster.Endpoint==""&&request.Cluster.Host==""{ if request.Cluster.Endpoint == "" && request.Cluster.Host == "" {
panic("invalid configuration") panic("invalid configuration")
} }
if request.Cluster.Endpoint==""{ if request.Cluster.Endpoint == "" {
if request.Cluster.Host!=""&&request.Cluster.Schema!=""{ if request.Cluster.Host != "" && request.Cluster.Schema != "" {
request.Cluster.Endpoint=fmt.Sprintf("%v://%v",request.Cluster.Schema,request.Cluster.Host) request.Cluster.Endpoint = fmt.Sprintf("%v://%v", request.Cluster.Schema, request.Cluster.Host)
} }
} }
@ -271,31 +274,30 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API,SetupR
}, },
} }
if cfg.Endpoint!=""&&cfg.Host==""{ if cfg.Endpoint != "" && cfg.Host == "" {
uri,err:=uri2.Parse(cfg.Endpoint) uri, err := uri2.Parse(cfg.Endpoint)
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
cfg.Host=uri.Host cfg.Host = uri.Host
cfg.Schema=uri.Scheme cfg.Schema = uri.Scheme
} }
cfg.ID = GlobalSystemElasticsearchID cfg.ID = GlobalSystemElasticsearchID
cfg.Name = "INFINI_SYSTEM ("+util.PickRandomName()+")" cfg.Name = "INFINI_SYSTEM (" + util.PickRandomName() + ")"
elastic.InitMetadata(&cfg, true) elastic.InitMetadata(&cfg, true)
client, err := elastic1.InitClientWithConfig(cfg) client, err := elastic1.InitClientWithConfig(cfg)
if err != nil { if err != nil {
return err,nil,request return err, nil, request
} }
global.Register(elastic.GlobalSystemElasticsearchID,GlobalSystemElasticsearchID) global.Register(elastic.GlobalSystemElasticsearchID, GlobalSystemElasticsearchID)
elastic.UpdateConfig(cfg) elastic.UpdateConfig(cfg)
elastic.UpdateClient(cfg, client) elastic.UpdateClient(cfg, client)
health, err := client.ClusterHealth() health, err := client.ClusterHealth()
if err != nil { if err != nil {
return err,nil,request return err, nil, request
} }
if health != nil { if health != nil {
cfg.RawName = health.Name cfg.RawName = health.Name
@ -304,27 +306,27 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API,SetupR
cfg.Version = ver.Number cfg.Version = ver.Number
cfg.Distribution = ver.Distribution cfg.Distribution = ver.Distribution
return err, client,request return err, client, request
} }
func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
if !global.Env().SetupRequired(){ if !global.Env().SetupRequired() {
module.WriteError(w, "setup not permitted", 500) module.WriteError(w, "setup not permitted", 500)
return return
} }
success:=false success := false
var err error var err error
var errType string var errType string
var fixTips string var fixTips string
var code int var code int
code=200 code = 200
defer func() { defer func() {
global.Env().CheckSetup() global.Env().CheckSetup()
result := util.MapStr{} result := util.MapStr{}
result["success"]=success result["success"] = success
if r := recover(); r != nil { if r := recover(); r != nil {
var v string var v string
@ -336,51 +338,51 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
case string: case string:
v = r.(string) v = r.(string)
} }
if v!=""{ if v != "" {
success=false success = false
result["error"]=util.MapStr{ result["error"] = util.MapStr{
"reason":v, "reason": v,
} }
if errType!=""{ if errType != "" {
result["type"]=errType result["type"] = errType
} }
if fixTips!=""{ if fixTips != "" {
result["fix_tips"]=fixTips result["fix_tips"] = fixTips
} }
code=500 code = 500
} }
} }
module.WriteJSON(w, result, code) module.WriteJSON(w, result, code)
}() }()
err, client,request := module.initTempClient(r) err, client, request := module.initTempClient(r)
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
if request.CredentialSecret == "" { if request.CredentialSecret == "" {
panic("invalid credential secret") panic("invalid credential secret")
} }
if cfg1.IndexPrefix==""{ if cfg1.IndexPrefix == "" {
cfg1.IndexPrefix=".infini_" cfg1.IndexPrefix = ".infini_"
} }
if cfg1.TemplateName==""{ if cfg1.TemplateName == "" {
cfg1.TemplateName=".infini" cfg1.TemplateName = ".infini"
} }
if !cfg1.Enabled{ if !cfg1.Enabled {
cfg1.Enabled=true cfg1.Enabled = true
} }
if !cfg1.InitTemplate{ if !cfg1.InitTemplate {
cfg1.InitTemplate=true cfg1.InitTemplate = true
} }
cfg.Reserved=true cfg.Reserved = true
cfg.Monitored=true cfg.Monitored = true
//处理ORM //处理ORM
handler := elastic2.ElasticORM{Client: client, Config:cfg1 } handler := elastic2.ElasticORM{Client: client, Config: cfg1}
orm.Register("elastic_setup_"+util.GetUUID(), handler) orm.Register("elastic_setup_"+util.GetUUID(), handler)
//生成凭据并保存 //生成凭据并保存
h := md5.New() h := md5.New()
@ -393,7 +395,7 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
panic(err) panic(err)
} }
if !request.Skip{ if !request.Skip {
//处理模版 //处理模版
elastic2.InitTemplate(true) elastic2.InitTemplate(true)
@ -402,39 +404,38 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
//INDEX_PREFIX //INDEX_PREFIX
ver := elastic.GetClient(GlobalSystemElasticsearchID).GetVersion() ver := elastic.GetClient(GlobalSystemElasticsearchID).GetVersion()
dslTplFileName := "initialization.tpl" dslTplFileName := "initialization.tpl"
if ver.Distribution == "" || ver.Distribution == elastic.Elasticsarch { //elasticsearch distribution if ver.Distribution == "" || ver.Distribution == elastic.Elasticsearch { //elasticsearch distribution
majorVersion := elastic.GetClient(GlobalSystemElasticsearchID).GetMajorVersion() majorVersion := elastic.GetClient(GlobalSystemElasticsearchID).GetMajorVersion()
if majorVersion == 6 { if majorVersion == 6 {
dslTplFileName = "initialization_v6.tpl" dslTplFileName = "initialization_v6.tpl"
}else if majorVersion <= 5 { } else if majorVersion <= 5 {
dslTplFileName = "initialization_v5.tpl" dslTplFileName = "initialization_v5.tpl"
} }
} }
dslTplFile := path.Join(global.Env().GetConfigDir(), dslTplFileName)
dslFile := path.Join(global.Env().GetConfigDir(), "initialization.dsl")
dslTplFile:=path.Join(global.Env().GetConfigDir(), dslTplFileName) if !util.FileExists(dslTplFile) {
dslFile:=path.Join(global.Env().GetConfigDir(),"initialization.dsl")
if !util.FileExists(dslTplFile){
log.Error(filepath.Abs(dslTplFile)) log.Error(filepath.Abs(dslTplFile))
panic("template file for setup was missing") panic("template file for setup was missing")
} }
var dsl []byte var dsl []byte
dsl,err=util.FileGetContent(dslTplFile) dsl, err = util.FileGetContent(dslTplFile)
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
var dslWriteSuccess=false var dslWriteSuccess = false
if len(dsl)>0{ if len(dsl) > 0 {
var tpl *fasttemplate.Template var tpl *fasttemplate.Template
tpl,err=fasttemplate.NewTemplate(string(dsl), "$[[", "]]") tpl, err = fasttemplate.NewTemplate(string(dsl), "$[[", "]]")
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
if tpl!=nil{ if tpl != nil {
output:=tpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) { output := tpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) {
switch tag { switch tag {
case "TEMPLATE_NAME": case "TEMPLATE_NAME":
return w.Write([]byte(cfg1.TemplateName)) return w.Write([]byte(cfg1.TemplateName))
@ -445,17 +446,17 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
case "RESOURCE_NAME": case "RESOURCE_NAME":
return w.Write([]byte(cfg.Name)) return w.Write([]byte(cfg.Name))
} }
panic(errors.Errorf("unknown tag: %v",tag)) panic(errors.Errorf("unknown tag: %v", tag))
}) })
_,err=util.FilePutContent(dslFile,output) _, err = util.FilePutContent(dslFile, output)
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
dslWriteSuccess=true dslWriteSuccess = true
} }
} }
if dslWriteSuccess{ if dslWriteSuccess {
lines := util.FileGetLines(dslFile) lines := util.FileGetLines(dslFile)
var ( var (
username string username string
@ -465,8 +466,8 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
username = cfg.BasicAuth.Username username = cfg.BasicAuth.Username
password = cfg.BasicAuth.Password password = cfg.BasicAuth.Password
} }
_,err,_:=replay.ReplayLines(pipeline.AcquireContext(pipeline.PipelineConfigV2{}),lines,cfg.Schema,cfg.Host,username,password) _, err, _ := replay.ReplayLines(pipeline.AcquireContext(pipeline.PipelineConfigV2{}), lines, cfg.Schema, cfg.Host, username, password)
if err!=nil{ if err != nil {
log.Error(err) log.Error(err)
} }
} }
@ -480,7 +481,6 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
//init security //init security
security.InitSecurity() security.InitSecurity()
toSaveCfg := cfg toSaveCfg := cfg
if request.Cluster.Username != "" || request.Cluster.Password != "" { if request.Cluster.Username != "" || request.Cluster.Password != "" {
cred := credential.Credential{ cred := credential.Credential{
@ -502,57 +502,56 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
toSaveCfg.CredentialID = cred.ID toSaveCfg.CredentialID = cred.ID
cfg.CredentialID = cred.ID cfg.CredentialID = cred.ID
err = orm.Save(nil, &cred) err = orm.Save(nil, &cred)
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
toSaveCfg.BasicAuth = nil toSaveCfg.BasicAuth = nil
} }
//保存默认集群 //保存默认集群
err=orm.Create(nil, &toSaveCfg) err = orm.Create(nil, &toSaveCfg)
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
if request.BootstrapUsername!=""&&request.BootstrapPassword!=""{ if request.BootstrapUsername != "" && request.BootstrapPassword != "" {
//Save bootstrap user //Save bootstrap user
user:=rbac.User{} user := rbac.User{}
user.ID="default_user_"+request.BootstrapUsername user.ID = "default_user_" + request.BootstrapUsername
user.Name=request.BootstrapUsername user.Name = request.BootstrapUsername
user.NickName=request.BootstrapUsername user.NickName = request.BootstrapUsername
var hash []byte var hash []byte
hash, err = bcrypt.GenerateFromPassword([]byte(request.BootstrapPassword), bcrypt.DefaultCost) hash, err = bcrypt.GenerateFromPassword([]byte(request.BootstrapPassword), bcrypt.DefaultCost)
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
user.Password=string(hash) user.Password = string(hash)
role:=[]rbac.UserRole{} role := []rbac.UserRole{}
role=append(role,rbac.UserRole{ role = append(role, rbac.UserRole{
ID: rbac.RoleAdminName, ID: rbac.RoleAdminName,
Name: rbac.RoleAdminName, Name: rbac.RoleAdminName,
}) })
user.Roles=role user.Roles = role
err=orm.Create(nil, &user) err = orm.Create(nil, &user)
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
} }
} }
err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(cfg.BasicAuth.Password)) err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(cfg.BasicAuth.Password))
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
//save to local file //save to local file
file:=path.Join(global.Env().GetConfigDir(),"system_config.yml") file := path.Join(global.Env().GetConfigDir(), "system_config.yml")
_,err=util.FilePutContent(file,fmt.Sprintf("configs.template:\n - name: \"system\"\n path: ./config/system_config.tpl\n variable:\n " + _, err = util.FilePutContent(file, fmt.Sprintf("configs.template:\n - name: \"system\"\n path: ./config/system_config.tpl\n variable:\n "+
"CLUSTER_ID: %v\n CLUSTER_ENDPINT: \"%v\"\n " + "CLUSTER_ID: %v\n CLUSTER_ENDPINT: \"%v\"\n "+
"CLUSTER_USER: \"%v\"\n CLUSTER_VER: \"%v\"\n CLUSTER_DISTRIBUTION: \"%v\"\n INDEX_PREFIX: \"%v\"", "CLUSTER_USER: \"%v\"\n CLUSTER_VER: \"%v\"\n CLUSTER_DISTRIBUTION: \"%v\"\n INDEX_PREFIX: \"%v\"",
GlobalSystemElasticsearchID,cfg.Endpoint,cfg.BasicAuth.Username,cfg.Version,cfg.Distribution,cfg1.IndexPrefix )) GlobalSystemElasticsearchID, cfg.Endpoint, cfg.BasicAuth.Username, cfg.Version, cfg.Distribution, cfg1.IndexPrefix))
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
@ -560,12 +559,12 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
InvokeSetupCallback() InvokeSetupCallback()
//place setup lock file //place setup lock file
setupLock:=path.Join(global.Env().GetDataDir(),".setup_lock") setupLock := path.Join(global.Env().GetDataDir(), ".setup_lock")
_,err=util.FilePutContent(setupLock,time.Now().String()) _, err = util.FilePutContent(setupLock, time.Now().String())
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
success=true success = true
} }