diff --git a/config/setup/install_agent.tpl b/config/install_agent.tpl similarity index 100% rename from config/setup/install_agent.tpl rename to config/install_agent.tpl diff --git a/modules/agent/api/elasticsearch.go b/modules/agent/api/elasticsearch.go index 168608ed..b0830a71 100644 --- a/modules/agent/api/elasticsearch.go +++ b/modules/agent/api/elasticsearch.go @@ -6,6 +6,7 @@ package api import ( "context" + "errors" "fmt" log "github.com/cihub/seelog" httprouter "infini.sh/framework/core/api/router" @@ -19,18 +20,20 @@ import ( "infini.sh/framework/modules/elastic/metadata" "infini.sh/framework/plugins/managed/server" "net/http" + "runtime" + "sync/atomic" "time" ) //node -> binding item -func GetEnrolledNodesByAgent(instance *model.Instance) (map[string]BindingItem, error) { +func GetEnrolledNodesByAgent(instanceID string) (map[string]BindingItem, error) { //get nodes settings where agent id = instance id q := orm.Query{ Size: 1000, Conds: orm.And(orm.Eq("metadata.category", "node_settings"), orm.Eq("metadata.name", "agent"), - orm.Eq("metadata.labels.agent_id", instance.ID), + orm.Eq("metadata.labels.agent_id", instanceID), ), } @@ -74,15 +77,15 @@ func GetEnrolledNodesByAgent(instance *model.Instance) (map[string]BindingItem, return ids, nil } -func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { - enrolledNodesByAgent, err := GetEnrolledNodesByAgent(inst) +func refreshNodesInfo(instanceID, instanceEndpoint string) (*elastic.DiscoveryResult, error) { + enrolledNodesByAgent, err := GetEnrolledNodesByAgent(instanceID) if err != nil { return nil, fmt.Errorf("error on get binding nodes info: %w", err) } ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - nodesInfo, err := GetElasticsearchNodesViaAgent(ctxTimeout, inst) + nodesInfo, err := GetElasticsearchNodesViaAgent(ctxTimeout, instanceEndpoint) if err != nil { //TODO return already biding nodes info ?? return nil, fmt.Errorf("error on get nodes info from agent: %w", err) @@ -175,7 +178,7 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { } //get nodes info via agent -func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance) (*elastic.DiscoveryResult, error) { +func GetElasticsearchNodesViaAgent(ctx context.Context, endpoint string) (*elastic.DiscoveryResult, error) { req := &util.Request{ Method: http.MethodGet, Path: "/elasticsearch/node/_discovery", @@ -183,7 +186,7 @@ func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance } obj := elastic.DiscoveryResult{} - _, err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj) + _, err := server.ProxyAgentRequest(endpoint, req, &obj) if err != nil { return nil, err } @@ -193,7 +196,7 @@ func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance type BindingItem struct { //infini system assigned id - ClusterID string `json:"cluster_id"` + ClusterID string `json:"cluster_id"` ClusterUUID string `json:"cluster_uuid"` NodeUUID string `json:"node_uuid"` @@ -333,8 +336,8 @@ func getAgentByNodeID(clusterID, nodeID string) (*model.Instance, string, error) return nil, "", err } - nodeInfo,err:=metadata.GetNodeConfig(clusterID, nodeID) - if err!=nil||nodeInfo==nil{ + nodeInfo, err := metadata.GetNodeConfig(clusterID, nodeID) + if err != nil || nodeInfo == nil { log.Error("node info is nil") return nil, "", err } @@ -376,17 +379,94 @@ type ClusterInfo struct { ClusterIDs []string `json:"cluster_id"` } +var autoEnrollRunning=atomic.Bool{} + func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - //{"cluster_id":["infini_default_system_cluster"]} + clusterInfo := ClusterInfo{} + if req.Method == "POST" { + bytes, err := h.GetRawBody(req) + if err != nil { + panic(err) + } + if len(bytes) > 0 { + util.FromJSONBytes(bytes, &clusterInfo) + } + } + + if len(clusterInfo.ClusterIDs) <= 0 { + panic(errors.New("please select cluster to enroll")) + } + + + if autoEnrollRunning.Load(){ + return + } + + autoEnrollRunning.Swap(true) + go func(clusterInfo ClusterInfo) { + defer func() { + autoEnrollRunning.Swap(false) + if r := recover(); r != nil { + var v string + switch r.(type) { + case error: + v = r.(error).Error() + case runtime.Error: + v = r.(runtime.Error).Error() + case string: + v = r.(string) + } + if v != "" { + log.Error(v) + } + } + log.Debug("finish auto enroll") + }() + + log.Debug("start auto enroll") + //get instances + q := &orm.Query{Conds: orm.And(orm.Eq("application.name", "agent"))} + q.From = 0 + q.Size = 50000 + err, res := orm.Search(&model.Instance{}, q) + if err != nil { + log.Error(err) + return + } + + for _, v := range res.Result { + f, ok := v.(map[string]interface{}) + if ok { + instanceIDObj, ok1 := f["id"] + instanceEndpointObj, ok2 := f["endpoint"] + if ok1 && ok2 { + instanceID, ok1 := instanceIDObj.(string) + instanceEndpoint, ok2 := instanceEndpointObj.(string) + if ok1 && ok2 { + nodes, err := refreshNodesInfo(instanceID, instanceEndpoint) + if err != nil { + log.Error(err) + continue + } + log.Debugf("instance:%v,%v, has: %v nodes, %v unknown nodes", instanceID, instanceEndpoint, len(nodes.Nodes), len(nodes.UnknownProcess)) + if len(nodes.UnknownProcess) > 0 { + pids:=h.bindInstanceToCluster(clusterInfo, nodes, instanceID, instanceEndpoint) + log.Infof("instance:%v,%v, success enroll %v nodes",instanceID, instanceEndpoint,len(pids)) + } + } + } + } + } + + + }(clusterInfo) - //get instances //get all unknown nodes //check each process with cluster id //send this to background task - h.WriteAckOKJSON(w) } @@ -404,14 +484,13 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque return } - nodes, err := refreshNodesInfo(&instance) + nodes, err := refreshNodesInfo(instance.ID, instance.GetEndpoint()) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) return } if len(nodes.UnknownProcess) > 0 { - var discoveredPIDs map[int]*elastic.LocalNodeInfo = make(map[int]*elastic.LocalNodeInfo) if req.Method == "POST" { bytes, err := h.GetRawBody(req) @@ -422,75 +501,7 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque if len(bytes) > 0 { clusterInfo := ClusterInfo{} util.FromJSONBytes(bytes, &clusterInfo) - if len(clusterInfo.ClusterIDs) > 0 { - //try connect this node to cluster by using this cluster's agent credential - for _, clusterID := range clusterInfo.ClusterIDs { - meta := elastic.GetMetadata(clusterID) - if meta != nil { - - states,err:=elastic.GetClient(clusterID).GetClusterState() - if err!=nil||states==nil{ - log.Error(err) - continue - } - - clusterUUID:=states.ClusterUUID - - if meta.Config.AgentCredentialID != "" { - auth, err := common.GetAgentBasicAuth(meta.Config) - if err != nil { - panic(err) - } - if auth != nil { - //try connect - for _, node := range nodes.UnknownProcess { - for _, v := range node.ListenAddresses { - ip := v.IP - if util.ContainStr(v.IP, "::") { - ip = fmt.Sprintf("[%s]", v.IP) - } - nodeHost := fmt.Sprintf("%s:%d", ip, v.Port) - success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, &instance) - if !success && tryAgain { - //try https again - success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, &instance) - } - - if success { - log.Debug("connect to es node success:", nodeHost, ", pid: ", node.PID) - discoveredPIDs[node.PID] = nodeInfo - - - if nodeInfo.ClusterInfo.ClusterUUID!=clusterUUID{ - log.Error("cluster uuid not match, cluster id: ", clusterID, ", cluster uuid: ", clusterUUID, ", node cluster uuid: ", nodeInfo.ClusterInfo.ClusterUUID) - continue - } - - //enroll this node - item := BindingItem{ - ClusterID: clusterID, - ClusterUUID: nodeInfo.ClusterInfo.ClusterUUID, - NodeUUID: nodeInfo.NodeUUID, - } - - settings := NewNodeAgentSettings(instance.ID, &item) - err = orm.Update(&orm.Context{ - Refresh: "wait_for", - }, settings) - - if err == nil { - nodeInfo.ClusterID = clusterID - nodeInfo.Enrolled = true - } - break - } - } - } - } - } - } - } - } + discoveredPIDs = h.bindInstanceToCluster(clusterInfo, nodes, instance.ID, instance.GetEndpoint()) } } @@ -510,12 +521,85 @@ func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Reque h.WriteJSON(w, nodes, http.StatusOK) } -func (h *APIHandler) getESNodeInfoViaProxy(esHost string, esSchema string, auth *model.BasicAuth, instance *model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) { - esConfig := elastic.ElasticsearchConfig{Host: esHost, Schema: esSchema, BasicAuth: auth} - return h.getESNodeInfoViaProxyWithConfig(&esConfig, auth, instance) +func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elastic.DiscoveryResult, instanceID, instanceEndpoint string) map[int]*elastic.LocalNodeInfo { + discoveredPIDs := map[int]*elastic.LocalNodeInfo{} + if len(clusterInfo.ClusterIDs) > 0 { + //try connect this node to cluster by using this cluster's agent credential + for _, clusterID := range clusterInfo.ClusterIDs { + meta := elastic.GetMetadata(clusterID) + if meta != nil { + + states, err := elastic.GetClient(clusterID).GetClusterState() + if err != nil || states == nil { + log.Error(err) + continue + } + + clusterUUID := states.ClusterUUID + + if meta.Config.AgentCredentialID != "" { + auth, err := common.GetAgentBasicAuth(meta.Config) + if err != nil { + panic(err) + } + if auth != nil { + //try connect + for _, node := range nodes.UnknownProcess { + for _, v := range node.ListenAddresses { + ip := v.IP + if util.ContainStr(v.IP, "::") { + ip = fmt.Sprintf("[%s]", v.IP) + } + nodeHost := fmt.Sprintf("%s:%d", ip, v.Port) + success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, instanceEndpoint) + if !success && tryAgain { + //try https again + success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, instanceEndpoint) + } + + if success { + log.Debug("connect to es node success:", nodeHost, ", pid: ", node.PID) + discoveredPIDs[node.PID] = nodeInfo + + if nodeInfo.ClusterInfo.ClusterUUID != clusterUUID { + log.Error("cluster uuid not match, cluster id: ", clusterID, ", cluster uuid: ", clusterUUID, ", node cluster uuid: ", nodeInfo.ClusterInfo.ClusterUUID) + continue + } + + //enroll this node + item := BindingItem{ + ClusterID: clusterID, + ClusterUUID: nodeInfo.ClusterInfo.ClusterUUID, + NodeUUID: nodeInfo.NodeUUID, + } + + settings := NewNodeAgentSettings(instanceID, &item) + err = orm.Update(&orm.Context{ + Refresh: "wait_for", + }, settings) + + if err == nil { + nodeInfo.ClusterID = clusterID + nodeInfo.Enrolled = true + } + break + } + } + } + } + } + } + } + } + return discoveredPIDs } -func (h *APIHandler) getESNodeInfoViaProxyWithConfig(cfg *elastic.ElasticsearchConfig, auth *model.BasicAuth, instance *model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) { +func (h *APIHandler) getESNodeInfoViaProxy(esHost string, esSchema string, auth *model.BasicAuth, endpoint string) (success, tryAgain bool, info *elastic.LocalNodeInfo) { + esConfig := elastic.ElasticsearchConfig{Host: esHost, Schema: esSchema, BasicAuth: auth} + return h.getESNodeInfoViaProxyWithConfig(&esConfig, auth, endpoint) +} + +func (h *APIHandler) getESNodeInfoViaProxyWithConfig(cfg *elastic.ElasticsearchConfig, auth *model.BasicAuth, endpoint string) (success, tryAgain bool, info *elastic.LocalNodeInfo) { body := util.MustToJSONBytes(cfg) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -530,7 +614,7 @@ func (h *APIHandler) getESNodeInfoViaProxyWithConfig(cfg *elastic.ElasticsearchC } obj := elastic.LocalNodeInfo{} - res, err := server.ProxyAgentRequest(instance.GetEndpoint(), req, &obj) + res, err := server.ProxyAgentRequest(endpoint, req, &obj) if err != nil { if global.Env().IsDebug { log.Error(err) @@ -670,7 +754,7 @@ func (h *APIHandler) enrollESNode(w http.ResponseWriter, req *http.Request, ps h //use agent credential to access the node meta.Config.BasicAuth, _ = common.GetAgentBasicAuth(meta.Config) - success, _, _ := h.getESNodeInfoViaProxyWithConfig(meta.Config, meta.Config.BasicAuth, instance) + success, _, _ := h.getESNodeInfoViaProxyWithConfig(meta.Config, meta.Config.BasicAuth, instance.GetEndpoint()) if success { //update node's setting diff --git a/modules/agent/api/remote_config.go b/modules/agent/api/remote_config.go index 3f8c663f..6a21251f 100644 --- a/modules/agent/api/remote_config.go +++ b/modules/agent/api/remote_config.go @@ -90,7 +90,7 @@ func dynamicAgentConfigProvider(instance model.Instance) []*common.ConfigFile { //get settings with this agent id result := []*common.ConfigFile{} - ids, err := GetEnrolledNodesByAgent(&instance) + ids, err := GetEnrolledNodesByAgent(instance.ID) if err != nil { panic(err) }