support auto enroll

This commit is contained in:
medcl 2023-12-15 17:47:54 +08:00
parent 5aae342bbe
commit c4c0c8fdb3
3 changed files with 175 additions and 91 deletions

View File

@ -6,6 +6,7 @@ package api
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
@ -19,18 +20,20 @@ import (
"infini.sh/framework/modules/elastic/metadata" "infini.sh/framework/modules/elastic/metadata"
"infini.sh/framework/plugins/managed/server" "infini.sh/framework/plugins/managed/server"
"net/http" "net/http"
"runtime"
"sync/atomic"
"time" "time"
) )
//node -> binding item //node -> binding item
func GetEnrolledNodesByAgent(instance *model.Instance) (map[string]BindingItem, error) { func GetEnrolledNodesByAgent(instanceID string) (map[string]BindingItem, error) {
//get nodes settings where agent id = instance id //get nodes settings where agent id = instance id
q := orm.Query{ q := orm.Query{
Size: 1000, Size: 1000,
Conds: orm.And(orm.Eq("metadata.category", "node_settings"), Conds: orm.And(orm.Eq("metadata.category", "node_settings"),
orm.Eq("metadata.name", "agent"), orm.Eq("metadata.name", "agent"),
orm.Eq("metadata.labels.agent_id", instance.ID), orm.Eq("metadata.labels.agent_id", instanceID),
), ),
} }
@ -74,15 +77,15 @@ func GetEnrolledNodesByAgent(instance *model.Instance) (map[string]BindingItem,
return ids, nil return ids, nil
} }
func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { func refreshNodesInfo(instanceID, instanceEndpoint string) (*elastic.DiscoveryResult, error) {
enrolledNodesByAgent, err := GetEnrolledNodesByAgent(inst) enrolledNodesByAgent, err := GetEnrolledNodesByAgent(instanceID)
if err != nil { if err != nil {
return nil, fmt.Errorf("error on get binding nodes info: %w", err) return nil, fmt.Errorf("error on get binding nodes info: %w", err)
} }
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*10) ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
nodesInfo, err := GetElasticsearchNodesViaAgent(ctxTimeout, inst) nodesInfo, err := GetElasticsearchNodesViaAgent(ctxTimeout, instanceEndpoint)
if err != nil { if err != nil {
//TODO return already biding nodes info ?? //TODO return already biding nodes info ??
return nil, fmt.Errorf("error on get nodes info from agent: %w", err) return nil, fmt.Errorf("error on get nodes info from agent: %w", err)
@ -175,7 +178,7 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) {
} }
//get nodes info via agent //get nodes info via agent
func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance) (*elastic.DiscoveryResult, error) { func GetElasticsearchNodesViaAgent(ctx context.Context, endpoint string) (*elastic.DiscoveryResult, error) {
req := &util.Request{ req := &util.Request{
Method: http.MethodGet, Method: http.MethodGet,
Path: "/elasticsearch/node/_discovery", Path: "/elasticsearch/node/_discovery",
@ -183,7 +186,7 @@ func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance
} }
obj := elastic.DiscoveryResult{} obj := elastic.DiscoveryResult{}
_, err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj) _, err := server.ProxyAgentRequest(endpoint, req, &obj)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -193,7 +196,7 @@ func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance
type BindingItem struct { type BindingItem struct {
//infini system assigned id //infini system assigned id
ClusterID string `json:"cluster_id"` ClusterID string `json:"cluster_id"`
ClusterUUID string `json:"cluster_uuid"` ClusterUUID string `json:"cluster_uuid"`
NodeUUID string `json:"node_uuid"` NodeUUID string `json:"node_uuid"`
@ -333,8 +336,8 @@ func getAgentByNodeID(clusterID, nodeID string) (*model.Instance, string, error)
return nil, "", err return nil, "", err
} }
nodeInfo,err:=metadata.GetNodeConfig(clusterID, nodeID) nodeInfo, err := metadata.GetNodeConfig(clusterID, nodeID)
if err!=nil||nodeInfo==nil{ if err != nil || nodeInfo == nil {
log.Error("node info is nil") log.Error("node info is nil")
return nil, "", err return nil, "", err
} }
@ -376,17 +379,94 @@ type ClusterInfo struct {
ClusterIDs []string `json:"cluster_id"` ClusterIDs []string `json:"cluster_id"`
} }
var autoEnrollRunning=atomic.Bool{}
func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
//{"cluster_id":["infini_default_system_cluster"]} //{"cluster_id":["infini_default_system_cluster"]}
clusterInfo := ClusterInfo{}
if req.Method == "POST" {
bytes, err := h.GetRawBody(req)
if err != nil {
panic(err)
}
if len(bytes) > 0 {
util.FromJSONBytes(bytes, &clusterInfo)
}
}
if len(clusterInfo.ClusterIDs) <= 0 {
panic(errors.New("please select cluster to enroll"))
}
if autoEnrollRunning.Load(){
return
}
autoEnrollRunning.Swap(true)
go func(clusterInfo ClusterInfo) {
defer func() {
autoEnrollRunning.Swap(false)
if r := recover(); r != nil {
var v string
switch r.(type) {
case error:
v = r.(error).Error()
case runtime.Error:
v = r.(runtime.Error).Error()
case string:
v = r.(string)
}
if v != "" {
log.Error(v)
}
}
log.Debug("finish auto enroll")
}()
log.Debug("start auto enroll")
//get instances
q := &orm.Query{Conds: orm.And(orm.Eq("application.name", "agent"))}
q.From = 0
q.Size = 50000
err, res := orm.Search(&model.Instance{}, q)
if err != nil {
log.Error(err)
return
}
for _, v := range res.Result {
f, ok := v.(map[string]interface{})
if ok {
instanceIDObj, ok1 := f["id"]
instanceEndpointObj, ok2 := f["endpoint"]
if ok1 && ok2 {
instanceID, ok1 := instanceIDObj.(string)
instanceEndpoint, ok2 := instanceEndpointObj.(string)
if ok1 && ok2 {
nodes, err := refreshNodesInfo(instanceID, instanceEndpoint)
if err != nil {
log.Error(err)
continue
}
log.Debugf("instance:%v,%v, has: %v nodes, %v unknown nodes", instanceID, instanceEndpoint, len(nodes.Nodes), len(nodes.UnknownProcess))
if len(nodes.UnknownProcess) > 0 {
pids:=h.bindInstanceToCluster(clusterInfo, nodes, instanceID, instanceEndpoint)
log.Infof("instance:%v,%v, success enroll %v nodes",instanceID, instanceEndpoint,len(pids))
}
}
}
}
}
}(clusterInfo)
//get instances
//get all unknown nodes //get all unknown nodes
//check each process with cluster id //check each process with cluster id
//send this to background task //send this to background task
h.WriteAckOKJSON(w) h.WriteAckOKJSON(w)
} }
@ -404,14 +484,13 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque
return return
} }
nodes, err := refreshNodesInfo(&instance) nodes, err := refreshNodesInfo(instance.ID, instance.GetEndpoint())
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
return return
} }
if len(nodes.UnknownProcess) > 0 { if len(nodes.UnknownProcess) > 0 {
var discoveredPIDs map[int]*elastic.LocalNodeInfo = make(map[int]*elastic.LocalNodeInfo) var discoveredPIDs map[int]*elastic.LocalNodeInfo = make(map[int]*elastic.LocalNodeInfo)
if req.Method == "POST" { if req.Method == "POST" {
bytes, err := h.GetRawBody(req) bytes, err := h.GetRawBody(req)
@ -422,75 +501,7 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque
if len(bytes) > 0 { if len(bytes) > 0 {
clusterInfo := ClusterInfo{} clusterInfo := ClusterInfo{}
util.FromJSONBytes(bytes, &clusterInfo) util.FromJSONBytes(bytes, &clusterInfo)
if len(clusterInfo.ClusterIDs) > 0 { discoveredPIDs = h.bindInstanceToCluster(clusterInfo, nodes, instance.ID, instance.GetEndpoint())
//try connect this node to cluster by using this cluster's agent credential
for _, clusterID := range clusterInfo.ClusterIDs {
meta := elastic.GetMetadata(clusterID)
if meta != nil {
states,err:=elastic.GetClient(clusterID).GetClusterState()
if err!=nil||states==nil{
log.Error(err)
continue
}
clusterUUID:=states.ClusterUUID
if meta.Config.AgentCredentialID != "" {
auth, err := common.GetAgentBasicAuth(meta.Config)
if err != nil {
panic(err)
}
if auth != nil {
//try connect
for _, node := range nodes.UnknownProcess {
for _, v := range node.ListenAddresses {
ip := v.IP
if util.ContainStr(v.IP, "::") {
ip = fmt.Sprintf("[%s]", v.IP)
}
nodeHost := fmt.Sprintf("%s:%d", ip, v.Port)
success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, &instance)
if !success && tryAgain {
//try https again
success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, &instance)
}
if success {
log.Debug("connect to es node success:", nodeHost, ", pid: ", node.PID)
discoveredPIDs[node.PID] = nodeInfo
if nodeInfo.ClusterInfo.ClusterUUID!=clusterUUID{
log.Error("cluster uuid not match, cluster id: ", clusterID, ", cluster uuid: ", clusterUUID, ", node cluster uuid: ", nodeInfo.ClusterInfo.ClusterUUID)
continue
}
//enroll this node
item := BindingItem{
ClusterID: clusterID,
ClusterUUID: nodeInfo.ClusterInfo.ClusterUUID,
NodeUUID: nodeInfo.NodeUUID,
}
settings := NewNodeAgentSettings(instance.ID, &item)
err = orm.Update(&orm.Context{
Refresh: "wait_for",
}, settings)
if err == nil {
nodeInfo.ClusterID = clusterID
nodeInfo.Enrolled = true
}
break
}
}
}
}
}
}
}
}
} }
} }
@ -510,12 +521,85 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque
h.WriteJSON(w, nodes, http.StatusOK) h.WriteJSON(w, nodes, http.StatusOK)
} }
func (h *APIHandler) getESNodeInfoViaProxy(esHost string, esSchema string, auth *model.BasicAuth, instance *model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) { func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elastic.DiscoveryResult, instanceID, instanceEndpoint string) map[int]*elastic.LocalNodeInfo {
esConfig := elastic.ElasticsearchConfig{Host: esHost, Schema: esSchema, BasicAuth: auth} discoveredPIDs := map[int]*elastic.LocalNodeInfo{}
return h.getESNodeInfoViaProxyWithConfig(&esConfig, auth, instance) if len(clusterInfo.ClusterIDs) > 0 {
//try connect this node to cluster by using this cluster's agent credential
for _, clusterID := range clusterInfo.ClusterIDs {
meta := elastic.GetMetadata(clusterID)
if meta != nil {
states, err := elastic.GetClient(clusterID).GetClusterState()
if err != nil || states == nil {
log.Error(err)
continue
}
clusterUUID := states.ClusterUUID
if meta.Config.AgentCredentialID != "" {
auth, err := common.GetAgentBasicAuth(meta.Config)
if err != nil {
panic(err)
}
if auth != nil {
//try connect
for _, node := range nodes.UnknownProcess {
for _, v := range node.ListenAddresses {
ip := v.IP
if util.ContainStr(v.IP, "::") {
ip = fmt.Sprintf("[%s]", v.IP)
}
nodeHost := fmt.Sprintf("%s:%d", ip, v.Port)
success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, instanceEndpoint)
if !success && tryAgain {
//try https again
success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, instanceEndpoint)
}
if success {
log.Debug("connect to es node success:", nodeHost, ", pid: ", node.PID)
discoveredPIDs[node.PID] = nodeInfo
if nodeInfo.ClusterInfo.ClusterUUID != clusterUUID {
log.Error("cluster uuid not match, cluster id: ", clusterID, ", cluster uuid: ", clusterUUID, ", node cluster uuid: ", nodeInfo.ClusterInfo.ClusterUUID)
continue
}
//enroll this node
item := BindingItem{
ClusterID: clusterID,
ClusterUUID: nodeInfo.ClusterInfo.ClusterUUID,
NodeUUID: nodeInfo.NodeUUID,
}
settings := NewNodeAgentSettings(instanceID, &item)
err = orm.Update(&orm.Context{
Refresh: "wait_for",
}, settings)
if err == nil {
nodeInfo.ClusterID = clusterID
nodeInfo.Enrolled = true
}
break
}
}
}
}
}
}
}
}
return discoveredPIDs
} }
func (h *APIHandler) getESNodeInfoViaProxyWithConfig(cfg *elastic.ElasticsearchConfig, auth *model.BasicAuth, instance *model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) { func (h *APIHandler) getESNodeInfoViaProxy(esHost string, esSchema string, auth *model.BasicAuth, endpoint string) (success, tryAgain bool, info *elastic.LocalNodeInfo) {
esConfig := elastic.ElasticsearchConfig{Host: esHost, Schema: esSchema, BasicAuth: auth}
return h.getESNodeInfoViaProxyWithConfig(&esConfig, auth, endpoint)
}
func (h *APIHandler) getESNodeInfoViaProxyWithConfig(cfg *elastic.ElasticsearchConfig, auth *model.BasicAuth, endpoint string) (success, tryAgain bool, info *elastic.LocalNodeInfo) {
body := util.MustToJSONBytes(cfg) body := util.MustToJSONBytes(cfg)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
@ -530,7 +614,7 @@ func (h *APIHandler) getESNodeInfoViaProxyWithConfig(cfg *elastic.ElasticsearchC
} }
obj := elastic.LocalNodeInfo{} obj := elastic.LocalNodeInfo{}
res, err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj) res, err := server.ProxyAgentRequest(endpoint, req, &obj)
if err != nil { if err != nil {
if global.Env().IsDebug { if global.Env().IsDebug {
log.Error(err) log.Error(err)
@ -670,7 +754,7 @@ func (h *APIHandler) enrollESNode(w http.ResponseWriter, req *http.Request, ps h
//use agent credential to access the node //use agent credential to access the node
meta.Config.BasicAuth, _ = common.GetAgentBasicAuth(meta.Config) meta.Config.BasicAuth, _ = common.GetAgentBasicAuth(meta.Config)
success, _, _ := h.getESNodeInfoViaProxyWithConfig(meta.Config, meta.Config.BasicAuth, instance) success, _, _ := h.getESNodeInfoViaProxyWithConfig(meta.Config, meta.Config.BasicAuth, instance.GetEndpoint())
if success { if success {
//update node's setting //update node's setting

View File

@ -90,7 +90,7 @@ func dynamicAgentConfigProvider(instance model.Instance) []*common.ConfigFile {
//get settings with this agent id //get settings with this agent id
result := []*common.ConfigFile{} result := []*common.ConfigFile{}
ids, err := GetEnrolledNodesByAgent(&instance) ids, err := GetEnrolledNodesByAgent(instance.ID)
if err != nil { if err != nil {
panic(err) panic(err)
} }