fixed merge master conflicts

This commit is contained in:
liugq 2022-10-25 16:07:43 +08:00
parent 3dedda4e62
commit 516e7462ac
7 changed files with 154 additions and 91 deletions

View File

@ -2,6 +2,7 @@
elasticsearch:
- id: $[[CLUSTER_ID]]
name: $[[CLUSTER_ID]]
version: $[[CLUSTER_VER]]
enabled: true
monitored: true
reserved: true

View File

@ -1,4 +1,5 @@
path.configs: "config"
configs.auto_reload: true
web:
enabled: true

19
main.go
View File

@ -81,7 +81,7 @@ func main() {
module.RegisterSystemModule(&setup1.Module{})
module.RegisterSystemModule(uiModule)
var initFunc= func() {
if !global.Env().SetupRequired(){
module.RegisterSystemModule(&stats.SimpleStatsModule{})
module.RegisterSystemModule(&elastic2.ElasticModule{})
module.RegisterSystemModule(&queue2.DiskQueue{})
@ -92,15 +92,10 @@ func main() {
module.RegisterSystemModule(&metrics.MetricsModule{})
module.RegisterSystemModule(&security.Module{})
module.RegisterSystemModule(&migration.MigrationModule{})
}
if !global.Env().SetupRequired(){
initFunc()
}else{
for _, v := range modules {
v.Setup()
}
setup1.RegisterSetupCallback(initFunc)
}
api.RegisterAPI("")
@ -130,11 +125,7 @@ func main() {
module.Start()
var initFunc= func() {
if global.Env().SetupRequired() {
for _, v := range modules {
v.Start()
}
}
elastic2.InitTemplate(false)
@ -153,6 +144,12 @@ func main() {
orm.RegisterSchemaWithIndexName(task1.Log{}, "task-log")
api.RegisterSchema()
if global.Env().SetupRequired() {
for _, v := range modules {
v.Start()
}
}
task1.RunWithinGroup("initialize_alerting",func(ctx context.Context) error {
err := alerting2.InitTasks()
if err != nil {

View File

@ -417,4 +417,54 @@ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request,
}
}
h.WriteJSON(w, nodes, http.StatusOK)
}
func (h *GatewayAPI) getExecutionNodesNew(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
var (
keyword = h.GetParameterOrDefault(req, "keyword", "")
strSize = h.GetParameterOrDefault(req, "size", "10")
strFrom = h.GetParameterOrDefault(req, "from", "0")
)
size, _ := strconv.Atoi(strSize)
if size <= 0 {
size = 10
}
from, _ := strconv.Atoi(strFrom)
if from < 0 {
from = 0
}
query := util.MapStr{
"size": size,
"sort": []util.MapStr{
{
"created": util.MapStr{
"order": "desc",
},
},
},
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"enrolled": util.MapStr{
"value": true,
},
},
},
{
"term": util.MapStr{
"status": util.MapStr{
"value": "online",
},
},
},
},
},
},
}
q := orm.Query{
RawQuery: util.MustToJSONBytes(query),
}
_ = q
}

View File

@ -76,6 +76,11 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
}
}
targetESVersion := elastic.GetMetadata(metric.ClusterId).Config.Version
if targetESVersion==""{
panic("invalid version")
}
intervalField, err := elastic.GetDateHistogramIntervalField(targetESVersion, metric.BucketSize)
if err != nil {
return nil, fmt.Errorf("get interval field error: %w", err)

View File

@ -80,6 +80,7 @@ type SetupRequest struct {
Password string `json:"password"`
} `json:"cluster"`
Skip bool `json:"skip"`
BootstrapUsername string `json:"bootstrap_username"`
BootstrapPassword string `json:"bootstrap_password"`
}
@ -249,15 +250,17 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API,SetupR
cfg.ID = tempID
cfg.Name = "INFINI_SYSTEM ("+util.PickRandomName()+")"
elastic.InitMetadata(&cfg, true)
client, err := elastic1.InitClientWithConfig(cfg)
if err != nil {
return err,nil,request
}
cfg.Version=client.GetVersion()
meta:=elastic.InitMetadata(&cfg, true)
meta.Config.Version=cfg.Version
elastic.SetMetadata(tempID,meta)
elastic.UpdateConfig(cfg)
elastic.UpdateClient(cfg, client)
cfg.Version=client.GetVersion()
global.Register(elastic.GlobalSystemElasticsearchID,tempID)
return err, client,request
@ -336,93 +339,102 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
handler := elastic2.ElasticORM{Client: client, Config:cfg1 }
orm.Register("elastic_setup_"+util.GetUUID(), handler)
//处理模版
elastic2.InitTemplate(true)
if !request.Skip{
//处理模版
elastic2.InitTemplate(true)
//处理生命周期
//TEMPLATE_NAME
//INDEX_PREFIX
dslTplFile:=path.Join(global.Env().GetConfigDir(),"initialization.tpl")
dslFile:=path.Join(global.Env().GetConfigDir(),"initialization.dsl")
//处理生命周期
//TEMPLATE_NAME
//INDEX_PREFIX
dslTplFile:=path.Join(global.Env().GetConfigDir(),"initialization.tpl")
dslFile:=path.Join(global.Env().GetConfigDir(),"initialization.dsl")
var dsl []byte
dsl,err=util.FileGetContent(dslTplFile)
if err!=nil{
panic(err)
}
var dslWriteSuccess=false
if len(dsl)>0{
var tpl *fasttemplate.Template
tpl,err=fasttemplate.NewTemplate(string(dsl), "$[[", "]]")
var dsl []byte
dsl,err=util.FileGetContent(dslTplFile)
if err!=nil{
panic(err)
}
if tpl!=nil{
output:=tpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) {
switch tag {
case "TEMPLATE_NAME":
return w.Write([]byte(cfg1.TemplateName))
case "INDEX_PREFIX":
return w.Write([]byte(cfg1.IndexPrefix))
case "RESOURCE_ID":
return w.Write([]byte(cfg.ID))
case "RESOURCE_NAME":
return w.Write([]byte(cfg.Name))
}
panic(errors.Errorf("unknown tag: %v",tag))
})
_,err=util.FilePutContent(dslFile,output)
var dslWriteSuccess=false
if len(dsl)>0{
var tpl *fasttemplate.Template
tpl,err=fasttemplate.NewTemplate(string(dsl), "$[[", "]]")
if err!=nil{
panic(err)
}
dslWriteSuccess=true
if tpl!=nil{
output:=tpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) {
switch tag {
case "TEMPLATE_NAME":
return w.Write([]byte(cfg1.TemplateName))
case "INDEX_PREFIX":
return w.Write([]byte(cfg1.IndexPrefix))
case "RESOURCE_ID":
return w.Write([]byte(cfg.ID))
case "RESOURCE_NAME":
return w.Write([]byte(cfg.Name))
}
panic(errors.Errorf("unknown tag: %v",tag))
})
_,err=util.FilePutContent(dslFile,output)
if err!=nil{
panic(err)
}
dslWriteSuccess=true
}
}
}
if dslWriteSuccess{
lines := util.FileGetLines(dslFile)
_,err,_:=replay.ReplayLines(pipeline.AcquireContext(),lines,cfg.Schema,cfg.Host)
if err!=nil{
log.Error(err)
if dslWriteSuccess{
lines := util.FileGetLines(dslFile)
_,err,_:=replay.ReplayLines(pipeline.AcquireContext(),lines,cfg.Schema,cfg.Host)
if err!=nil{
log.Error(err)
}
}
}
//处理索引
elastic2.InitSchema()
//init security
security.InitSecurity()
//处理索引
elastic2.InitSchema()
//init security
security.InitSecurity()
//保存默认集群
err=orm.Save(&cfg)
if err!=nil{
panic(err)
}
if request.BootstrapUsername!=""&&request.BootstrapPassword!=""{
//Save bootstrap user
user:=rbac.User{}
user.ID="default_user_"+request.BootstrapUsername
user.Name=request.BootstrapUsername
user.NickName=request.BootstrapUsername
var hash []byte
hash, err = bcrypt.GenerateFromPassword([]byte(request.BootstrapPassword), bcrypt.DefaultCost)
//保存默认集群
err=orm.Save(&cfg)
if err!=nil{
panic(err)
}
user.Password=string(hash)
role:=[]rbac.UserRole{}
role=append(role,rbac.UserRole{
ID: rbac.RoleAdminName,
Name: rbac.RoleAdminName,
})
user.Roles=role
err=orm.Save(&user)
if request.BootstrapUsername!=""&&request.BootstrapPassword!=""{
//Save bootstrap user
user:=rbac.User{}
user.ID="default_user_"+request.BootstrapUsername
user.Name=request.BootstrapUsername
user.NickName=request.BootstrapUsername
var hash []byte
hash, err = bcrypt.GenerateFromPassword([]byte(request.BootstrapPassword), bcrypt.DefaultCost)
if err!=nil{
panic(err)
}
user.Password=string(hash)
role:=[]rbac.UserRole{}
role=append(role,rbac.UserRole{
ID: rbac.RoleAdminName,
Name: rbac.RoleAdminName,
})
user.Roles=role
err=orm.Save(&user)
if err!=nil{
panic(err)
}
}
//disable builtin auth
err=api.DisableBuiltinUserAdmin()
if err!=nil{
panic(err)
}
}
@ -430,23 +442,15 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
file:=path.Join(global.Env().GetConfigDir(),"system_config.yml")
_,err=util.FilePutContent(file,fmt.Sprintf("configs.template:\n - name: \"system\"\n path: ./config/system_config.tpl\n variable:\n " +
"CLUSTER_ID: %v\n CLUSTER_ENDPINT: \"%v\"\n " +
"CLUSTER_USER: \"%v\"\n CLUSTER_PASS: \"%v\"\n INDEX_PREFIX: \"%v\"",
tempID,cfg.Endpoint,cfg.BasicAuth.Username,cfg.BasicAuth.Password,cfg1.IndexPrefix ))
"CLUSTER_USER: \"%v\"\n CLUSTER_PASS: \"%v\"\n CLUSTER_VER: \"%v\"\n INDEX_PREFIX: \"%v\"",
tempID,cfg.Endpoint,cfg.BasicAuth.Username,cfg.BasicAuth.Password,cfg.Version,cfg1.IndexPrefix ))
if err!=nil{
panic(err)
}
//处理 ILM
//callback
InvokeSetupCallback()
//disable builtin auth
err=api.DisableBuiltinUserAdmin()
if err!=nil{
panic(err)
}
//place setup lock file
setupLock:=path.Join(global.Env().GetDataDir(),".setup_lock")
_,err=util.FilePutContent(setupLock,time.Now().String())

View File

@ -62,6 +62,11 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
if filterParam != nil && filterParam.BucketSize != "" {
periodInterval = filterParam.BucketSize
}
if targetESVersion==""{
panic("invalid version")
}
intervalField, err := elastic.GetDateHistogramIntervalField(targetESVersion, periodInterval )
if err != nil {
return nil, fmt.Errorf("get interval field error: %w", err)