From 1ccfdb1abb350007193a6afbc238cc9b47c0d762 Mon Sep 17 00:00:00 2001 From: medcl Date: Sat, 22 Oct 2022 18:40:52 +0800 Subject: [PATCH] add setup module to console --- Jenkinsfile | 2 +- env_check.go => bootstrap_check.go | 35 +- config/initialization.tpl | 333 ++++++++++++++++++ config/system_config.tpl | 76 ++++ config/system_config.yml | 9 + console.yml | 80 +---- main.go | 118 +++++-- plugin/api/alerting/alert.go | 3 +- plugin/api/alerting/api.go | 2 - plugin/api/alerting/message.go | 3 +- plugin/api/alerting/rule.go | 6 +- plugin/api/index_management/common_command.go | 9 +- plugin/api/index_management/elasticsearch.go | 9 +- plugin/api/index_management/index.go | 4 +- plugin/api/index_management/rebuild.go | 7 +- plugin/api/init.go | 1 - plugin/setup/setup.go | 326 +++++++++++++++++ 17 files changed, 871 insertions(+), 152 deletions(-) rename env_check.go => bootstrap_check.go (74%) create mode 100644 config/initialization.tpl create mode 100644 config/system_config.tpl create mode 100644 config/system_config.yml create mode 100644 plugin/setup/setup.go diff --git a/Jenkinsfile b/Jenkinsfile index 37618649..9e96317e 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -32,7 +32,7 @@ pipeline { sh label: 'copy-license', script: 'cd /home/jenkins/go/src/infini.sh/console && cp ../framework/LICENSE bin && cat ../framework/NOTICE NOTICE > bin/NOTICE' - sh label: 'copy-configs', script: 'cd /home/jenkins/go/src/infini.sh/console && mkdir -p bin/config && cp config/*.json bin/config' + sh label: 'copy-configs', script: 'cd /home/jenkins/go/src/infini.sh/console && mkdir -p bin/config && cp config/*.json bin/config && cp config/*.yml bin/config && cp config/*.tpl bin/config' sh label: 'package-linux-amd64', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-amd64.tar.gz console-linux-amd64 console.yml LICENSE NOTICE config' sh label: 'package-linux-386', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-386.tar.gz console-linux-386 console.yml LICENSE NOTICE config' diff --git a/env_check.go b/bootstrap_check.go similarity index 74% rename from env_check.go rename to bootstrap_check.go index 395c7df1..c934f070 100644 --- a/env_check.go +++ b/bootstrap_check.go @@ -8,14 +8,14 @@ import ( "fmt" "github.com/buger/jsonparser" log "github.com/cihub/seelog" - "infini.sh/console/config" "infini.sh/framework/core/elastic" "infini.sh/framework/core/env" + "infini.sh/framework/core/global" "infini.sh/framework/core/util" ) func bootstrapRequirementCheck() error{ - err := checkElasticsearchRequire() + err := checkElasticsearchRequirements() if err != nil { return err } @@ -23,7 +23,7 @@ func bootstrapRequirementCheck() error{ } -func checkElasticsearchRequire() error{ +func checkElasticsearchRequirements() error{ log.Trace("start to check elasticsearch requirement") var esConfigs = []elastic.ElasticsearchConfig{} ok, err := env.ParseConfig("elasticsearch", &esConfigs) @@ -33,27 +33,24 @@ func checkElasticsearchRequire() error{ if !ok { return fmt.Errorf("elasticsearch config section not found") } - appConfig = &config.AppConfig{ - Elasticsearch: "default", - } - ok, err = env.ParseConfig("web", appConfig) - if err != nil { - return fmt.Errorf("parse web config section error: %v", err) - } - if !ok { - return fmt.Errorf("web config section not found") - } - if appConfig.Elasticsearch == "" { - return fmt.Errorf("elasticsearch config of web section can not be empty") + + elasticsearchID:=global.Lookup(elastic.GlobalSystemElasticsearchID) + + if elasticsearchID == nil||elasticsearchID=="" { + return fmt.Errorf("elasticsearch config in web section can not be empty") } + + esID:=elasticsearchID.(string) + var targetEsConfig *elastic.ElasticsearchConfig for _, esConfig := range esConfigs { - if esConfig.Name == appConfig.Elasticsearch { + if esConfig.ID == esID||(esConfig.ID==""&&esConfig.Name==esID) { targetEsConfig = &esConfig } } + if targetEsConfig == nil { - return fmt.Errorf("elasticsearch config named %s not found", appConfig.Elasticsearch) + return fmt.Errorf("elasticsearch config %s was not found", esID) } var req = util.NewGetRequest(targetEsConfig.Endpoint, nil) if targetEsConfig.BasicAuth != nil { @@ -65,6 +62,10 @@ func checkElasticsearchRequire() error{ return fmt.Errorf("check elasticsearch requirement error: %v", err) } + if result==nil||result.Body==nil||len(result.Body)==0{ + return fmt.Errorf("failed to retrive elasticsearch version info") + } + versionNumber, err := jsonparser.GetString(result.Body, "version", "number") if err != nil { return fmt.Errorf("check elasticsearch requirement error: %v, got response: %s", err, string(result.Body)) diff --git a/config/initialization.tpl b/config/initialization.tpl new file mode 100644 index 00000000..0f5b22bd --- /dev/null +++ b/config/initialization.tpl @@ -0,0 +1,333 @@ +PUT _template/.infini +{ + "order": 0, + "index_patterns": [ + ".infini_*" + ], + "settings": { + "index": { + "max_result_window": "10000000", + "mapping": { + "total_fields": { + "limit": "20000" + } + }, + "analysis": { + "analyzer": { + "suggest_text_search": { + "filter": [ + "word_delimiter" + ], + "tokenizer": "classic" + } + } + }, + "number_of_shards": "1" + } + }, + "mappings": { + "dynamic_templates": [ + { + "strings": { + "mapping": { + "ignore_above": 256, + "type": "keyword" + }, + "match_mapping_type": "string" + } + } + ] + }, + "aliases": {} +} + +PUT _ilm/policy/infini_metrics-30days-retention +{ + "policy": { + "phases": { + "hot": { + "min_age": "0ms", + "actions": { + "rollover": { + "max_age": "30d", + "max_size": "50gb" + }, + "set_priority": { + "priority": 100 + } + } + }, + "delete": { + "min_age": "30d", + "actions": { + "delete": { + "delete_searchable_snapshot": true + } + } + } + } + } +} + +PUT _template/.infini_metrics-rollover +{ + "order" : 100000, + "index_patterns" : [ + ".infini_metrics*" + ], + "settings" : { + "index" : { + "format" : "7", + "lifecycle" : { + "name" : "infini_metrics-30days-retention", + "rollover_alias" : ".infini_metrics" + }, + "codec" : "best_compression", + "number_of_shards" : "1", + "translog.durability":"async" + } + }, + "mappings" : { + "dynamic_templates" : [ + { + "strings" : { + "mapping" : { + "ignore_above" : 256, + "type" : "keyword" + }, + "match_mapping_type" : "string" + } + } + ] + }, + "aliases" : { } + } + + +# DELETE .infini_metrics +# DELETE .infini_metrics-00001 +PUT .infini_metrics-00001 +{ + "settings": { + "index.lifecycle.rollover_alias":".infini_metrics" + , "refresh_interval": "5s" + }, + "aliases":{ + ".infini_metrics":{ + "is_write_index":true + } + } +} + + + +PUT _template/.infini_alert-history-rollover +{ + "order" : 100000, + "index_patterns" : [ + ".infini_alert-history*" + ], + "settings" : { + "index" : { + "format" : "7", + "lifecycle" : { + "name" : "infini_metrics-30days-retention", + "rollover_alias" : ".infini_alert-history" + }, + "codec" : "best_compression", + "number_of_shards" : "1", + "translog.durability":"async" + } + }, + "mappings" : { + "dynamic_templates" : [ + { + "strings" : { + "mapping" : { + "ignore_above" : 256, + "type" : "keyword" + }, + "match_mapping_type" : "string" + } + } + ] + }, + "aliases" : { } + } + +# DELETE .infini_alert-history +# DELETE .infini_alert-history-00001 +PUT .infini_alert-history-00001 +{ + "settings": { + "index.lifecycle.rollover_alias":".infini_alert-history" + , "refresh_interval": "5s" + }, + "aliases":{ + ".infini_alert-history":{ + "is_write_index":true + } + }, + "mappings": { + "properties" : { + "condition" : { + "properties" : { + "items" : { + "properties" : { + "expression" : { + "type" : "keyword", + "ignore_above" : 256 + }, + "minimum_period_match" : { + "type" : "long" + }, + "operator" : { + "type" : "keyword", + "ignore_above" : 256 + }, + "severity" : { + "type" : "keyword", + "ignore_above" : 256 + }, + "values" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "operator" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "condition_result" : { + "type" : "object", + "enabled" : false + }, + "context" : { + "type" : "keyword", + "copy_to" : [ + "search_text" + ] + }, + "created" : { + "type" : "date" + }, + "expression" : { + "type" : "keyword", + "copy_to" : [ + "search_text" + ] + }, + "id" : { + "type" : "keyword" + }, + "is_escalated" : { + "type" : "boolean" + }, + "is_notified" : { + "type" : "boolean" + }, + "message" : { + "type" : "keyword", + "ignore_above" : 256 + }, + "objects" : { + "type" : "keyword", + "copy_to" : [ + "search_text" + ] + }, + "resource_id" : { + "type" : "keyword" + }, + "resource_name" : { + "type" : "keyword" + }, + "rule_id" : { + "type" : "keyword" + }, + "rule_name" : { + "type" : "keyword" + }, + "search_text" : { + "type" : "text", + "analyzer" : "suggest_text_search", + "index_prefixes" : { + "min_chars" : 2, + "max_chars" : 5 + }, + "index_phrases" : true + }, + "severity" : { + "type" : "keyword" + }, + "state" : { + "type" : "keyword", + "ignore_above" : 256 + }, + "title" : { + "type" : "keyword" + }, + "updated" : { + "type" : "date" + } + } + } +} + + + + +PUT _template/.infini_activities-rollover +{ + "order" : 100000, + "index_patterns" : [ + ".infini_activities*" + ], + "settings" : { + "index" : { + "format" : "7", + "lifecycle" : { + "name" : "infini_metrics-30days-retention", + "rollover_alias" : ".infini_activities" + }, + "codec" : "best_compression", + "number_of_shards" : "1", + "translog.durability":"async" + } + }, + "mappings" : { + "dynamic_templates" : [ + { + "strings" : { + "mapping" : { + "ignore_above" : 256, + "type" : "keyword" + }, + "match_mapping_type" : "string" + } + } + ] + }, + "aliases" : { } + } + +#DELETE .infini_activities +#DELETE .infini_activities-00001 + +PUT .infini_activities-00001 +{ +"settings": { + "index.lifecycle.rollover_alias":".infini_activities" + , "refresh_interval": "5s" +}, +"aliases":{ + ".infini_activities":{ + "is_write_index":true + } +} +} + + diff --git a/config/system_config.tpl b/config/system_config.tpl new file mode 100644 index 00000000..c1f9aaa4 --- /dev/null +++ b/config/system_config.tpl @@ -0,0 +1,76 @@ + +elasticsearch: + - id: $[[CLUSTER_ID]] + name: $[[CLUSTER_ID]] + enabled: true + monitored: false + reserved: true + endpoint: $[[CLUSTER_ENDPINT]] + basic_auth: + username: $[[CLUSTER_USER]] + password: $[[CLUSTER_PASS]] + +elastic.elasticsearch: $[[CLUSTER_ID]] + +pipeline: + - name: indexing_merge + auto_start: true + keep_running: true + processor: + - indexing_merge: + input_queue: "metrics" + elasticsearch: "$[[CLUSTER_ID]]" + index_name: "$[[INDEX_PREFIX]]metrics" + output_queue: + name: "metrics_requests" + label: + tag: "metrics" + worker_size: 1 + bulk_size_in_mb: 5 + - name: consume-metrics_requests + auto_start: true + keep_running: true + processor: + - bulk_indexing: + bulk: + compress: true + batch_size_in_mb: 5 + batch_size_in_docs: 5000 + consumer: + fetch_max_messages: 100 + queues: + type: indexing_merge + when: + cluster_available: ["$[[CLUSTER_ID]]"] + - name: metadata_ingest + auto_start: true + keep_running: true + processor: + - metadata: + bulk_size_in_mb: 5 + bulk_max_docs_count: 5000 + fetch_max_messages: 100 + elasticsearch: "$[[CLUSTER_ID]]" + queues: + type: metadata + category: elasticsearch + consumer: + group: metadata + when: + cluster_available: ["$[[CLUSTER_ID]]"] + - name: activity_ingest + auto_start: true + keep_running: true + processor: + - activity: + bulk_size_in_mb: 5 + bulk_max_docs_count: 5000 + fetch_max_messages: 100 + elasticsearch: "$[[CLUSTER_ID]]" + queues: + category: elasticsearch + activity: true + consumer: + group: activity + when: + cluster_available: ["$[[CLUSTER_ID]]"] diff --git a/config/system_config.yml b/config/system_config.yml new file mode 100644 index 00000000..87531125 --- /dev/null +++ b/config/system_config.yml @@ -0,0 +1,9 @@ +configs.template: + - name: "system" + path: ./config/system_config.tpl + variable: + CLUSTER_ID: _setup_clustercd9s5brq50k75kji57tg + CLUSTER_ENDPINT: "http://192.168.3.188:9206" + CLUSTER_USER: "admin" + CLUSTER_PASS: "pass" + INDEX_PREFIX: ".infini_" \ No newline at end of file diff --git a/console.yml b/console.yml index 7f24d827..23116695 100644 --- a/console.yml +++ b/console.yml @@ -1,20 +1,10 @@ -# for the system cluster, please use Elasticsearch v7.3+ -elasticsearch: - - name: default - enabled: true - monitored: false - endpoint: http://localhost:9200 - basic_auth: - username: elastic - password: infinilabs - discovery: - enabled: true +path.configs: "config" web: enabled: true embedding_api: true auth: - enabled: false + enabled: true ui: enabled: true path: .public @@ -27,7 +17,6 @@ web: enabled: true elastic: - elasticsearch: default enabled: true remote_configs: true health_check: @@ -58,67 +47,4 @@ metrics: enabled: true cluster_stats: true node_stats: true - index_stats: true - -pipeline: - - name: indexing_merge - auto_start: true - keep_running: true - processor: - - indexing_merge: - input_queue: "metrics" - elasticsearch: "default" - index_name: ".infini_metrics" - output_queue: - name: "metrics_requests" - label: - tag: "metrics" - worker_size: 1 - bulk_size_in_mb: 10 - - name: consume-metrics_requests - auto_start: true - keep_running: true - processor: - - bulk_indexing: - bulk: - compress: true - batch_size_in_mb: 10 - batch_size_in_docs: 5000 - consumer: - fetch_max_messages: 100 - queues: - type: indexing_merge - when: - cluster_available: ["default"] - - name: metadata_ingest - auto_start: true - keep_running: true - processor: - - metadata: - bulk_size_in_mb: 10 - bulk_max_docs_count: 5000 - fetch_max_messages: 1000 - elasticsearch: "default" - queues: - type: metadata - category: elasticsearch - consumer: - group: metadata - when: - cluster_available: ["default"] - - name: activity_ingest - auto_start: true - keep_running: true - processor: - - activity: - bulk_size_in_mb: 10 - bulk_max_docs_count: 5000 - fetch_max_messages: 1000 - elasticsearch: "default" - queues: - category: elasticsearch - activity: true - consumer: - group: activity - when: - cluster_available: ["default"] + index_stats: true \ No newline at end of file diff --git a/main.go b/main.go index f481e8e4..4a9c3119 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "errors" _ "expvar" log "github.com/cihub/seelog" @@ -12,10 +13,12 @@ import ( "infini.sh/framework" "infini.sh/framework/core/elastic" "infini.sh/framework/core/env" + "infini.sh/framework/core/global" "infini.sh/framework/core/insight" _ "infini.sh/framework/core/log" "infini.sh/framework/core/module" "infini.sh/framework/core/orm" + task1 "infini.sh/framework/core/task" "infini.sh/framework/modules/agent" _ "infini.sh/framework/modules/api" elastic2 "infini.sh/framework/modules/elastic" @@ -23,10 +26,13 @@ import ( "infini.sh/framework/modules/pipeline" queue2 "infini.sh/framework/modules/queue/disk_queue" "infini.sh/framework/modules/redis" + "infini.sh/framework/modules/security" "infini.sh/framework/modules/stats" "infini.sh/framework/modules/task" "infini.sh/framework/modules/ui" _ "infini.sh/framework/plugins" + setup1 "infini.sh/console/plugin/setup" + _ "infini.sh/console/plugin" api2 "infini.sh/gateway/api" _ "infini.sh/gateway/proxy" _ "time/tzdata" @@ -36,12 +42,7 @@ var appConfig *config.AppConfig var appUI *UI func main() { - terminalHeader := ("\n\n") - terminalHeader += (" ___ __ ___ ___ \n") - terminalHeader += (" / __\\/ / /___\\/\\ /\\ / \\ \n") - terminalHeader += (" / / / / // // / \\ \\/ /\\ / \n") - terminalHeader += ("/ /__/ /__/ \\_//\\ \\_/ / /_// \n") - terminalHeader += ("\\____|____|___/ \\___/___,' \n") + terminalHeader := ("\n") terminalHeader += (" ___ ___ __ __ ___ __ __ \n") terminalHeader += (" / __\\/___\\/\\ \\ \\/ _\\ /___\\/ / /__\\\n") terminalHeader += (" / / // // \\/ /\\ \\ // // / /_\\ \n") @@ -51,7 +52,7 @@ func main() { terminalFooter := "" - app := framework.NewApp("console", "INFINI Cloud Console, The easiest way to operate your own elasticsearch platform.", + app := framework.NewApp("console", "The easiest way to operate your own search platform.", config.Version, config.BuildNumber, config.LastCommitLog, config.BuildDate, config.EOLDate, terminalHeader, terminalFooter) app.Init(nil) @@ -59,28 +60,44 @@ func main() { api := api2.GatewayAPI{} + modules:=[]module.Module{} + modules=append(modules,&stats.SimpleStatsModule{}) + modules=append(modules,&elastic2.ElasticModule{}) + modules=append(modules,&queue2.DiskQueue{}) + modules=append(modules,&redis.RedisModule{}) + modules=append(modules,&pipeline.PipeModule{}) + modules=append(modules,&task.TaskModule{}) + modules=append(modules,&agent.AgentModule{}) + modules=append(modules,&metrics.MetricsModule{}) + + if app.Setup(func() { - err := bootstrapRequirementCheck() - if err != nil { - panic(err) - } //load core modules first - module.RegisterSystemModule(&elastic2.ElasticModule{}) - module.RegisterSystemModule(&stats.SimpleStatsModule{}) - module.RegisterSystemModule(&queue2.DiskQueue{}) - module.RegisterSystemModule(&redis.RedisModule{}) + module.RegisterSystemModule(&setup1.Module{}) module.RegisterSystemModule(&ui.UIModule{}) - module.RegisterSystemModule(&pipeline.PipeModule{}) - module.RegisterSystemModule(&task.TaskModule{}) - module.RegisterSystemModule(&agent.AgentModule{}) - module.RegisterUserPlugin(&metrics.MetricsModule{}) + var initFunc= func() { + module.RegisterSystemModule(&stats.SimpleStatsModule{}) + module.RegisterSystemModule(&elastic2.ElasticModule{}) + module.RegisterSystemModule(&queue2.DiskQueue{}) + module.RegisterSystemModule(&redis.RedisModule{}) + module.RegisterSystemModule(&pipeline.PipeModule{}) + module.RegisterSystemModule(&task.TaskModule{}) + module.RegisterSystemModule(&agent.AgentModule{}) + module.RegisterUserPlugin(&metrics.MetricsModule{}) + module.RegisterUserPlugin(&security.Module{}) + } + + if !global.Env().SetupRequired(){ + initFunc() + }else{ + setup1.RegisterSetupCallback(initFunc) + } api.RegisterAPI("") appConfig = &config.AppConfig{ - Elasticsearch: "default", UI: config.UIConfig{ LocalPath: ".public", VFSEnabled: true, @@ -104,26 +121,51 @@ func main() { module.Start() - //orm.RegisterSchemaWithIndexName(model.Dict{}, "dict") - //orm.RegisterSchemaWithIndexName(model.Reindex{}, "reindex") - orm.RegisterSchemaWithIndexName(elastic.View{}, "view") - orm.RegisterSchemaWithIndexName(elastic.CommonCommand{}, "commands") - //orm.RegisterSchemaWithIndexName(elastic.TraceTemplate{}, "trace-template") - orm.RegisterSchemaWithIndexName(gateway.Instance{}, "gateway-instance") - orm.RegisterSchemaWithIndexName(alerting.Rule{}, "alert-rule") - orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alert-history") - orm.RegisterSchemaWithIndexName(alerting.AlertMessage{}, "alert-message") - orm.RegisterSchemaWithIndexName(alerting.Channel{}, "channel") - orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") - orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") - api.RegisterSchema() + var initFunc= func() { + if global.Env().SetupRequired() { - go func() { - err := alerting2.InitTasks() - if err != nil { - log.Errorf("init alerting task error: %v", err) + for _, v := range modules { + v.Setup() + v.Start() + } } - }() + + elastic2.InitTemplate(false) + + //orm.RegisterSchemaWithIndexName(model.Dict{}, "dict") + orm.RegisterSchemaWithIndexName(elastic.View{}, "view") + orm.RegisterSchemaWithIndexName(elastic.CommonCommand{}, "commands") + //orm.RegisterSchemaWithIndexName(elastic.TraceTemplate{}, "trace-template") + orm.RegisterSchemaWithIndexName(gateway.Instance{}, "gateway-instance") + orm.RegisterSchemaWithIndexName(alerting.Rule{}, "alert-rule") + orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alert-history") + orm.RegisterSchemaWithIndexName(alerting.AlertMessage{}, "alert-message") + orm.RegisterSchemaWithIndexName(alerting.Channel{}, "channel") + orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") + orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") + api.RegisterSchema() + + task1.RunWithinGroup("initialize_alerting",func(ctx context.Context) error { + err := alerting2.InitTasks() + if err != nil { + log.Errorf("init alerting task error: %v", err) + } + return err + }) + } + + if !global.Env().SetupRequired(){ + initFunc() + }else{ + setup1.RegisterSetupCallback(initFunc) + } + + if !global.Env().SetupRequired(){ + err := bootstrapRequirementCheck() + if err != nil { + panic(err) + } + } }, nil) { app.Run() diff --git a/plugin/api/alerting/alert.go b/plugin/api/alerting/alert.go index e62579f9..3336a2e4 100644 --- a/plugin/api/alerting/alert.go +++ b/plugin/api/alerting/alert.go @@ -10,6 +10,7 @@ import ( "infini.sh/console/model/alerting" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "net/http" @@ -109,7 +110,7 @@ func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps http } func (h *AlertAPI) getAlertStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - esClient := elastic.GetClient(h.Config.Elasticsearch) + esClient := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) queryDsl := util.MapStr{ "size": 0, "query": util.MapStr{ diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 0e0627b2..6ce66dea 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -5,7 +5,6 @@ package alerting import ( - "infini.sh/console/config" "infini.sh/framework/core/api" "infini.sh/framework/core/api/rbac/enum" ) @@ -13,7 +12,6 @@ import ( type AlertAPI struct { api.Handler - Config *config.AppConfig } func (alert *AlertAPI) Init() { diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index ae98e2cb..d52d349d 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -11,6 +11,7 @@ import ( alerting2 "infini.sh/console/service/alerting" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" @@ -82,7 +83,7 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, } func (h *AlertAPI) getAlertMessageStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - esClient := elastic.GetClient(h.Config.Elasticsearch) + esClient := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) must := []util.MapStr{ { "terms": util.MapStr{ diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index ee89bc70..e28b07e4 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -14,6 +14,7 @@ import ( httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/event" + "infini.sh/framework/core/global" "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" "infini.sh/framework/core/queue" @@ -462,7 +463,8 @@ func (alertAPI *AlertAPI) searchRule(w http.ResponseWriter, req *http.Request, p } func (alertAPI *AlertAPI) getRuleAlertMessageNumbers(ruleIDs []string) ( map[string]interface{},error) { - esClient := elastic.GetClient(alertAPI.Config.Elasticsearch) + + esClient := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) queryDsl := util.MapStr{ "size": 0, "query": util.MapStr{ @@ -513,7 +515,7 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) return } - esClient := elastic.GetClient(alertAPI.Config.Elasticsearch) + esClient := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) queryDsl := util.MapStr{ "_source": []string{"state", "rule_id"}, "sort": []util.MapStr{ diff --git a/plugin/api/index_management/common_command.go b/plugin/api/index_management/common_command.go index c5dd19c9..08e5bcf2 100644 --- a/plugin/api/index_management/common_command.go +++ b/plugin/api/index_management/common_command.go @@ -5,6 +5,7 @@ import ( log "github.com/cihub/seelog" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "net/http" @@ -28,7 +29,7 @@ func (h *APIHandler) HandleAddCommonCommandAction(w http.ResponseWriter, req *ht reqParams.Created = time.Now() reqParams.ID = util.GetUUID() - esClient := elastic.GetClient(h.Config.Elasticsearch) + esClient := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) queryDSL :=[]byte(fmt.Sprintf(`{"size":1, "query":{"bool":{"must":{"match":{"title.keyword":"%s"}}}}}`, reqParams.Title)) var indexName = orm.GetIndexName(reqParams) @@ -73,7 +74,7 @@ func (h *APIHandler) HandleSaveCommonCommandAction(w http.ResponseWriter, req *h return } reqParams.ID = ps.ByName("cid") - esClient := elastic.GetClient(h.Config.Elasticsearch) + esClient := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) queryDSL :=[]byte(fmt.Sprintf(`{"size":1, "query":{"bool":{"must":{"match":{"title.keyword":"%s"}}}}}`, reqParams.Title)) var indexName = orm.GetIndexName(reqParams) @@ -133,7 +134,7 @@ func (h *APIHandler) HandleQueryCommonCommandAction(w http.ResponseWriter, req * } queryDSL = fmt.Sprintf(queryDSL, filterBuilder.String(), size, from) - esClient := elastic.GetClient(h.Config.Elasticsearch) + esClient := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetIndexName(elastic.CommonCommand{}), []byte(queryDSL)) if err != nil { @@ -149,7 +150,7 @@ func (h *APIHandler) HandleQueryCommonCommandAction(w http.ResponseWriter, req * func (h *APIHandler) HandleDeleteCommonCommandAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { resBody := map[string]interface{}{} id := ps.ByName("cid") - esClient := elastic.GetClient(h.Config.Elasticsearch) + esClient := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) delRes, err := esClient.Delete(orm.GetIndexName(elastic.CommonCommand{}), "", id, "wait_for") if err != nil { log.Error(err) diff --git a/plugin/api/index_management/elasticsearch.go b/plugin/api/index_management/elasticsearch.go index 50a2719c..ba815d6c 100644 --- a/plugin/api/index_management/elasticsearch.go +++ b/plugin/api/index_management/elasticsearch.go @@ -6,6 +6,7 @@ import ( httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/event" + "infini.sh/framework/core/global" "infini.sh/framework/core/host" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" @@ -25,7 +26,7 @@ func (handler APIHandler) ElasticsearchOverviewAction(w http.ResponseWriter, req // clusterIDs = append(clusterIDs, key) // return true //}) - esClient := elastic.GetClient(handler.Config.Elasticsearch) + esClient := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) queryDsl := util.MapStr{ "size": 100, } @@ -110,7 +111,7 @@ func (handler APIHandler) ElasticsearchOverviewAction(w http.ResponseWriter, req } func (handler APIHandler) getLatestClusterMonitorData(clusterIDs []interface{}) (*elastic.SearchResponse, error){ - client := elastic.GetClient(handler.Config.Elasticsearch) + client := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) queryDSLTpl := `{ "size": %d, "query": { @@ -155,7 +156,7 @@ func (handler APIHandler) getLatestClusterMonitorData(clusterIDs []interface{}) } func (handler APIHandler) getMetricCount(indexName, field string, clusterIDs []interface{}) (interface{}, error){ - client := elastic.GetClient(handler.Config.Elasticsearch) + client := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) queryDSL := util.MapStr{ "size": 0, "aggs": util.MapStr{ @@ -182,7 +183,7 @@ func (handler APIHandler) getMetricCount(indexName, field string, clusterIDs []i } func (handler APIHandler) getLastActiveHostCount() (int, error){ - client := elastic.GetClient(handler.Config.Elasticsearch) + client := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) queryDSL := `{ "size": 0, "query": { diff --git a/plugin/api/index_management/index.go b/plugin/api/index_management/index.go index 99ee9422..9f6d637c 100644 --- a/plugin/api/index_management/index.go +++ b/plugin/api/index_management/index.go @@ -1,6 +1,8 @@ package index_management import ( + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" "net/http" "strconv" "strings" @@ -33,7 +35,7 @@ func (handler APIHandler) GetDictListAction(w http.ResponseWriter, req *http.Req if len(tags) > 3 { tags = tags[0:3] } - rel, err := model2.GetDictList(from, size, name, tags, handler.Config.Elasticsearch) + rel, err := model2.GetDictList(from, size, name, tags, global.MustLookupString(elastic.GlobalSystemElasticsearchID)) if err != nil { resp["error"] = err resp["status"] = false diff --git a/plugin/api/index_management/rebuild.go b/plugin/api/index_management/rebuild.go index 25799bb6..2853cd35 100644 --- a/plugin/api/index_management/rebuild.go +++ b/plugin/api/index_management/rebuild.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" log "github.com/cihub/seelog" + "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "net/http" "strings" @@ -33,7 +34,7 @@ func (handler APIHandler) HandleReindexAction(w http.ResponseWriter, req *http.R //fmt.Println(reindexItem) typ := handler.GetParameter(req, "_type") - ID, err := reindex(handler.Config.Elasticsearch, reindexItem, typ) + ID, err := reindex(global.MustLookupString(elastic.GlobalSystemElasticsearchID), reindexItem, typ) if err != nil { log.Error(err) resResult["error"] = err @@ -94,7 +95,7 @@ func (handler APIHandler) HandleDeleteRebuildAction(w http.ResponseWriter, req * id := ps.ByName("id") var ids = []string{id} resBody := newResponseBody() - err := deleteTasksByIds(handler.Config.Elasticsearch, ids) + err := deleteTasksByIds(global.MustLookupString(elastic.GlobalSystemElasticsearchID), ids) if err != nil { log.Error(err) resBody["error"] = err @@ -111,7 +112,7 @@ func (handler APIHandler) HandleGetRebuildListAction(w http.ResponseWriter, req size = handler.GetIntOrDefault(req, "size", 10) name = handler.GetParameter(req, "name") resBody = newResponseBody() - esName = handler.Config.Elasticsearch + esName = global.MustLookupString(elastic.GlobalSystemElasticsearchID) ) esResp, err := model.GetRebuildList(esName, from, size, name) if err != nil { diff --git a/plugin/api/init.go b/plugin/api/init.go index 4e43870e..72818baf 100644 --- a/plugin/api/init.go +++ b/plugin/api/init.go @@ -58,7 +58,6 @@ func Init(cfg *config.AppConfig) { //}) alertAPI := alerting.AlertAPI{ - Config: cfg, } alertAPI.Init() diff --git a/plugin/setup/setup.go b/plugin/setup/setup.go new file mode 100644 index 00000000..6f24bc01 --- /dev/null +++ b/plugin/setup/setup.go @@ -0,0 +1,326 @@ +package task + +import ( + "bytes" + "fmt" + "infini.sh/framework/core/api" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/env" + "infini.sh/framework/core/errors" + "infini.sh/framework/core/global" + "infini.sh/framework/core/module" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + elastic1 "infini.sh/framework/modules/elastic/common" + elastic2 "infini.sh/framework/modules/elastic" + "net/http" + "path" + "runtime" + "time" +) + +type Module struct { + api.Handler +} + +func (module *Module) Name() string { + return "setup" +} + +func init() { + module.RegisterSystemModule(&Module{}) +} + +func (module *Module) Setup() { + + if !global.Env().SetupRequired(){ + return + } + + api.HandleAPIMethod(api.POST, "/setup/_validate", module.validate) + api.HandleAPIMethod(api.POST, "/setup/_initialize", module.initialize) +} + +var setupFinishedCallback= []func() {} +func RegisterSetupCallback(f func()) { + setupFinishedCallback=append(setupFinishedCallback,f) +} + +func InvokeSetupCallback() { + for _,v:=range setupFinishedCallback{ + v() + } +} + +func (module *Module) Start() error { + return nil +} +func (module *Module) Stop() error { + return nil +} + +type SetupRequest struct { + Cluster struct { + Endpoint string `json:"endpoint"` + Username string `json:"username"` + Password string `json:"password"` + } `json:"cluster"` + + BootstrapUsername string `json:"bootstrap_username"` + BootstrapPassword string `json:"bootstrap_password"` +} + +var tempID="_setup_cluster"+util.GetUUID() + +const VersionTooOld ="elasticsearch_version_too_old" +const IndicesExists ="elasticsearch_indices_exists" +const TemplateExists ="elasticsearch_template_exists" + +var cfg1 elastic1.ORMConfig + +func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { + + if !global.Env().SetupRequired(){ + module.WriteError(w, "setup not permitted", 500) + return + } + + success:=false + var err error + var errType string + var fixTips string + var code int + code=200 + defer func() { + result := util.MapStr{} + result["success"]=success + + if r := recover(); r != nil { + var v string + switch r.(type) { + case error: + v = r.(error).Error() + case runtime.Error: + v = r.(runtime.Error).Error() + case string: + v = r.(string) + } + if v!=""{ + success=false + result["error"]=util.MapStr{ + "reason":v, + } + if errType!=""{ + result["type"]=errType + } + if fixTips!=""{ + result["fix_tips"]=fixTips + } + code=500 + } + } + module.WriteJSON(w, result, code) + }() + + err, client := module.initTempClient(r) + if err!=nil{ + panic(err) + } + + //validate version + version := client.GetVersion() + if version != "" { + ver := &util.Version{} + ver, err = util.ParseSemantic(version) + if err != nil { + panic(err) + } + if ver.Major() >= 7 { + if ver.Major() == 7 && ver.Minor() < 3 { + errType = VersionTooOld + panic(errors.Errorf("elasticsearch version(%v) should greater than v7.3", version)) + } + } else { + errType = VersionTooOld + panic(errors.Errorf("elasticsearch version(%v) should greater than v7.3", version)) + } + } + cfg1 = elastic1.ORMConfig{} + exist, err := env.ParseConfig("elastic.orm", &cfg1) + if exist && err != nil { + panic(err) + } + + if cfg1.IndexPrefix==""{ + cfg1.IndexPrefix=".infini_" + } + if cfg1.TemplateName==""{ + cfg1.TemplateName=".infini" + } + + //validate indices + indices, err := client.GetIndices(util.TrimSpaces(cfg1.IndexPrefix) + "*") + if err != nil { + panic(err) + } + + if indices != nil && len(*indices) > 0 { + buff := bytes.Buffer{} + for k, _ := range *indices { + buff.WriteString(k) + buff.WriteString("\n") + } + errType = IndicesExists + fixTips="DELETE "+util.TrimSpaces(cfg1.IndexPrefix) + "*" + panic(errors.Errorf("there are following indices exists in target elasticsearch: \n%v", buff.String())) + } + + ok, err := client.TemplateExists(cfg1.TemplateName) + if err != nil { + panic(err) + } + if ok { + errType = TemplateExists + fixTips="DELETE /_template/"+util.TrimSpaces(cfg1.TemplateName) + panic(errors.Errorf("there are following template already exists in target elasticsearch: \n%v", cfg1.TemplateName)) + } + + success = true +} +var cfg elastic.ElasticsearchConfig +func (module *Module) initTempClient(r *http.Request) (error, elastic.API) { + request := SetupRequest{} + err := module.DecodeJSON(r, &request) + if err != nil { + return err,nil + } + + cfg = elastic.ElasticsearchConfig{ + Enabled: true, + Reserved: true, + Endpoint: request.Cluster.Endpoint, + BasicAuth: &elastic.BasicAuth{ + Username: request.Cluster.Username, + Password: request.Cluster.Password, + }, + } + + cfg.ID = tempID + cfg.Name = "INFINI_SYSTEM ("+util.PickRandomName()+")" + elastic.InitMetadata(&cfg, true) + client, err := elastic1.InitClientWithConfig(cfg) + if err != nil { + return err,nil + } + + elastic.UpdateConfig(cfg) + elastic.UpdateClient(cfg, client) + + global.Register(elastic.GlobalSystemElasticsearchID,tempID) + + return err, client +} + +func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { + if !global.Env().SetupRequired(){ + module.WriteError(w, "setup not permitted", 500) + return + } + + success:=false + var err error + var errType string + var fixTips string + var code int + code=200 + defer func() { + result := util.MapStr{} + result["success"]=success + + if r := recover(); r != nil { + var v string + switch r.(type) { + case error: + v = r.(error).Error() + case runtime.Error: + v = r.(runtime.Error).Error() + case string: + v = r.(string) + } + if v!=""{ + success=false + result["error"]=util.MapStr{ + "reason":v, + } + if errType!=""{ + result["type"]=errType + } + if fixTips!=""{ + result["fix_tips"]=fixTips + } + code=500 + } + } + module.WriteJSON(w, result, code) + }() + + err, client := module.initTempClient(r) + if err!=nil{ + panic(err) + } + + if cfg1.IndexPrefix==""{ + cfg1.IndexPrefix=".infini_" + } + if cfg1.TemplateName==""{ + cfg1.TemplateName=".infini" + } + + if !cfg1.Enabled{ + cfg1.Enabled=true + } + + if !cfg1.InitTemplate{ + cfg1.InitTemplate=true + } + + cfg.Reserved=true + + //处理ORM + handler := elastic2.ElasticORM{Client: client, Config:cfg1 } + orm.Register("elastic_setup", handler) + + //处理模版 + elastic2.InitTemplate(true) + elastic2.InitSchema() + + //保存默认集群 + err=orm.Save(&cfg) + if err!=nil{ + panic(err) + } + + //save to local file + file:=path.Join(global.Env().GetConfigDir(),"system_config.yml") + util.FilePutContent(file,fmt.Sprintf("configs.template:\n - name: \"system\"\n path: ./config/system_config.tpl\n variable:\n " + + "CLUSTER_ID: %v\n CLUSTER_ENDPINT: \"%v\"\n " + + "CLUSTER_USER: \"%v\"\n CLUSTER_PASS: \"%v\"\n INDEX_PREFIX: \"%v\"", + tempID,cfg.Endpoint,cfg.BasicAuth.Username,cfg.BasicAuth.Password,cfg1.IndexPrefix )) + + //处理 ILM + //处理默认用户信息 + + //callback + InvokeSetupCallback() + + //place setup lock file + setupLock:=path.Join(global.Env().GetDataDir(),".setup_lock") + _,err=util.FilePutContent(setupLock,time.Now().String()) + if err!=nil{ + panic(err) + } + + success=true + +}