diff --git a/config/setup/common/agent.tpl b/config/setup/common/agent.tpl index d596939c..e0212fe7 100644 --- a/config/setup/common/agent.tpl +++ b/config/setup/common/agent.tpl @@ -3,7 +3,7 @@ POST $[[SETUP_INDEX_PREFIX]]configs/$[[SETUP_DOC_TYPE]]/system_ingest_config_yml { "payload": { - "content": "\n\nmetrics:\n enabled: true\n queue: metrics\n network:\n enabled: true\n summary: true\n sockets: true\n #throughput: true\n details: true\n memory:\n metrics:\n - swap\n - memory\n disk:\n metrics:\n - iops\n - usage\n cpu:\n metrics:\n - idle\n - system\n - user\n - iowait\n - load\n instance:\n enabled: true\n\nelastic:\n availability_check:\n enabled: false\n\npipeline:\n - name: replicate_message_to_gateway\n enabled: true\n auto_start: true\n keep_running: true\n processor:\n - consumer:\n max_worker_size: 3\n queue_selector:\n keys:\n - metrics\n - logs\n consumer:\n group: replication\n processor:\n - http:\n max_sending_qps: 100\n method: POST\n path: /$[[queue_name]]/_doc/\n headers:\n Content-Type: application/json\n body: $[[message]]\n basic_auth:\n username: ingest\n password: password\n# tls: #for mTLS connection with config servers\n# enabled: true\n# ca_file: /xxx/ca.crt\n# cert_file: /xxx/client.crt\n# key_file: /xxx/client.key\n# skip_insecure_verify: false\n schema: \"http\"\n hosts: # receiver endpoint, fallback in order\n - \"10.0.0.3:8081\"\n valid_status_code: [200,201] #panic on other status code\n\n", + "content": "$[[SETUP_SYSTEM_INGEST_CONFIG]]", "version": 1, "location": "system_ingest_config.yml", "name": "system_ingest_config.yml" @@ -23,7 +23,7 @@ POST $[[SETUP_INDEX_PREFIX]]configs/$[[SETUP_DOC_TYPE]]/system_ingest_config_yml POST $[[SETUP_INDEX_PREFIX]]configs/$[[SETUP_DOC_TYPE]]/task_config_tpl { "payload": { - "content": "\n\nenv:\n CLUSTER_PASSWORD: $[[keystore.$[[CLUSTER_ID]]_password]]\n\nelasticsearch:\n - id: $[[TASK_ID]]\n name: $[[TASK_ID]]\n cluster_uuid: $[[CLUSTER_UUID]]\n enabled: true\n endpoints: $[[CLUSTER_ENDPOINT]]\n discovery:\n enabled: false\n basic_auth:\n username: $[[CLUSTER_USERNAME]]\n password: $[[CLUSTER_PASSWORD]]\n traffic_control:\n enabled: true\n max_qps_per_node: 100\n max_bytes_per_node: 10485760\n max_connection_per_node: 5\n\npipeline:\n\n#node level metrics\n- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]]\n enabled: $[[NODE_LEVEL_TASKS_ENABLED]]\n keep_running: true\n name: collect_$[[TASK_ID]]_es_node_stats\n retry_delay_in_ms: 10000\n processor:\n - es_node_stats:\n elasticsearch: $[[TASK_ID]]\n labels:\n cluster_id: $[[CLUSTER_ID]]\n when:\n cluster_available: [\"$[[TASK_ID]]\"]\n\n#node logs\n- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]]\n enabled: $[[NODE_LEVEL_TASKS_ENABLED]]\n keep_running: true\n name: collect_$[[TASK_ID]]_es_logs\n retry_delay_in_ms: 10000\n processor:\n - es_logs_processor:\n elasticsearch: $[[TASK_ID]]\n labels:\n cluster_id: $[[CLUSTER_ID]]\n logs_path: $[[NODE_LOGS_PATH]]\n queue_name: logs\n when:\n cluster_available: [\"$[[TASK_ID]]\"]\n", + "content": "$[[SETUP_TASK_CONFIG_TPL]]", "version": 1, "location": "task_config.tpl", "name": "task_config.tpl" @@ -43,7 +43,7 @@ POST $[[SETUP_INDEX_PREFIX]]configs/$[[SETUP_DOC_TYPE]]/task_config_tpl POST $[[SETUP_INDEX_PREFIX]]configs/$[[SETUP_DOC_TYPE]]/agent_relay_gateway_config_yml { "payload": { - "content": "\n\npath.data: data\npath.logs: log\n\nallow_multi_instance: true\nconfigs.auto_reload: false\n\nentry:\n - name: my_es_entry\n enabled: true\n router: my_router\n max_concurrency: 200000\n network:\n binding: 0.0.0.0:8081\n# tls: #for mTLS connection with config servers\n# enabled: true\n# ca_file: /xxx/ca.crt\n# cert_file: /xxx/server.crt\n# key_file: /xxx/server.key\n# skip_insecure_verify: false\n\nflow:\n - name: deny_flow\n filter:\n - set_response:\n body: \"request not allowed\"\n status: 500\n - name: ingest_flow\n filter:\n - basic_auth:\n valid_users:\n ingest: n\n - rewrite_to_bulk:\n type_removed: false\n - bulk_request_mutate:\n fix_null_id: true\n generate_enhanced_id: true\n# fix_null_type: true\n# default_type: m-type\n# default_index: m-index\n index_rename:\n metrics: \".infini_metrics\"\n logs: \".infini_logs\"\n - bulk_reshuffle:\n when:\n contains:\n _ctx.request.path: /_bulk\n elasticsearch: prod\n level: node\n partition_size: 1\n fix_null_id: true\n\nrouter:\n - name: my_router\n default_flow: deny_flow\n rules:\n - method:\n - \"POST\"\n enabled: true\n pattern:\n - \"/{any_index}/_doc/\"\n flow:\n - ingest_flow\nelasticsearch:\n - name: prod\n enabled: true\n basic_auth:\n username: ingest\n password: password\n endpoints:\n - http://10.0.0.3:9020\n\npipeline:\n - name: bulk_request_ingest\n auto_start: true\n keep_running: true\n retry_delay_in_ms: 1000\n processor:\n - bulk_indexing:\n max_connection_per_node: 100\n num_of_slices: 3\n max_worker_size: 30\n idle_timeout_in_seconds: 10\n bulk:\n compress: false\n batch_size_in_mb: 10\n batch_size_in_docs: 10000\n consumer:\n fetch_max_messages: 100\n queue_selector:\n labels:\n type: bulk_reshuffle\n", + "content": "$[[SETUP_AGENT_RELAY_GATEWAY_CONFIG]]", "version": 1, "location": "agent_relay_gateway_config.yml", "name": "agent_relay_gateway_config.yml" diff --git a/config/setup/common/data/agent_relay_gateway_config.dat b/config/setup/common/data/agent_relay_gateway_config.dat new file mode 100644 index 00000000..ec471064 --- /dev/null +++ b/config/setup/common/data/agent_relay_gateway_config.dat @@ -0,0 +1,92 @@ +path.data: data +path.logs: log + +allow_multi_instance: true +configs.auto_reload: false + +entry: + - name: my_es_entry + enabled: true + router: my_router + max_concurrency: 200000 + network: + binding: 0.0.0.0:8081 +# tls: #for mTLS connection with config servers +# enabled: true +# ca_file: /xxx/ca.crt +# cert_file: /xxx/server.crt +# key_file: /xxx/server.key +# skip_insecure_verify: false + +flow: + - name: deny_flow + filter: + - set_response: + body: "request not allowed" + status: 500 + - name: ingest_flow + filter: + - basic_auth: + valid_users: + ingest: n + - rewrite_to_bulk: + type_removed: false + - bulk_request_mutate: + fix_null_id: true + generate_enhanced_id: true +# fix_null_type: true +# default_type: m-type +# default_index: m-index + index_rename: + metrics: "$[[SETUP_INDEX_PREFIX]]metrics" + logs: "$[[SETUP_INDEX_PREFIX]]logs" + - bulk_reshuffle: + when: + contains: + _ctx.request.path: /_bulk + elasticsearch: prod + level: node + partition_size: 1 + fix_null_id: true + +router: + - name: my_router + default_flow: deny_flow + rules: + - method: + - "POST" + enabled: true + pattern: + - "/{any_index}/_doc/" + flow: + - ingest_flow + +elasticsearch: + - name: prod + enabled: true + basic_auth: + username: ingest + password: password + endpoints: + - $[[SETUP_ENDPOINT]] + +pipeline: + - name: bulk_request_ingest + auto_start: true + keep_running: true + retry_delay_in_ms: 1000 + processor: + - bulk_indexing: + max_connection_per_node: 100 + num_of_slices: 3 + max_worker_size: 30 + idle_timeout_in_seconds: 10 + bulk: + compress: false + batch_size_in_mb: 10 + batch_size_in_docs: 10000 + consumer: + fetch_max_messages: 100 + queue_selector: + labels: + type: bulk_reshuffle \ No newline at end of file diff --git a/config/setup/common/data/system_ingest_config.dat b/config/setup/common/data/system_ingest_config.dat new file mode 100644 index 00000000..76ee888f --- /dev/null +++ b/config/setup/common/data/system_ingest_config.dat @@ -0,0 +1,66 @@ +metrics: + enabled: true + queue: metrics + network: + enabled: true + summary: true + sockets: true + #throughput: true + details: true + memory: + metrics: + - swap + - memory + disk: + metrics: + - iops + - usage + cpu: + metrics: + - idle + - system + - user + - iowait + - load + instance: + enabled: true + +elastic: + availability_check: + enabled: false + +pipeline: + - name: replicate_message_to_gateway + enabled: true + auto_start: true + keep_running: true + processor: + - consumer: + max_worker_size: 3 + queue_selector: + keys: + - metrics + - logs + consumer: + group: replication + processor: + - http: + max_sending_qps: 100 + method: POST + path: /$[[SETUP_INDEX_PREFIX]]$[[queue_name]]/_doc/ + headers: + Content-Type: application/json + body: $[[message]] + basic_auth: + username: ingest + password: password +# tls: #for mTLS connection with config servers +# enabled: true +# ca_file: /xxx/ca.crt +# cert_file: /xxx/client.crt +# key_file: /xxx/client.key +# skip_insecure_verify: false + schema: "http" + hosts: # receiver endpoint, fallback in order + - "$[[SETUP_ENDPOINT]]" + valid_status_code: [200,201] #panic on other status code \ No newline at end of file diff --git a/config/setup/common/data/task_config_tpl.dat b/config/setup/common/data/task_config_tpl.dat new file mode 100644 index 00000000..4ea796c9 --- /dev/null +++ b/config/setup/common/data/task_config_tpl.dat @@ -0,0 +1,53 @@ +env: + CLUSTER_PASSWORD: $[[keystore.$[[CLUSTER_ID]]_password]] + +elasticsearch: + - id: $[[TASK_ID]] + name: $[[TASK_ID]] + cluster_uuid: $[[CLUSTER_UUID]] + enabled: true + endpoints: $[[CLUSTER_ENDPOINT]] + discovery: + enabled: false + basic_auth: + username: $[[CLUSTER_USERNAME]] + password: $[[CLUSTER_PASSWORD]] + traffic_control: + enabled: true + max_qps_per_node: 100 + max_bytes_per_node: 10485760 + max_connection_per_node: 5 + +pipeline: + +#node level metrics +- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]] + enabled: $[[NODE_LEVEL_TASKS_ENABLED]] + keep_running: true + name: collect_$[[TASK_ID]]_es_node_stats + retry_delay_in_ms: 10000 + processor: + - es_node_stats: + elasticsearch: $[[TASK_ID]] + labels: + cluster_id: $[[CLUSTER_ID]] + cluster_uuid: $[[CLUSTER_UUID]] + when: + cluster_available: ["$[[TASK_ID]]"] + +#node logs +- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]] + enabled: $[[NODE_LEVEL_TASKS_ENABLED]] + keep_running: true + name: collect_$[[TASK_ID]]_es_logs + retry_delay_in_ms: 10000 + processor: + - es_logs_processor: + elasticsearch: $[[TASK_ID]] + labels: + cluster_id: $[[CLUSTER_ID]] + cluster_uuid: $[[CLUSTER_UUID]] + logs_path: $[[NODE_LOGS_PATH]] + queue_name: logs + when: + cluster_available: ["$[[TASK_ID]]"] \ No newline at end of file diff --git a/plugin/setup/setup.go b/plugin/setup/setup.go index 53c17bdc..45710bc6 100644 --- a/plugin/setup/setup.go +++ b/plugin/setup/setup.go @@ -6,12 +6,6 @@ import ( "crypto/md5" "encoding/hex" "fmt" - "infini.sh/framework/core/kv" - "infini.sh/framework/core/model" - "infini.sh/framework/lib/fasthttp" - keystore2 "infini.sh/framework/lib/keystore" - "infini.sh/framework/modules/security" - "strings" log "github.com/cihub/seelog" "github.com/valyala/fasttemplate" "golang.org/x/crypto/bcrypt" @@ -24,21 +18,28 @@ import ( "infini.sh/framework/core/errors" "infini.sh/framework/core/global" "infini.sh/framework/core/keystore" + "infini.sh/framework/core/kv" + "infini.sh/framework/core/model" "infini.sh/framework/core/module" "infini.sh/framework/core/orm" "infini.sh/framework/core/pipeline" "infini.sh/framework/core/util" + "infini.sh/framework/lib/fasthttp" + keystore2 "infini.sh/framework/lib/keystore" elastic2 "infini.sh/framework/modules/elastic" "infini.sh/framework/modules/elastic/adapter" elastic3 "infini.sh/framework/modules/elastic/api" elastic1 "infini.sh/framework/modules/elastic/common" + "infini.sh/framework/modules/security" _ "infini.sh/framework/modules/security" "infini.sh/framework/plugins/replay" "io" + "io/ioutil" "net/http" uri2 "net/url" "path" "runtime" + "strings" "time" ) @@ -116,10 +117,10 @@ type SetupRequest struct { Password string `json:"password"` } `json:"cluster"` - Skip bool `json:"skip"` - BootstrapUsername string `json:"bootstrap_username"` - BootstrapPassword string `json:"bootstrap_password"` - CredentialSecret string `json:"credential_secret"` + Skip bool `json:"skip"` + BootstrapUsername string `json:"bootstrap_username"` + BootstrapPassword string `json:"bootstrap_password"` + CredentialSecret string `json:"credential_secret"` InitializeTemplate string `json:"initialize_template"` } @@ -210,7 +211,7 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro } cfg1 = elastic1.ORMConfig{} exist, err := env.ParseConfig("elastic.orm", &cfg1) - if exist && err != nil &&global.Env().SystemConfig.Configs.PanicOnConfigError{ + if exist && err != nil && global.Env().SystemConfig.Configs.PanicOnConfigError { panic(err) } @@ -332,10 +333,10 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http return } var ( - success = false - errType string - fixTips string - code = 200 + success = false + errType string + fixTips string + code = 200 secretMismatch = false ) defer func() { @@ -423,7 +424,7 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http secretMismatch = true } //不存在或者密钥不匹配时保存凭据密钥 - if err == errSecretMismatch || !exists{ + if err == errSecretMismatch || !exists { h := md5.New() rawSecret := []byte(request.CredentialSecret) h.Write(rawSecret) @@ -463,7 +464,7 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http } if reuseOldCred { toSaveCfg.CredentialID = oldCfg.CredentialID - }else{ + } else { cred := credential.Credential{ Name: "INFINI_SYSTEM", Type: credential.BasicAuth, @@ -493,8 +494,8 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http } //保存默认集群 - t:=time.Now() - toSaveCfg.Created=&t + t := time.Now() + toSaveCfg.Created = &t err = orm.Save(nil, &toSaveCfg) if err != nil { panic(err) @@ -591,6 +592,7 @@ func (module *Module) validateSecret(w http.ResponseWriter, r *http.Request, ps } var errSecretMismatch = fmt.Errorf("invalid credential secret") + func validateCredentialSecret(ormHandler orm.ORM, credentialSecret string) (bool, error) { rkey, err := keystore.GetValue(credential.SecretKey) var exists bool @@ -607,7 +609,7 @@ func validateCredentialSecret(ormHandler orm.ORM, credentialSecret string) (bool if bytes.Compare(rkey, secret) != 0 { return exists, errSecretMismatch } - }else { + } else { exists = false tempCred := credential.Credential{} var result orm.Result @@ -630,6 +632,20 @@ func validateCredentialSecret(ormHandler orm.ORM, credentialSecret string) (bool return exists, nil } +func getYamlData(filename string) []byte { + baseDir := path.Join(global.Env().GetConfigDir(), "setup") + filePath := path.Join(baseDir, "common", "data", filename) + content, err := ioutil.ReadFile(filePath) + if err != nil { + log.Errorf("Error reading YAML file:", err) + return nil + } + // 转义换行符 + escapedContent := bytes.ReplaceAll(content, []byte("\n"), []byte("\\n")) + // 转义双引号 + escapedContent = bytes.ReplaceAll(escapedContent, []byte("\""), []byte("\\\"")) + return escapedContent +} func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { if !global.Env().SetupRequired() { module.WriteError(w, "setup not permitted", 500) @@ -639,7 +655,7 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request, if v := recover(); v != nil { module.WriteJSON(w, util.MapStr{ "success": false, - "log": fmt.Sprintf("%v", v), + "log": fmt.Sprintf("%v", v), }, http.StatusOK) } }() @@ -650,13 +666,13 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request, } ver := elastic.GetClient(GlobalSystemElasticsearchID).GetVersion() - if ver.Distribution == ""{ - ver.Distribution=elastic.Elasticsearch + if ver.Distribution == "" { + ver.Distribution = elastic.Elasticsearch } - baseDir := path.Join(global.Env().GetConfigDir(),"setup") + baseDir := path.Join(global.Env().GetConfigDir(), "setup") var ( dslTplFileName = "" - useCommon = true + useCommon = true ) switch request.InitializeTemplate { case "template_ilm": @@ -676,7 +692,7 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request, } if useCommon { baseDir = path.Join(baseDir, "common") - }else{ + } else { baseDir = path.Join(baseDir, ver.Distribution) } @@ -684,7 +700,7 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request, switch ver.Distribution { case elastic.Elasticsearch: majorVersion := elastic.GetClient(GlobalSystemElasticsearchID).GetMajorVersion() - if !useCommon{ + if !useCommon { if majorVersion == 6 { baseDir = path.Join(baseDir, "v6") } else if majorVersion <= 5 { @@ -701,9 +717,9 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request, break } - dslTplFile := path.Join(baseDir ,dslTplFileName) + dslTplFile := path.Join(baseDir, dslTplFileName) if !util.FileExists(dslTplFile) { - panic(errors.Errorf("template file %v for setup was missing",dslTplFile)) + panic(errors.Errorf("template file %v for setup was missing", dslTplFile)) } var dsl []byte @@ -720,12 +736,28 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request, if err != nil { module.WriteJSON(w, util.MapStr{ "success": false, - "log": fmt.Sprintf("new fasttemplate [%s] error: ", err.Error()), + "log": fmt.Sprintf("new fasttemplate [%s] error: ", err.Error()), }, http.StatusOK) return } output := tpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) { switch tag { + case "SETUP_SYSTEM_INGEST_CONFIG": + return w.Write(getYamlData("system_ingest_config.dat")) + case "SETUP_TASK_CONFIG_TPL": + return w.Write(getYamlData("task_config_tpl.dat")) + case "SETUP_AGENT_RELAY_GATEWAY_CONFIG": + return w.Write(getYamlData("agent_relay_gateway_config.dat")) + } + //ignore unresolved variable + return w.Write([]byte("$[[" + tag + "]]")) + }) + + tpl, err = fasttemplate.NewTemplate(output, "$[[", "]]") + output = tpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) { + switch tag { + case "SETUP_ENDPOINT": + return w.Write([]byte(request.Cluster.Endpoint)) case "SETUP_TEMPLATE_NAME": return w.Write([]byte(cfg1.TemplateName)) case "SETUP_INDEX_PREFIX": @@ -742,7 +774,7 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request, return w.Write([]byte(docType)) } //ignore unresolved variable - return w.Write([]byte("$[["+tag+"]]")) + return w.Write([]byte("$[[" + tag + "]]")) }) br := bytes.NewReader([]byte(output)) scanner := bufio.NewScanner(br) @@ -771,13 +803,13 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request, if err != nil { module.WriteJSON(w, util.MapStr{ "success": false, - "log": fmt.Sprintf("initalize template [%s] error: ", err.Error()), + "log": fmt.Sprintf("initalize template [%s] error: ", err.Error()), }, http.StatusOK) return } module.WriteJSON(w, util.MapStr{ "success": true, - "log": fmt.Sprintf("initalize template [%s] succeed", request.InitializeTemplate), + "log": fmt.Sprintf("initalize template [%s] succeed", request.InitializeTemplate), }, http.StatusOK) }