fix enroll api, should check agent credential before enroll

This commit is contained in:
medcl 2023-10-20 23:55:26 +08:00
parent 7b84271ab3
commit 0c4edd4541
3 changed files with 66 additions and 28 deletions

View File

@ -267,7 +267,7 @@ configs:
#for managed client's setting #for managed client's setting
managed: true # managed by remote servers managed: true # managed by remote servers
panic_on_config_error: false #ignore config error panic_on_config_error: false #ignore config error
interval: "1s" interval: "10s"
servers: # config servers servers: # config servers
- "http://localhost:9000" - "http://localhost:9000"
max_backup_files: 5 max_backup_files: 5

View File

@ -10,6 +10,7 @@ import (
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic" "infini.sh/framework/core/elastic"
"infini.sh/framework/core/global"
"infini.sh/framework/core/model" "infini.sh/framework/core/model"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
@ -109,7 +110,7 @@ func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance
} }
obj := elastic.DiscoveryResult{} obj := elastic.DiscoveryResult{}
_,err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj) _, err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -145,7 +146,7 @@ func GetElasticLogFiles(ctx context.Context, instance *model.Instance, logsPath
} }
resBody := map[string]interface{}{} resBody := map[string]interface{}{}
_,err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &resBody) _, err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &resBody)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -164,7 +165,7 @@ func GetElasticLogFileContent(ctx context.Context, instance *model.Instance, bod
Body: util.MustToJSONBytes(body), Body: util.MustToJSONBytes(body),
} }
resBody := map[string]interface{}{} resBody := map[string]interface{}{}
_,err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &resBody) _, err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &resBody)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -354,10 +355,10 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque
ip = fmt.Sprintf("[%s]", v.IP) ip = fmt.Sprintf("[%s]", v.IP)
} }
nodeHost := fmt.Sprintf("%s:%d", ip, v.Port) nodeHost := fmt.Sprintf("%s:%d", ip, v.Port)
success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, instance) success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, &instance)
if !success && tryAgain { if !success && tryAgain {
//try https again //try https again
success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, instance) success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, &instance)
} }
if success { if success {
log.Error("connect to es node success:", nodeHost) log.Error("connect to es node success:", nodeHost)
@ -390,26 +391,32 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque
h.WriteJSON(w, nodes, http.StatusOK) h.WriteJSON(w, nodes, http.StatusOK)
} }
func (h *APIHandler) getESNodeInfoViaProxy(host string, schema string, auth *model.BasicAuth, instance model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) { func (h *APIHandler) getESNodeInfoViaProxy(esHost string, esSchema string, auth *model.BasicAuth, instance *model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) {
esConfig := elastic.ElasticsearchConfig{Host: host, Schema: schema, BasicAuth: auth} esConfig := elastic.ElasticsearchConfig{Host: esHost, Schema: esSchema, BasicAuth: auth}
body := util.MustToJSONBytes(esConfig) return h.getESNodeInfoViaProxyWithConfig(&esConfig, auth, instance)
}
func (h *APIHandler) getESNodeInfoViaProxyWithConfig(cfg *elastic.ElasticsearchConfig, auth *model.BasicAuth, instance *model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) {
body := util.MustToJSONBytes(cfg)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
req := &util.Request{ req := &util.Request{
Method: http.MethodPost, Method: http.MethodPost,
Path: "/elasticsearch/node/_info", Path: "/elasticsearch/node/_info",
Context: ctx, Context: ctx,
Body: body, Body: body,
} }
if auth != nil { if auth != nil {
req.SetBasicAuth(auth.Username, auth.Password) req.SetBasicAuth(auth.Username, auth.Password)
} }
obj := elastic.LocalNodeInfo{} obj := elastic.LocalNodeInfo{}
res,err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj) res, err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj)
if err != nil { if err != nil {
panic(err) if global.Env().IsDebug {
log.Error(err)
}
return false, true, nil
} }
if res != nil && res.StatusCode == http.StatusForbidden { if res != nil && res.StatusCode == http.StatusForbidden {
@ -521,23 +528,48 @@ func (h *APIHandler) enrollESNode(w http.ResponseWriter, req *http.Request, ps h
//agent id //agent id
instID := ps.MustGetParameter("instance_id") instID := ps.MustGetParameter("instance_id")
exists, instance, err := server.GetRuntimeInstanceByID(instID)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if !exists {
h.WriteError(w, "instance not found", http.StatusInternalServerError)
}
//node id and cluster id //node id and cluster id
item := BindingItem{} item := BindingItem{}
err := h.DecodeJSON(req, &item) err = h.DecodeJSON(req, &item)
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
return return
} }
//update node's setting //check if the cluster's agent credential is valid
settings := NewNodeAgentSettings(instID, &item) meta := elastic.GetMetadata(item.ClusterID)
err = orm.Update(&orm.Context{ if meta == nil {
Refresh: "wait_for", h.WriteError(w, "cluster not found", http.StatusInternalServerError)
}, settings)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return return
} }
h.WriteAckOKJSON(w) //use agent credential to access the node
meta.Config.BasicAuth, _ = common.GetAgentBasicAuth(meta.Config)
success, _, _ := h.getESNodeInfoViaProxyWithConfig(meta.Config, meta.Config.BasicAuth, instance)
if success {
//update node's setting
settings := NewNodeAgentSettings(instID, &item)
err = orm.Update(&orm.Context{
Refresh: "wait_for",
}, settings)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
h.WriteAckOKJSON(w)
} else {
h.WriteError(w, "failed to access this node", http.StatusInternalServerError)
}
} }

View File

@ -112,8 +112,9 @@ func dynamicAgentConfigProvider(instance model.Instance) []*common.ConfigFile {
func getAgentIngestConfigs(items map[string]BindingItem) string { func getAgentIngestConfigs(items map[string]BindingItem) string {
buffer := bytes.NewBuffer([]byte("configs.template:\n ")) buffer := bytes.NewBuffer([]byte("configs.template: "))
var latestVersion int64
for _, v := range items { for _, v := range items {
if v.ClusterID == "" { if v.ClusterID == "" {
@ -144,7 +145,11 @@ func getAgentIngestConfigs(items map[string]BindingItem) string {
} }
} }
buffer.Write([]byte(fmt.Sprintf("- name: \"%v\"\n path: ./config/task_config.tpl\n "+ if v.Updated > latestVersion {
latestVersion = v.Updated
}
buffer.Write([]byte(fmt.Sprintf("\n - name: \"%v\"\n path: ./config/task_config.tpl\n "+
"variable:\n "+ "variable:\n "+
"CLUSTER_ID: %v\n "+ "CLUSTER_ID: %v\n "+
"CLUSTER_ENDPOINT: [\"%v\"]\n "+ "CLUSTER_ENDPOINT: [\"%v\"]\n "+
@ -152,12 +157,13 @@ func getAgentIngestConfigs(items map[string]BindingItem) string {
"CLUSTER_PASSWORD: \"%v\"\n "+ "CLUSTER_PASSWORD: \"%v\"\n "+
"CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+ "CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+
"NODE_LEVEL_TASKS_ENABLED: %v\n "+ "NODE_LEVEL_TASKS_ENABLED: %v\n "+
"NODE_LOGS_PATH: \"%v\"\n\n\n"+ "NODE_LOGS_PATH: \"%v\"\n\n\n", v.NodeUUID, v.ClusterID, clusterEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs)))
"#MANAGED_CONFIG_VERSION: %v\n"+
"#MANAGED: true",
v.NodeUUID, v.ClusterID, clusterEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs, v.Updated)))
} }
buffer.WriteString("\n")
buffer.WriteString(fmt.Sprintf("#MANAGED_CONFIG_VERSION: %v\n#MANAGED: true\n",latestVersion))
//password: $[[keystore.$[[CLUSTER_ID]]_password]] //password: $[[keystore.$[[CLUSTER_ID]]_password]]
return buffer.String() return buffer.String()