Merge pull request 'Elasticsearch, easysearch, opensearch compatibility update' (#32) from es_distribution into master

This commit is contained in:
silenceqi 2023-03-11 19:11:06 +08:00
commit 6d2f02af9c
8 changed files with 3283 additions and 42 deletions

View File

@ -24,7 +24,7 @@ func bootstrapRequirementCheck() error{
func checkElasticsearchRequirements() error{
log.Trace("start to check elasticsearch requirement")
log.Trace("start to check system cluster requirement")
var esConfigs = []elastic.ElasticsearchConfig{}
ok, err := env.ParseConfig("elasticsearch", &esConfigs)
if err != nil {
@ -37,7 +37,7 @@ func checkElasticsearchRequirements() error{
elasticsearchID:=global.Lookup(elastic.GlobalSystemElasticsearchID)
if elasticsearchID == nil||elasticsearchID=="" {
return fmt.Errorf("elasticsearch config in web section can not be empty")
return fmt.Errorf("cluster config in web section can not be empty")
}
esID:=elasticsearchID.(string)
@ -50,7 +50,7 @@ func checkElasticsearchRequirements() error{
}
if targetEsConfig == nil {
return fmt.Errorf("elasticsearch config %s was not found", esID)
return fmt.Errorf("cluster config %s was not found", esID)
}
var req = util.NewGetRequest(targetEsConfig.Endpoint, nil)
if targetEsConfig.BasicAuth != nil {
@ -59,23 +59,30 @@ func checkElasticsearchRequirements() error{
result, err := util.ExecuteRequest(req)
if err != nil {
return fmt.Errorf("check elasticsearch requirement error: %v", err)
return fmt.Errorf("check system cluster requirement error: %v", err)
}
if result==nil||result.Body==nil||len(result.Body)==0{
return fmt.Errorf("failed to retrive elasticsearch version info")
return fmt.Errorf("failed to retrive cluster version info")
}
versionNumber, err := jsonparser.GetString(result.Body, "version", "number")
if err != nil {
return fmt.Errorf("check elasticsearch requirement error: %v, got response: %s", err, string(result.Body))
return fmt.Errorf("check system cluster requirement error: %v, got response: %s", err, string(result.Body))
}
cr, err := util.VersionCompare(versionNumber, "7.3")
distribution, _ := jsonparser.GetString(result.Body, "version", "distribution")
if distribution == elastic.Easysearch || distribution == elastic.Opensearch {
return nil
} else if distribution != "" {
return fmt.Errorf("unkonw cluster distribution: %v", distribution)
}
cr, err := util.VersionCompare(versionNumber, "5.3")
if err !=nil {
return fmt.Errorf("check elasticsearch requirement error: %v", err)
return fmt.Errorf("check system cluster requirement error: %v", err)
}
if cr == -1 {
return fmt.Errorf("elasticsearch cluster version of store data required to be version 7.3 and above, but got %s", versionNumber)
return fmt.Errorf("system cluster version with distribution elasticsearch required to be version 5.3 and above, but got %s", versionNumber)
}
return nil
}

1578
config/initialization_v5.tpl Normal file

File diff suppressed because it is too large Load Diff

1635
config/initialization_v6.tpl Normal file

File diff suppressed because it is too large Load Diff

View File

@ -3,6 +3,7 @@ elasticsearch:
- id: $[[CLUSTER_ID]]
name: $[[CLUSTER_ID]]
version: $[[CLUSTER_VER]]
distribution: $[[CLUSTER_DISTRIBUTION]]
enabled: true
monitored: true
reserved: true

View File

@ -65,11 +65,7 @@ func (h *InsightAPI) HandleGetPreview(w http.ResponseWriter, req *http.Request,
timeFields = []string{reqBody.TimeField}
}
aggs := util.MapStr{
"doc_count": util.MapStr{
"value_count": util.MapStr{"field": "_id"},
},
}
aggs := util.MapStr{}
for _, tfield := range timeFields {
aggs["maxTime_"+tfield] = util.MapStr{
@ -80,6 +76,7 @@ func (h *InsightAPI) HandleGetPreview(w http.ResponseWriter, req *http.Request,
}
}
query := util.MapStr{
"size": 0,
"aggs": aggs,
}
if reqBody.Filter != nil {
@ -96,7 +93,7 @@ func (h *InsightAPI) HandleGetPreview(w http.ResponseWriter, req *http.Request,
return
}
result := util.MapStr{
"doc_count": searchRes.Aggregations["doc_count"].Value,
"doc_count": searchRes.GetTotal(),
}
tfieldsM := map[string]util.MapStr{}
for ak, av := range searchRes.Aggregations {

View File

@ -75,13 +75,13 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
return nil, err
}
}
targetESVersion := elastic.GetMetadata(metric.ClusterId).Config.Version
verInfo := elastic.GetClient(metric.ClusterId).GetVersion()
if targetESVersion==""{
if verInfo.Number==""{
panic("invalid version")
}
intervalField, err := elastic.GetDateHistogramIntervalField(targetESVersion, metric.BucketSize)
intervalField, err := elastic.GetDateHistogramIntervalField(verInfo.Distribution, verInfo.Number, metric.BucketSize)
if err != nil {
return nil, fmt.Errorf("get interval field error: %w", err)
}

View File

@ -5,6 +5,8 @@ import (
"crypto/md5"
"encoding/hex"
"fmt"
log "github.com/cihub/seelog"
"github.com/valyala/fasttemplate"
"golang.org/x/crypto/bcrypt"
"infini.sh/framework/core/api"
"infini.sh/framework/core/api/rbac"
@ -20,8 +22,9 @@ import (
"infini.sh/framework/core/pipeline"
"infini.sh/framework/core/util"
elastic2 "infini.sh/framework/modules/elastic"
elastic1 "infini.sh/framework/modules/elastic/common"
"infini.sh/framework/modules/elastic/adapter"
elastic3 "infini.sh/framework/modules/elastic/api"
elastic1 "infini.sh/framework/modules/elastic/common"
"infini.sh/framework/modules/security"
"infini.sh/framework/plugins/replay"
"io"
@ -30,8 +33,6 @@ import (
"path"
"path/filepath"
"runtime"
"github.com/valyala/fasttemplate"
log "github.com/cihub/seelog"
"time"
)
@ -117,6 +118,7 @@ var GlobalSystemElasticsearchID="infini_default_system_cluster"
const VersionTooOld ="elasticsearch_version_too_old"
const IndicesExists ="elasticsearch_indices_exists"
const TemplateExists ="elasticsearch_template_exists"
const VersionNotSupport ="unknown_cluster_version"
var cfg1 elastic1.ORMConfig
@ -174,22 +176,25 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro
}
//validate version
version := client.GetVersion()
if version != "" {
ver := &util.Version{}
ver, err = util.ParseSemantic(version)
if err != nil {
panic(err)
}
if ver.Major() >= 7 {
if ver.Major() == 7 && ver.Minor() < 3 {
errType = VersionTooOld
panic(errors.Errorf("elasticsearch version(%v) should greater than v7.3", version))
verInfo, err := adapter.ClusterVersion(elastic.GetMetadata(cfg.ID))
if verInfo.Version.Distribution == elastic.Elasticsarch {
if verInfo.Version.Number != "" {
ver := &util.Version{}
ver, err = util.ParseSemantic(verInfo.Version.Number)
if err != nil {
panic(err)
}
if ver.Major() == 5 && ver.Minor() < 3 {
errType = VersionTooOld
panic(errors.Errorf("elasticsearch version(%v) should greater than v5.3", verInfo.Version.Number))
} else if ver.Major() < 5 {
errType = VersionTooOld
panic(errors.Errorf("elasticsearch version(%v) should greater than v5.3", verInfo.Version.Number))
}
} else {
errType = VersionTooOld
panic(errors.Errorf("elasticsearch version(%v) should greater than v7.3", version))
}
}else if verInfo.Version.Distribution != elastic.Easysearch && verInfo.Version.Distribution != elastic.Opensearch {
errType = VersionNotSupport
panic(errors.Errorf("unknown distribution (%v)", verInfo.Version.Distribution))
}
cfg1 = elastic1.ORMConfig{}
exist, err := env.ParseConfig("elastic.orm", &cfg1)
@ -295,7 +300,9 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API,SetupR
if health != nil {
cfg.RawName = health.Name
}
cfg.Version=client.GetVersion()
ver := client.GetVersion()
cfg.Version = ver.Number
cfg.Distribution = ver.Distribution
return err, client,request
}
@ -393,7 +400,19 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
//处理生命周期
//TEMPLATE_NAME
//INDEX_PREFIX
dslTplFile:=path.Join(global.Env().GetConfigDir(),"initialization.tpl")
ver := elastic.GetClient(GlobalSystemElasticsearchID).GetVersion()
dslTplFileName := "initialization.tpl"
if ver.Distribution == "" || ver.Distribution == elastic.Elasticsarch { //elasticsearch distribution
majorVersion := elastic.GetClient(GlobalSystemElasticsearchID).GetMajorVersion()
if majorVersion == 6 {
dslTplFileName = "initialization_v6.tpl"
}else if majorVersion <= 5 {
dslTplFileName = "initialization_v5.tpl"
}
}
dslTplFile:=path.Join(global.Env().GetConfigDir(), dslTplFileName)
dslFile:=path.Join(global.Env().GetConfigDir(),"initialization.dsl")
if !util.FileExists(dslTplFile){
@ -452,6 +471,10 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
}
}
if err != nil {
panic(err)
}
//处理索引
elastic2.InitSchema()
//init security
@ -527,8 +550,8 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
file:=path.Join(global.Env().GetConfigDir(),"system_config.yml")
_,err=util.FilePutContent(file,fmt.Sprintf("configs.template:\n - name: \"system\"\n path: ./config/system_config.tpl\n variable:\n " +
"CLUSTER_ID: %v\n CLUSTER_ENDPINT: \"%v\"\n " +
"CLUSTER_USER: \"%v\"\n CLUSTER_VER: \"%v\"\n INDEX_PREFIX: \"%v\"",
GlobalSystemElasticsearchID,cfg.Endpoint,cfg.BasicAuth.Username,cfg.Version,cfg1.IndexPrefix ))
"CLUSTER_USER: \"%v\"\n CLUSTER_VER: \"%v\"\n CLUSTER_DISTRIBUTION: \"%v\"\n INDEX_PREFIX: \"%v\"",
GlobalSystemElasticsearchID,cfg.Endpoint,cfg.BasicAuth.Username,cfg.Version,cfg.Distribution,cfg1.IndexPrefix ))
if err!=nil{
panic(err)
}

View File

@ -57,17 +57,17 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
return nil, err
}
}
targetESVersion := elastic.GetMetadata(rule.Resource.ID).Config.Version
verInfo := elastic.GetClient(rule.Resource.ID).GetVersion()
var periodInterval = rule.Metrics.BucketSize
if filterParam != nil && filterParam.BucketSize != "" {
periodInterval = filterParam.BucketSize
}
if targetESVersion==""{
if verInfo.Number==""{
panic("invalid version")
}
intervalField, err := elastic.GetDateHistogramIntervalField(targetESVersion, periodInterval )
intervalField, err := elastic.GetDateHistogramIntervalField(verInfo.Distribution,verInfo.Number, periodInterval )
if err != nil {
return nil, fmt.Errorf("get interval field error: %w", err)
}