fix: agent first run need cluster_uuid & change for agent.tpl with yaml content

This commit is contained in:
hardy 2024-02-29 11:52:13 +08:00
parent 855c4849cb
commit 200ffd7af6
No known key found for this signature in database
GPG Key ID: D1ED7F7A9ED520C3
5 changed files with 279 additions and 36 deletions

View File

@ -3,7 +3,7 @@
POST $[[SETUP_INDEX_PREFIX]]configs/$[[SETUP_DOC_TYPE]]/system_ingest_config_yml
{
"payload": {
"content": "\n\nmetrics:\n enabled: true\n queue: metrics\n network:\n enabled: true\n summary: true\n sockets: true\n #throughput: true\n details: true\n memory:\n metrics:\n - swap\n - memory\n disk:\n metrics:\n - iops\n - usage\n cpu:\n metrics:\n - idle\n - system\n - user\n - iowait\n - load\n instance:\n enabled: true\n\nelastic:\n availability_check:\n enabled: false\n\npipeline:\n - name: replicate_message_to_gateway\n enabled: true\n auto_start: true\n keep_running: true\n processor:\n - consumer:\n max_worker_size: 3\n queue_selector:\n keys:\n - metrics\n - logs\n consumer:\n group: replication\n processor:\n - http:\n max_sending_qps: 100\n method: POST\n path: /$[[queue_name]]/_doc/\n headers:\n Content-Type: application/json\n body: $[[message]]\n basic_auth:\n username: ingest\n password: password\n# tls: #for mTLS connection with config servers\n# enabled: true\n# ca_file: /xxx/ca.crt\n# cert_file: /xxx/client.crt\n# key_file: /xxx/client.key\n# skip_insecure_verify: false\n schema: \"http\"\n hosts: # receiver endpoint, fallback in order\n - \"10.0.0.3:8081\"\n valid_status_code: [200,201] #panic on other status code\n\n",
"content": "$[[SETUP_SYSTEM_INGEST_CONFIG]]",
"version": 1,
"location": "system_ingest_config.yml",
"name": "system_ingest_config.yml"
@ -23,7 +23,7 @@ POST $[[SETUP_INDEX_PREFIX]]configs/$[[SETUP_DOC_TYPE]]/system_ingest_config_yml
POST $[[SETUP_INDEX_PREFIX]]configs/$[[SETUP_DOC_TYPE]]/task_config_tpl
{
"payload": {
"content": "\n\nenv:\n CLUSTER_PASSWORD: $[[keystore.$[[CLUSTER_ID]]_password]]\n\nelasticsearch:\n - id: $[[TASK_ID]]\n name: $[[TASK_ID]]\n cluster_uuid: $[[CLUSTER_UUID]]\n enabled: true\n endpoints: $[[CLUSTER_ENDPOINT]]\n discovery:\n enabled: false\n basic_auth:\n username: $[[CLUSTER_USERNAME]]\n password: $[[CLUSTER_PASSWORD]]\n traffic_control:\n enabled: true\n max_qps_per_node: 100\n max_bytes_per_node: 10485760\n max_connection_per_node: 5\n\npipeline:\n\n#node level metrics\n- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]]\n enabled: $[[NODE_LEVEL_TASKS_ENABLED]]\n keep_running: true\n name: collect_$[[TASK_ID]]_es_node_stats\n retry_delay_in_ms: 10000\n processor:\n - es_node_stats:\n elasticsearch: $[[TASK_ID]]\n labels:\n cluster_id: $[[CLUSTER_ID]]\n when:\n cluster_available: [\"$[[TASK_ID]]\"]\n\n#node logs\n- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]]\n enabled: $[[NODE_LEVEL_TASKS_ENABLED]]\n keep_running: true\n name: collect_$[[TASK_ID]]_es_logs\n retry_delay_in_ms: 10000\n processor:\n - es_logs_processor:\n elasticsearch: $[[TASK_ID]]\n labels:\n cluster_id: $[[CLUSTER_ID]]\n logs_path: $[[NODE_LOGS_PATH]]\n queue_name: logs\n when:\n cluster_available: [\"$[[TASK_ID]]\"]\n",
"content": "$[[SETUP_TASK_CONFIG_TPL]]",
"version": 1,
"location": "task_config.tpl",
"name": "task_config.tpl"
@ -43,7 +43,7 @@ POST $[[SETUP_INDEX_PREFIX]]configs/$[[SETUP_DOC_TYPE]]/task_config_tpl
POST $[[SETUP_INDEX_PREFIX]]configs/$[[SETUP_DOC_TYPE]]/agent_relay_gateway_config_yml
{
"payload": {
"content": "\n\npath.data: data\npath.logs: log\n\nallow_multi_instance: true\nconfigs.auto_reload: false\n\nentry:\n - name: my_es_entry\n enabled: true\n router: my_router\n max_concurrency: 200000\n network:\n binding: 0.0.0.0:8081\n# tls: #for mTLS connection with config servers\n# enabled: true\n# ca_file: /xxx/ca.crt\n# cert_file: /xxx/server.crt\n# key_file: /xxx/server.key\n# skip_insecure_verify: false\n\nflow:\n - name: deny_flow\n filter:\n - set_response:\n body: \"request not allowed\"\n status: 500\n - name: ingest_flow\n filter:\n - basic_auth:\n valid_users:\n ingest: n\n - rewrite_to_bulk:\n type_removed: false\n - bulk_request_mutate:\n fix_null_id: true\n generate_enhanced_id: true\n# fix_null_type: true\n# default_type: m-type\n# default_index: m-index\n index_rename:\n metrics: \".infini_metrics\"\n logs: \".infini_logs\"\n - bulk_reshuffle:\n when:\n contains:\n _ctx.request.path: /_bulk\n elasticsearch: prod\n level: node\n partition_size: 1\n fix_null_id: true\n\nrouter:\n - name: my_router\n default_flow: deny_flow\n rules:\n - method:\n - \"POST\"\n enabled: true\n pattern:\n - \"/{any_index}/_doc/\"\n flow:\n - ingest_flow\nelasticsearch:\n - name: prod\n enabled: true\n basic_auth:\n username: ingest\n password: password\n endpoints:\n - http://10.0.0.3:9020\n\npipeline:\n - name: bulk_request_ingest\n auto_start: true\n keep_running: true\n retry_delay_in_ms: 1000\n processor:\n - bulk_indexing:\n max_connection_per_node: 100\n num_of_slices: 3\n max_worker_size: 30\n idle_timeout_in_seconds: 10\n bulk:\n compress: false\n batch_size_in_mb: 10\n batch_size_in_docs: 10000\n consumer:\n fetch_max_messages: 100\n queue_selector:\n labels:\n type: bulk_reshuffle\n",
"content": "$[[SETUP_AGENT_RELAY_GATEWAY_CONFIG]]",
"version": 1,
"location": "agent_relay_gateway_config.yml",
"name": "agent_relay_gateway_config.yml"

View File

@ -0,0 +1,92 @@
path.data: data
path.logs: log
allow_multi_instance: true
configs.auto_reload: false
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 200000
network:
binding: 0.0.0.0:8081
# tls: #for mTLS connection with config servers
# enabled: true
# ca_file: /xxx/ca.crt
# cert_file: /xxx/server.crt
# key_file: /xxx/server.key
# skip_insecure_verify: false
flow:
- name: deny_flow
filter:
- set_response:
body: "request not allowed"
status: 500
- name: ingest_flow
filter:
- basic_auth:
valid_users:
ingest: n
- rewrite_to_bulk:
type_removed: false
- bulk_request_mutate:
fix_null_id: true
generate_enhanced_id: true
# fix_null_type: true
# default_type: m-type
# default_index: m-index
index_rename:
metrics: "$[[SETUP_INDEX_PREFIX]]metrics"
logs: "$[[SETUP_INDEX_PREFIX]]logs"
- bulk_reshuffle:
when:
contains:
_ctx.request.path: /_bulk
elasticsearch: prod
level: node
partition_size: 1
fix_null_id: true
router:
- name: my_router
default_flow: deny_flow
rules:
- method:
- "POST"
enabled: true
pattern:
- "/{any_index}/_doc/"
flow:
- ingest_flow
elasticsearch:
- name: prod
enabled: true
basic_auth:
username: ingest
password: password
endpoints:
- $[[SETUP_ENDPOINT]]
pipeline:
- name: bulk_request_ingest
auto_start: true
keep_running: true
retry_delay_in_ms: 1000
processor:
- bulk_indexing:
max_connection_per_node: 100
num_of_slices: 3
max_worker_size: 30
idle_timeout_in_seconds: 10
bulk:
compress: false
batch_size_in_mb: 10
batch_size_in_docs: 10000
consumer:
fetch_max_messages: 100
queue_selector:
labels:
type: bulk_reshuffle

View File

@ -0,0 +1,66 @@
metrics:
enabled: true
queue: metrics
network:
enabled: true
summary: true
sockets: true
#throughput: true
details: true
memory:
metrics:
- swap
- memory
disk:
metrics:
- iops
- usage
cpu:
metrics:
- idle
- system
- user
- iowait
- load
instance:
enabled: true
elastic:
availability_check:
enabled: false
pipeline:
- name: replicate_message_to_gateway
enabled: true
auto_start: true
keep_running: true
processor:
- consumer:
max_worker_size: 3
queue_selector:
keys:
- metrics
- logs
consumer:
group: replication
processor:
- http:
max_sending_qps: 100
method: POST
path: /$[[SETUP_INDEX_PREFIX]]$[[queue_name]]/_doc/
headers:
Content-Type: application/json
body: $[[message]]
basic_auth:
username: ingest
password: password
# tls: #for mTLS connection with config servers
# enabled: true
# ca_file: /xxx/ca.crt
# cert_file: /xxx/client.crt
# key_file: /xxx/client.key
# skip_insecure_verify: false
schema: "http"
hosts: # receiver endpoint, fallback in order
- "$[[SETUP_ENDPOINT]]"
valid_status_code: [200,201] #panic on other status code

View File

@ -0,0 +1,53 @@
env:
CLUSTER_PASSWORD: $[[keystore.$[[CLUSTER_ID]]_password]]
elasticsearch:
- id: $[[TASK_ID]]
name: $[[TASK_ID]]
cluster_uuid: $[[CLUSTER_UUID]]
enabled: true
endpoints: $[[CLUSTER_ENDPOINT]]
discovery:
enabled: false
basic_auth:
username: $[[CLUSTER_USERNAME]]
password: $[[CLUSTER_PASSWORD]]
traffic_control:
enabled: true
max_qps_per_node: 100
max_bytes_per_node: 10485760
max_connection_per_node: 5
pipeline:
#node level metrics
- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]]
enabled: $[[NODE_LEVEL_TASKS_ENABLED]]
keep_running: true
name: collect_$[[TASK_ID]]_es_node_stats
retry_delay_in_ms: 10000
processor:
- es_node_stats:
elasticsearch: $[[TASK_ID]]
labels:
cluster_id: $[[CLUSTER_ID]]
cluster_uuid: $[[CLUSTER_UUID]]
when:
cluster_available: ["$[[TASK_ID]]"]
#node logs
- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]]
enabled: $[[NODE_LEVEL_TASKS_ENABLED]]
keep_running: true
name: collect_$[[TASK_ID]]_es_logs
retry_delay_in_ms: 10000
processor:
- es_logs_processor:
elasticsearch: $[[TASK_ID]]
labels:
cluster_id: $[[CLUSTER_ID]]
cluster_uuid: $[[CLUSTER_UUID]]
logs_path: $[[NODE_LOGS_PATH]]
queue_name: logs
when:
cluster_available: ["$[[TASK_ID]]"]

View File

@ -6,12 +6,6 @@ import (
"crypto/md5"
"encoding/hex"
"fmt"
"infini.sh/framework/core/kv"
"infini.sh/framework/core/model"
"infini.sh/framework/lib/fasthttp"
keystore2 "infini.sh/framework/lib/keystore"
"infini.sh/framework/modules/security"
"strings"
log "github.com/cihub/seelog"
"github.com/valyala/fasttemplate"
"golang.org/x/crypto/bcrypt"
@ -24,21 +18,28 @@ import (
"infini.sh/framework/core/errors"
"infini.sh/framework/core/global"
"infini.sh/framework/core/keystore"
"infini.sh/framework/core/kv"
"infini.sh/framework/core/model"
"infini.sh/framework/core/module"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/pipeline"
"infini.sh/framework/core/util"
"infini.sh/framework/lib/fasthttp"
keystore2 "infini.sh/framework/lib/keystore"
elastic2 "infini.sh/framework/modules/elastic"
"infini.sh/framework/modules/elastic/adapter"
elastic3 "infini.sh/framework/modules/elastic/api"
elastic1 "infini.sh/framework/modules/elastic/common"
"infini.sh/framework/modules/security"
_ "infini.sh/framework/modules/security"
"infini.sh/framework/plugins/replay"
"io"
"io/ioutil"
"net/http"
uri2 "net/url"
"path"
"runtime"
"strings"
"time"
)
@ -116,10 +117,10 @@ type SetupRequest struct {
Password string `json:"password"`
} `json:"cluster"`
Skip bool `json:"skip"`
BootstrapUsername string `json:"bootstrap_username"`
BootstrapPassword string `json:"bootstrap_password"`
CredentialSecret string `json:"credential_secret"`
Skip bool `json:"skip"`
BootstrapUsername string `json:"bootstrap_username"`
BootstrapPassword string `json:"bootstrap_password"`
CredentialSecret string `json:"credential_secret"`
InitializeTemplate string `json:"initialize_template"`
}
@ -210,7 +211,7 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro
}
cfg1 = elastic1.ORMConfig{}
exist, err := env.ParseConfig("elastic.orm", &cfg1)
if exist && err != nil &&global.Env().SystemConfig.Configs.PanicOnConfigError{
if exist && err != nil && global.Env().SystemConfig.Configs.PanicOnConfigError {
panic(err)
}
@ -332,10 +333,10 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
return
}
var (
success = false
errType string
fixTips string
code = 200
success = false
errType string
fixTips string
code = 200
secretMismatch = false
)
defer func() {
@ -423,7 +424,7 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
secretMismatch = true
}
//不存在或者密钥不匹配时保存凭据密钥
if err == errSecretMismatch || !exists{
if err == errSecretMismatch || !exists {
h := md5.New()
rawSecret := []byte(request.CredentialSecret)
h.Write(rawSecret)
@ -463,7 +464,7 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
}
if reuseOldCred {
toSaveCfg.CredentialID = oldCfg.CredentialID
}else{
} else {
cred := credential.Credential{
Name: "INFINI_SYSTEM",
Type: credential.BasicAuth,
@ -493,8 +494,8 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
}
//保存默认集群
t:=time.Now()
toSaveCfg.Created=&t
t := time.Now()
toSaveCfg.Created = &t
err = orm.Save(nil, &toSaveCfg)
if err != nil {
panic(err)
@ -591,6 +592,7 @@ func (module *Module) validateSecret(w http.ResponseWriter, r *http.Request, ps
}
var errSecretMismatch = fmt.Errorf("invalid credential secret")
func validateCredentialSecret(ormHandler orm.ORM, credentialSecret string) (bool, error) {
rkey, err := keystore.GetValue(credential.SecretKey)
var exists bool
@ -607,7 +609,7 @@ func validateCredentialSecret(ormHandler orm.ORM, credentialSecret string) (bool
if bytes.Compare(rkey, secret) != 0 {
return exists, errSecretMismatch
}
}else {
} else {
exists = false
tempCred := credential.Credential{}
var result orm.Result
@ -630,6 +632,20 @@ func validateCredentialSecret(ormHandler orm.ORM, credentialSecret string) (bool
return exists, nil
}
func getYamlData(filename string) []byte {
baseDir := path.Join(global.Env().GetConfigDir(), "setup")
filePath := path.Join(baseDir, "common", "data", filename)
content, err := ioutil.ReadFile(filePath)
if err != nil {
log.Errorf("Error reading YAML file:", err)
return nil
}
// 转义换行符
escapedContent := bytes.ReplaceAll(content, []byte("\n"), []byte("\\n"))
// 转义双引号
escapedContent = bytes.ReplaceAll(escapedContent, []byte("\""), []byte("\\\""))
return escapedContent
}
func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
if !global.Env().SetupRequired() {
module.WriteError(w, "setup not permitted", 500)
@ -639,7 +655,7 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request,
if v := recover(); v != nil {
module.WriteJSON(w, util.MapStr{
"success": false,
"log": fmt.Sprintf("%v", v),
"log": fmt.Sprintf("%v", v),
}, http.StatusOK)
}
}()
@ -650,13 +666,13 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request,
}
ver := elastic.GetClient(GlobalSystemElasticsearchID).GetVersion()
if ver.Distribution == ""{
ver.Distribution=elastic.Elasticsearch
if ver.Distribution == "" {
ver.Distribution = elastic.Elasticsearch
}
baseDir := path.Join(global.Env().GetConfigDir(),"setup")
baseDir := path.Join(global.Env().GetConfigDir(), "setup")
var (
dslTplFileName = ""
useCommon = true
useCommon = true
)
switch request.InitializeTemplate {
case "template_ilm":
@ -676,7 +692,7 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request,
}
if useCommon {
baseDir = path.Join(baseDir, "common")
}else{
} else {
baseDir = path.Join(baseDir, ver.Distribution)
}
@ -684,7 +700,7 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request,
switch ver.Distribution {
case elastic.Elasticsearch:
majorVersion := elastic.GetClient(GlobalSystemElasticsearchID).GetMajorVersion()
if !useCommon{
if !useCommon {
if majorVersion == 6 {
baseDir = path.Join(baseDir, "v6")
} else if majorVersion <= 5 {
@ -701,9 +717,9 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request,
break
}
dslTplFile := path.Join(baseDir ,dslTplFileName)
dslTplFile := path.Join(baseDir, dslTplFileName)
if !util.FileExists(dslTplFile) {
panic(errors.Errorf("template file %v for setup was missing",dslTplFile))
panic(errors.Errorf("template file %v for setup was missing", dslTplFile))
}
var dsl []byte
@ -720,12 +736,28 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request,
if err != nil {
module.WriteJSON(w, util.MapStr{
"success": false,
"log": fmt.Sprintf("new fasttemplate [%s] error: ", err.Error()),
"log": fmt.Sprintf("new fasttemplate [%s] error: ", err.Error()),
}, http.StatusOK)
return
}
output := tpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) {
switch tag {
case "SETUP_SYSTEM_INGEST_CONFIG":
return w.Write(getYamlData("system_ingest_config.dat"))
case "SETUP_TASK_CONFIG_TPL":
return w.Write(getYamlData("task_config_tpl.dat"))
case "SETUP_AGENT_RELAY_GATEWAY_CONFIG":
return w.Write(getYamlData("agent_relay_gateway_config.dat"))
}
//ignore unresolved variable
return w.Write([]byte("$[[" + tag + "]]"))
})
tpl, err = fasttemplate.NewTemplate(output, "$[[", "]]")
output = tpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) {
switch tag {
case "SETUP_ENDPOINT":
return w.Write([]byte(request.Cluster.Endpoint))
case "SETUP_TEMPLATE_NAME":
return w.Write([]byte(cfg1.TemplateName))
case "SETUP_INDEX_PREFIX":
@ -742,7 +774,7 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request,
return w.Write([]byte(docType))
}
//ignore unresolved variable
return w.Write([]byte("$[["+tag+"]]"))
return w.Write([]byte("$[[" + tag + "]]"))
})
br := bytes.NewReader([]byte(output))
scanner := bufio.NewScanner(br)
@ -771,13 +803,13 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request,
if err != nil {
module.WriteJSON(w, util.MapStr{
"success": false,
"log": fmt.Sprintf("initalize template [%s] error: ", err.Error()),
"log": fmt.Sprintf("initalize template [%s] error: ", err.Error()),
}, http.StatusOK)
return
}
module.WriteJSON(w, util.MapStr{
"success": true,
"log": fmt.Sprintf("initalize template [%s] succeed", request.InitializeTemplate),
"log": fmt.Sprintf("initalize template [%s] succeed", request.InitializeTemplate),
}, http.StatusOK)
}