refactoring enroll/revoke nodes

This commit is contained in:
medcl 2023-10-17 11:26:53 +08:00
parent a315c4330d
commit e3c10817ef
4 changed files with 186 additions and 138 deletions

View File

@ -17,35 +17,12 @@ import (
"time" "time"
) )
//func (h *APIHandler) refreshESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) {
// id := ps.MustGetParameter("instance_id") oldNodesInfo, err := getEnrolledNodesByAgent(inst)
// obj := model.Instance{}
// obj.ID = id
// exists, err := orm.Get(&obj)
// if !exists || err != nil {
// h.WriteJSON(w, util.MapStr{
// "_id": id,
// "found": false,
// }, http.StatusNotFound)
// return
// }
// _, err = refreshNodesInfo(&obj)
// if err != nil {
// log.Error(err)
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
// }
// h.WriteAckOKJSON(w)
//}
func refreshNodesInfo(inst *model.Instance) ([]*model.ESNodeInfo, error) {
oldNodesInfo, err := getNodesBindingToAgent(inst)
if err != nil { if err != nil {
return nil, fmt.Errorf("error on get binding nodes info: %w", err) return nil, fmt.Errorf("error on get binding nodes info: %w", err)
} }
log.Error("oldNodesInfo:",util.MustToJSON(oldNodesInfo))
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*10) ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
nodesInfo, err := GetElasticsearchNodesViaAgent(ctxTimeout, inst) nodesInfo, err := GetElasticsearchNodesViaAgent(ctxTimeout, inst)
@ -54,32 +31,65 @@ func refreshNodesInfo(inst *model.Instance) ([]*model.ESNodeInfo, error) {
return nil, fmt.Errorf("error on get nodes info from agent: %w", err) return nil, fmt.Errorf("error on get nodes info from agent: %w", err)
} }
log.Error("nodesInfo:",util.MustToJSON(nodesInfo)) for nodeID, node := range nodesInfo.Nodes {
v, ok := oldNodesInfo[nodeID]
if ok {
node.ClusterID = v.ClusterID
node.Enrolled = true
}
}
for _, node := range nodesInfo { ////not recognized by agent, need auth?
v,ok:=oldNodesInfo[node.NodeUUID] //for _, node := range nodesInfo.UnknownProcess{
if ok{ // for _, v := range node.ListenAddresses {
node.ClusterID=v.ClusterID // //ask user to manual enroll this node
} // //check local credentials, if it works, get node info
} // }
//}
// {
// //node was not recognized by agent, need auth?
// if node.HttpPort != "" {
// for _, v := range oldNodesInfo {
// if v.PublishAddress != "" {
// if util.UnifyLocalAddress(v.PublishAddress) == util.UnifyLocalAddress(node.PublishAddress) {
// node.ClusterID = v.ClusterID
// node.ClusterName = v.ClusterName
// node.NodeUUID = v.NodeUUID
// node.ClusterUuid = v.ClusterUUID
// node.NodeName = v.NodeName
// node.Path.Home = v.PathHome
// node.Path.Logs = v.PathLogs
// node.AgentID = inst.ID
// //TODO verify node info if the node id really match, need to fetch the credentials for agent
// //or let manager sync configs to this agent, verify the node info after receiving the configs
// //report any error along with this agent and node info
// break
// }
// }
// }
// }
//
//}
return nodesInfo, nil return nodesInfo, nil
} }
//get nodes info via agent //get nodes info via agent
func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance) ([]*model.ESNodeInfo, error) { func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance) (*elastic.DiscoveryResult, error) {
req := &util.Request{ req := &util.Request{
Method: http.MethodGet, Method: http.MethodGet,
Path: "/elasticsearch/nodes/_discovery", Path: "/elasticsearch/nodes/_discovery",
Context: ctx, Context: ctx,
} }
resBody := []*model.ESNodeInfo{}
err := doRequest(instance, req, &resBody) obj := elastic.DiscoveryResult{}
err := doRequest(instance, req, &obj)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return resBody, nil return &obj, nil
} }
func AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*model.ESNodeInfo, error) { func AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*model.ESNodeInfo, error) {
@ -114,13 +124,20 @@ func getNodeByPidOrUUID(nodes map[int]*model.ESNodeInfo, pid int, uuid string, p
} }
type BindingItem struct { type BindingItem struct {
ClusterID string `json:"cluster_id"` ClusterName string `json:"cluster_name"`
ClusterUUID string `json:"cluster_uuid"` ClusterUUID string `json:"cluster_uuid"`
NodeUUID string `json:"node_uuid"` NodeUUID string `json:"node_uuid"`
PublishAddress string `json:"publish_address"`
NodeName string `json:"node_name"`
PathLogs string `json:"path_logs"`
PathHome string `json:"path_home"`
//infini system assigned id
ClusterID string `json:"cluster_id"`
} }
//node -> binding item //node -> binding item
func getNodesBindingToAgent(instance *model.Instance) (map[string]BindingItem, error) { func getEnrolledNodesByAgent(instance *model.Instance) (map[string]BindingItem, error) {
//get nodes settings where agent id = instance id //get nodes settings where agent id = instance id
q := orm.Query{ q := orm.Query{
@ -141,19 +158,20 @@ func getNodesBindingToAgent(instance *model.Instance) (map[string]BindingItem, e
for _, row := range result.Result { for _, row := range result.Result {
v, ok := row.(map[string]interface{}) v, ok := row.(map[string]interface{})
if ok { if ok {
x, ok := v["metadata"] x, ok := v["payload"]
if ok { if ok {
y, ok := x.(map[string]interface{}) f, ok := x.(map[string]interface{})
if ok {
e, ok := y["labels"]
if ok {
f, ok := e.(map[string]interface{})
if ok { if ok {
nodeID, ok := f["node_uuid"].(string) nodeID, ok := f["node_uuid"].(string)
if ok { if ok {
item := BindingItem{} item := BindingItem{}
item.ClusterID = f["cluster_id"].(string) item.ClusterID = util.ToString(f["cluster_id"])
item.ClusterUUID = f["cluster_uuid"].(string) item.ClusterName = util.ToString(f["cluster_name"])
item.ClusterUUID = util.ToString(f["cluster_uuid"])
item.PublishAddress = util.ToString(f["publish_address"])
item.NodeName = util.ToString(f["node_name"])
item.PathHome = util.ToString(f["path_home"])
item.PathLogs = util.ToString(f["path_logs"])
item.NodeUUID = nodeID item.NodeUUID = nodeID
ids[item.NodeUUID] = item ids[item.NodeUUID] = item
} }
@ -161,8 +179,6 @@ func getNodesBindingToAgent(instance *model.Instance) (map[string]BindingItem, e
} }
} }
} }
}
}
return ids, nil return ids, nil
} }
@ -248,7 +264,7 @@ func GetElasticLogFileContent(ctx context.Context, instance *model.Instance, bod
func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
nodeID := ps.MustGetParameter("node_id") nodeID := ps.MustGetParameter("node_id")
inst, node, err := getAgentByNodeID(nodeID) inst, pathLogs, err := getAgentByNodeID(nodeID)
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err) log.Error(err)
@ -262,7 +278,7 @@ func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request,
}, http.StatusOK) }, http.StatusOK)
return return
} }
logFiles, err := GetElasticLogFiles(nil, inst, node.Path.Logs) logFiles, err := GetElasticLogFiles(nil, inst, pathLogs)
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err) log.Error(err)
@ -276,7 +292,7 @@ func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request,
func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
nodeID := ps.MustGetParameter("node_id") nodeID := ps.MustGetParameter("node_id")
inst, node, err := getAgentByNodeID(nodeID) inst, pathLogs, err := getAgentByNodeID(nodeID)
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err) log.Error(err)
@ -299,7 +315,7 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request,
log.Error(err) log.Error(err)
return return
} }
reqBody.LogsPath = node.Path.Logs reqBody.LogsPath = pathLogs
res, err := GetElasticLogFileContent(nil, inst, reqBody) res, err := GetElasticLogFileContent(nil, inst, reqBody)
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
@ -309,45 +325,57 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request,
h.WriteJSON(w, res, http.StatusOK) h.WriteJSON(w, res, http.StatusOK)
} }
func getAgentByNodeID(nodeID string) (*model.Instance, *model.ESNodeInfo, error) { //instance, pathLogs
queryDsl := util.MapStr{ func getAgentByNodeID(nodeID string) (*model.Instance, string, error) {
"size": 1,
"query": util.MapStr{ q := orm.Query{
"term": util.MapStr{ Size: 1000,
"node_uuid": util.MapStr{ Conds: orm.And(orm.Eq("metadata.category", "node_settings"),
"value": nodeID, orm.Eq("metadata.name", "agent"),
}, orm.Eq("payload.node_uuid", nodeID),
}, ),
},
"sort": []util.MapStr{
{
"timestamp": util.MapStr{
"order": "desc",
},
},
},
} }
q := &orm.Query{
RawQuery: util.MustToJSONBytes(queryDsl), err, result := orm.Search(model.Setting{}, &q)
}
err, result := orm.Search(model.ESNodeInfo{}, q)
if err != nil { if err != nil {
return nil, nil, err return nil, "", err
} }
if len(result.Result) > 0 {
buf := util.MustToJSONBytes(result.Result[0]) for _, row := range result.Result {
v := &model.ESNodeInfo{} v, ok := row.(map[string]interface{})
err = util.FromJSONBytes(buf, v) if ok {
pathLogs := ""
payload, ok := v["payload"]
if ok {
payloadMap, ok := payload.(map[string]interface{})
if ok {
pathLogs = util.ToString(payloadMap["path_logs"])
}
}
x, ok := v["metadata"]
if ok {
f, ok := x.(map[string]interface{})
if ok {
labels, ok := f["labels"].(map[string]interface{})
if ok {
id, ok := labels["agent_id"]
if ok {
inst := &model.Instance{} inst := &model.Instance{}
inst.ID = v.AgentID inst.ID = util.ToString(id)
_, err = orm.Get(inst) _, err = orm.Get(inst)
if err != nil { if err != nil {
return nil, v, err return nil, pathLogs, err
} }
if inst.Name == "" { if inst.Name == "" {
return nil, v, nil return nil, pathLogs, nil
} }
return inst, v, nil return inst, pathLogs, nil
} }
return nil, nil, nil }
}
}
}
}
return nil, "", nil
} }

View File

@ -5,7 +5,6 @@
package api package api
import ( import (
"context"
"fmt" "fmt"
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
@ -197,26 +196,26 @@ func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Requ
return return
} }
esNodesInfo, err := GetElasticsearchNodesViaAgent(context.Background(), &obj) //esNodesInfo, err := GetElasticsearchNodesViaAgent(context.Background(), &obj)
if err != nil { //if err != nil {
log.Error(err) // log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError) // h.WriteError(w, err.Error(), http.StatusInternalServerError)
return // return
} //}
var processes []util.MapStr //var processes []util.MapStr
for _, node := range esNodesInfo { //for _, node := range esNodesInfo {
processes = append(processes, util.MapStr{ // processes = append(processes, util.MapStr{
"pid": node.ProcessInfo.PID, // "pid": node.ProcessInfo.PID,
"pid_status": node.ProcessInfo.Status, // "pid_status": node.ProcessInfo.Status,
"cluster_name": node.ClusterName, // "cluster_name": node.ClusterName,
"cluster_uuid": node.ClusterUuid, // "cluster_uuid": node.ClusterUuid,
"cluster_id": node.ClusterID, // "cluster_id": node.ClusterID,
"node_id": node.NodeUUID, // "node_id": node.NodeUUID,
"node_name": node.NodeName, // "node_name": node.NodeName,
"uptime_in_ms": time.Now().UnixMilli() - node.ProcessInfo.CreateTime, // "uptime_in_ms": time.Now().UnixMilli() - node.ProcessInfo.CreateTime,
}) // })
} //}
h.WriteJSON(w, util.MapStr{ h.WriteJSON(w, util.MapStr{
"elastic_processes": processes, //"elastic_processes": processes,
}, http.StatusOK) }, http.StatusOK)
} }

View File

@ -16,7 +16,10 @@ func Init() {
api.HandleAPIMethod(api.GET, "/host/:host_id/processes", handler.GetHostElasticProcess) api.HandleAPIMethod(api.GET, "/host/:host_id/processes", handler.GetHostElasticProcess)
api.HandleAPIMethod(api.DELETE, "/host/:host_id", handler.deleteHost) api.HandleAPIMethod(api.DELETE, "/host/:host_id", handler.deleteHost)
api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) //bind agent with nodes
api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_enroll", handler.RequirePermission(handler.enrollESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_revoke", handler.RequirePermission(handler.revokeESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.POST, "/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite))
//api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) //api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite))

View File

@ -183,7 +183,7 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt
h.WriteJSON(w, fmt.Sprintf("node [%s] of agent [%s] was not found", oldNodeInfo.ID, inst.Name), http.StatusInternalServerError) h.WriteJSON(w, fmt.Sprintf("node [%s] of agent [%s] was not found", oldNodeInfo.ID, inst.Name), http.StatusInternalServerError)
return return
} }
}else{ } else {
//find out the node id with credentials //find out the node id with credentials
cfg := reqBody.ESConfig cfg := reqBody.ESConfig
if cfg.Endpoint == "" { if cfg.Endpoint == "" {
@ -222,10 +222,9 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt
// return // return
//} //}
reqBody.NodeID=nodeInfo.NodeUUID reqBody.NodeID = nodeInfo.NodeUUID
} }
//nodeInfo:=elastic.NodeConfig{} //nodeInfo:=elastic.NodeConfig{}
//nodeInfo.ID = reqBody.NodeID //nodeInfo.ID = reqBody.NodeID
//nodeInfo.AgentID = inst.ID //nodeInfo.AgentID = inst.ID
@ -253,7 +252,7 @@ func NewClusterSettings(clusterID string) *model.Setting {
return &settings return &settings
} }
func NewNodeAgentSettings(clusterID, clusterUUID, nodeUUID, agentID, agentCredential string) *model.Setting { func NewNodeAgentSettings(instanceID string, item *BindingItem) *model.Setting {
settings := model.Setting{ settings := model.Setting{
Metadata: model.SettingsMetadata{ Metadata: model.SettingsMetadata{
@ -261,14 +260,21 @@ func NewNodeAgentSettings(clusterID, clusterUUID, nodeUUID, agentID, agentCreden
Name: "agent", Name: "agent",
}, },
} }
settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, nodeUUID) settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, item.NodeUUID)
settings.Metadata.Labels = util.MapStr{ settings.Metadata.Labels = util.MapStr{
"cluster_id": clusterID, "agent_id": instanceID,
"cluster_uuid": clusterUUID, }
"node_uuid": nodeUUID,
"agent_id": agentID, settings.Payload = util.MapStr{
"agent_credential": agentCredential, "cluster_id": item.ClusterID,
"cluster_name": item.ClusterName,
"cluster_uuid": item.ClusterUUID,
"node_uuid": item.NodeUUID,
"publish_address": item.PublishAddress,
"node_name": item.NodeName,
"path_home": item.PathHome,
"path_logs": item.PathLogs,
} }
return &settings return &settings
@ -298,33 +304,45 @@ const Cluster = "cluster_settings"
const Node = "node_settings" const Node = "node_settings"
const Index = "index_settings" const Index = "index_settings"
func (h *APIHandler) associateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *APIHandler) revokeESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
//agent id
instID := ps.MustGetParameter("instance_id")
item := BindingItem{}
err := h.DecodeJSON(req, &item)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
settings := NewNodeAgentSettings(instID, &item)
err = orm.Delete(&orm.Context{
Refresh: "wait_for",
}, settings)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (h *APIHandler) enrollESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
//agent id //agent id
instID := ps.MustGetParameter("instance_id") instID := ps.MustGetParameter("instance_id")
//node id and cluster id //node id and cluster id
reqBody := struct { item := BindingItem{}
ClusterUUID string `json:"cluster_uuid"` err := h.DecodeJSON(req, &item)
NodeUUID string `json:"node_uuid"`
//infini system assigned id
ClusterID string `json:"cluster_id"`
}{}
err := h.DecodeJSON(req, &reqBody)
if err != nil { if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
return return
} }
//update node's setting //update node's setting
settings := NewNodeAgentSettings(reqBody.ClusterID, reqBody.ClusterUUID, reqBody.NodeUUID, instID, "node.AgentCredential") settings := NewNodeAgentSettings(instID, &item)
err = orm.Update(&orm.Context{ err = orm.Update(&orm.Context{
Refresh: "wait_for", Refresh: "wait_for",
}, settings) }, settings)
if err != nil { if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
return return
} }