From 0c4edd45415d9ef85d24e5f0996883307fc850b9 Mon Sep 17 00:00:00 2001 From: medcl Date: Fri, 20 Oct 2023 23:55:26 +0800 Subject: [PATCH] fix enroll api, should check agent credential before enroll --- config/install_agent.tpl | 2 +- modules/agent/api/elasticsearch.go | 74 +++++++++++++++++++++--------- modules/agent/api/remote_config.go | 18 +++++--- 3 files changed, 66 insertions(+), 28 deletions(-) diff --git a/config/install_agent.tpl b/config/install_agent.tpl index 1b0881d5..610eaefe 100644 --- a/config/install_agent.tpl +++ b/config/install_agent.tpl @@ -267,7 +267,7 @@ configs: #for managed client's setting managed: true # managed by remote servers panic_on_config_error: false #ignore config error - interval: "1s" + interval: "10s" servers: # config servers - "http://localhost:9000" max_backup_files: 5 diff --git a/modules/agent/api/elasticsearch.go b/modules/agent/api/elasticsearch.go index d7ca9ffa..13bf827d 100644 --- a/modules/agent/api/elasticsearch.go +++ b/modules/agent/api/elasticsearch.go @@ -10,6 +10,7 @@ import ( log "github.com/cihub/seelog" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" "infini.sh/framework/core/model" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" @@ -109,7 +110,7 @@ func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance } obj := elastic.DiscoveryResult{} - _,err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj) + _, err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj) if err != nil { return nil, err } @@ -145,7 +146,7 @@ func GetElasticLogFiles(ctx context.Context, instance *model.Instance, logsPath } resBody := map[string]interface{}{} - _,err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &resBody) + _, err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &resBody) if err != nil { return nil, err } @@ -164,7 +165,7 @@ func GetElasticLogFileContent(ctx context.Context, instance *model.Instance, bod Body: util.MustToJSONBytes(body), } resBody := map[string]interface{}{} - _,err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &resBody) + _, err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &resBody) if err != nil { return nil, err } @@ -354,10 +355,10 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque ip = fmt.Sprintf("[%s]", v.IP) } nodeHost := fmt.Sprintf("%s:%d", ip, v.Port) - success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, instance) + success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, &instance) if !success && tryAgain { //try https again - success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, instance) + success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, &instance) } if success { log.Error("connect to es node success:", nodeHost) @@ -390,26 +391,32 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque h.WriteJSON(w, nodes, http.StatusOK) } -func (h *APIHandler) getESNodeInfoViaProxy(host string, schema string, auth *model.BasicAuth, instance model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) { - esConfig := elastic.ElasticsearchConfig{Host: host, Schema: schema, BasicAuth: auth} - body := util.MustToJSONBytes(esConfig) +func (h *APIHandler) getESNodeInfoViaProxy(esHost string, esSchema string, auth *model.BasicAuth, instance *model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) { + esConfig := elastic.ElasticsearchConfig{Host: esHost, Schema: esSchema, BasicAuth: auth} + return h.getESNodeInfoViaProxyWithConfig(&esConfig, auth, instance) +} +func (h *APIHandler) getESNodeInfoViaProxyWithConfig(cfg *elastic.ElasticsearchConfig, auth *model.BasicAuth, instance *model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) { + body := util.MustToJSONBytes(cfg) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() req := &util.Request{ Method: http.MethodPost, Path: "/elasticsearch/node/_info", Context: ctx, - Body: body, + Body: body, } if auth != nil { req.SetBasicAuth(auth.Username, auth.Password) } - obj := elastic.LocalNodeInfo{} - res,err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj) + obj := elastic.LocalNodeInfo{} + res, err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj) if err != nil { - panic(err) + if global.Env().IsDebug { + log.Error(err) + } + return false, true, nil } if res != nil && res.StatusCode == http.StatusForbidden { @@ -521,23 +528,48 @@ func (h *APIHandler) enrollESNode(w http.ResponseWriter, req *http.Request, ps h //agent id instID := ps.MustGetParameter("instance_id") + exists, instance, err := server.GetRuntimeInstanceByID(instID) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if !exists { + h.WriteError(w, "instance not found", http.StatusInternalServerError) + } + //node id and cluster id item := BindingItem{} - err := h.DecodeJSON(req, &item) + err = h.DecodeJSON(req, &item) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - //update node's setting - settings := NewNodeAgentSettings(instID, &item) - err = orm.Update(&orm.Context{ - Refresh: "wait_for", - }, settings) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) + //check if the cluster's agent credential is valid + meta := elastic.GetMetadata(item.ClusterID) + if meta == nil { + h.WriteError(w, "cluster not found", http.StatusInternalServerError) return } - h.WriteAckOKJSON(w) + //use agent credential to access the node + meta.Config.BasicAuth, _ = common.GetAgentBasicAuth(meta.Config) + + success, _, _ := h.getESNodeInfoViaProxyWithConfig(meta.Config, meta.Config.BasicAuth, instance) + + if success { + //update node's setting + settings := NewNodeAgentSettings(instID, &item) + err = orm.Update(&orm.Context{ + Refresh: "wait_for", + }, settings) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteAckOKJSON(w) + } else { + h.WriteError(w, "failed to access this node", http.StatusInternalServerError) + } } diff --git a/modules/agent/api/remote_config.go b/modules/agent/api/remote_config.go index e30fd7c2..7660c2ae 100644 --- a/modules/agent/api/remote_config.go +++ b/modules/agent/api/remote_config.go @@ -112,8 +112,9 @@ func dynamicAgentConfigProvider(instance model.Instance) []*common.ConfigFile { func getAgentIngestConfigs(items map[string]BindingItem) string { - buffer := bytes.NewBuffer([]byte("configs.template:\n ")) + buffer := bytes.NewBuffer([]byte("configs.template: ")) + var latestVersion int64 for _, v := range items { if v.ClusterID == "" { @@ -144,7 +145,11 @@ func getAgentIngestConfigs(items map[string]BindingItem) string { } } - buffer.Write([]byte(fmt.Sprintf("- name: \"%v\"\n path: ./config/task_config.tpl\n "+ + if v.Updated > latestVersion { + latestVersion = v.Updated + } + + buffer.Write([]byte(fmt.Sprintf("\n - name: \"%v\"\n path: ./config/task_config.tpl\n "+ "variable:\n "+ "CLUSTER_ID: %v\n "+ "CLUSTER_ENDPOINT: [\"%v\"]\n "+ @@ -152,12 +157,13 @@ func getAgentIngestConfigs(items map[string]BindingItem) string { "CLUSTER_PASSWORD: \"%v\"\n "+ "CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+ "NODE_LEVEL_TASKS_ENABLED: %v\n "+ - "NODE_LOGS_PATH: \"%v\"\n\n\n"+ - "#MANAGED_CONFIG_VERSION: %v\n"+ - "#MANAGED: true", - v.NodeUUID, v.ClusterID, clusterEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs, v.Updated))) + "NODE_LOGS_PATH: \"%v\"\n\n\n", v.NodeUUID, v.ClusterID, clusterEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs))) } + + buffer.WriteString("\n") + buffer.WriteString(fmt.Sprintf("#MANAGED_CONFIG_VERSION: %v\n#MANAGED: true\n",latestVersion)) + //password: $[[keystore.$[[CLUSTER_ID]]_password]] return buffer.String()