fix: enroll should handle recognized nodes (#261)
Reviewed-on: https://git.infini.ltd/infini/console/pulls/261
This commit is contained in:
parent
0046d4d4ab
commit
0a6d0e5696
|
@ -379,7 +379,7 @@ type ClusterInfo struct {
|
||||||
ClusterIDs []string `json:"cluster_id"`
|
ClusterIDs []string `json:"cluster_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var autoEnrollRunning=atomic.Bool{}
|
var autoEnrollRunning = atomic.Bool{}
|
||||||
|
|
||||||
func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||||
//{"cluster_id":["infini_default_system_cluster"]}
|
//{"cluster_id":["infini_default_system_cluster"]}
|
||||||
|
@ -398,8 +398,7 @@ func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request,
|
||||||
panic(errors.New("please select cluster to enroll"))
|
panic(errors.New("please select cluster to enroll"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if autoEnrollRunning.Load() {
|
||||||
if autoEnrollRunning.Load(){
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -407,18 +406,20 @@ func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request,
|
||||||
go func(clusterInfo ClusterInfo) {
|
go func(clusterInfo ClusterInfo) {
|
||||||
defer func() {
|
defer func() {
|
||||||
autoEnrollRunning.Swap(false)
|
autoEnrollRunning.Swap(false)
|
||||||
if r := recover(); r != nil {
|
if !global.Env().IsDebug {
|
||||||
var v string
|
if r := recover(); r != nil {
|
||||||
switch r.(type) {
|
var v string
|
||||||
case error:
|
switch r.(type) {
|
||||||
v = r.(error).Error()
|
case error:
|
||||||
case runtime.Error:
|
v = r.(error).Error()
|
||||||
v = r.(runtime.Error).Error()
|
case runtime.Error:
|
||||||
case string:
|
v = r.(runtime.Error).Error()
|
||||||
v = r.(string)
|
case string:
|
||||||
}
|
v = r.(string)
|
||||||
if v != "" {
|
}
|
||||||
log.Error(v)
|
if v != "" {
|
||||||
|
log.Error(v)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Debug("finish auto enroll")
|
log.Debug("finish auto enroll")
|
||||||
|
@ -451,15 +452,24 @@ func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request,
|
||||||
}
|
}
|
||||||
log.Debugf("instance:%v,%v, has: %v nodes, %v unknown nodes", instanceID, instanceEndpoint, len(nodes.Nodes), len(nodes.UnknownProcess))
|
log.Debugf("instance:%v,%v, has: %v nodes, %v unknown nodes", instanceID, instanceEndpoint, len(nodes.Nodes), len(nodes.UnknownProcess))
|
||||||
if len(nodes.UnknownProcess) > 0 {
|
if len(nodes.UnknownProcess) > 0 {
|
||||||
pids:=h.bindInstanceToCluster(clusterInfo, nodes, instanceID, instanceEndpoint)
|
pids := h.bindInstanceToCluster(clusterInfo, nodes, instanceID, instanceEndpoint)
|
||||||
log.Infof("instance:%v,%v, success enroll %v nodes",instanceID, instanceEndpoint,len(pids))
|
log.Infof("instance:%v,%v, success enroll %v nodes", instanceID, instanceEndpoint, len(pids))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(nodes.Nodes)>0{
|
||||||
|
for k,v:=range nodes.Nodes{
|
||||||
|
log.Debug(k,v.Status,v.Enrolled)
|
||||||
|
if !v.Enrolled{
|
||||||
|
pids := h.bindInstanceToCluster(clusterInfo, nodes, instanceID, instanceEndpoint)
|
||||||
|
log.Infof("instance:%v,%v, success enroll %v nodes", instanceID, instanceEndpoint, len(pids))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}(clusterInfo)
|
}(clusterInfo)
|
||||||
|
|
||||||
//get all unknown nodes
|
//get all unknown nodes
|
||||||
|
@ -528,7 +538,6 @@ func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elast
|
||||||
for _, clusterID := range clusterInfo.ClusterIDs {
|
for _, clusterID := range clusterInfo.ClusterIDs {
|
||||||
meta := elastic.GetMetadata(clusterID)
|
meta := elastic.GetMetadata(clusterID)
|
||||||
if meta != nil {
|
if meta != nil {
|
||||||
|
|
||||||
states, err := elastic.GetClient(clusterID).GetClusterState()
|
states, err := elastic.GetClient(clusterID).GetClusterState()
|
||||||
if err != nil || states == nil {
|
if err != nil || states == nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
|
@ -537,53 +546,43 @@ func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elast
|
||||||
|
|
||||||
clusterUUID := states.ClusterUUID
|
clusterUUID := states.ClusterUUID
|
||||||
|
|
||||||
if meta.Config.AgentCredentialID != "" {
|
//no auth or agent auth configured
|
||||||
|
if meta.Config.AgentCredentialID != "" || meta.Config.CredentialID == "" {
|
||||||
auth, err := common.GetAgentBasicAuth(meta.Config)
|
auth, err := common.GetAgentBasicAuth(meta.Config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
if auth != nil {
|
|
||||||
//try connect
|
for _,v:=range nodes.Nodes{
|
||||||
for _, node := range nodes.UnknownProcess {
|
if !v.Enrolled{
|
||||||
for _, v := range node.ListenAddresses {
|
if v.NodeInfo!=nil{
|
||||||
ip := v.IP
|
pid:=v.NodeInfo.Process.Id
|
||||||
if util.ContainStr(v.IP, "::") {
|
nodeHost:=v.NodeInfo.GetHttpPublishHost()
|
||||||
ip = fmt.Sprintf("[%s]", v.IP)
|
nodeInfo:=h.internalProcessBind(clusterID,clusterUUID,instanceID,instanceEndpoint,pid,nodeHost,auth)
|
||||||
}
|
if nodeInfo!=nil{
|
||||||
nodeHost := fmt.Sprintf("%s:%d", ip, v.Port)
|
discoveredPIDs[pid] = nodeInfo
|
||||||
success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, instanceEndpoint)
|
|
||||||
if !success && tryAgain {
|
|
||||||
//try https again
|
|
||||||
success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, instanceEndpoint)
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if success {
|
//try connect
|
||||||
log.Debug("connect to es node success:", nodeHost, ", pid: ", node.PID)
|
for _, node := range nodes.UnknownProcess {
|
||||||
discoveredPIDs[node.PID] = nodeInfo
|
|
||||||
|
|
||||||
if nodeInfo.ClusterInfo.ClusterUUID != clusterUUID {
|
pid:=node.PID
|
||||||
log.Error("cluster uuid not match, cluster id: ", clusterID, ", cluster uuid: ", clusterUUID, ", node cluster uuid: ", nodeInfo.ClusterInfo.ClusterUUID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
//enroll this node
|
for _, v := range node.ListenAddresses {
|
||||||
item := BindingItem{
|
|
||||||
ClusterID: clusterID,
|
|
||||||
ClusterUUID: nodeInfo.ClusterInfo.ClusterUUID,
|
|
||||||
NodeUUID: nodeInfo.NodeUUID,
|
|
||||||
}
|
|
||||||
|
|
||||||
settings := NewNodeAgentSettings(instanceID, &item)
|
ip := v.IP
|
||||||
err = orm.Update(&orm.Context{
|
port:=v.Port
|
||||||
Refresh: "wait_for",
|
|
||||||
}, settings)
|
|
||||||
|
|
||||||
if err == nil {
|
if util.ContainStr(ip, "::") {
|
||||||
nodeInfo.ClusterID = clusterID
|
ip = fmt.Sprintf("[%s]", ip)
|
||||||
nodeInfo.Enrolled = true
|
}
|
||||||
}
|
nodeHost := fmt.Sprintf("%s:%d", ip, port)
|
||||||
break
|
nodeInfo:=h.internalProcessBind(clusterID,clusterUUID,instanceID,instanceEndpoint,pid,nodeHost,auth)
|
||||||
}
|
if nodeInfo!=nil{
|
||||||
|
discoveredPIDs[pid] = nodeInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -594,6 +593,44 @@ func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elast
|
||||||
return discoveredPIDs
|
return discoveredPIDs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *APIHandler) internalProcessBind(clusterID,clusterUUID,instanceID,instanceEndpoint string,pid int,nodeHost string,auth *model.BasicAuth) *elastic.LocalNodeInfo{
|
||||||
|
success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, instanceEndpoint)
|
||||||
|
if !success && tryAgain {
|
||||||
|
//try https again
|
||||||
|
success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, instanceEndpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug(clusterUUID,nodeHost,instanceEndpoint,success, tryAgain, nodeInfo)
|
||||||
|
|
||||||
|
if success {
|
||||||
|
log.Debug("connect to es node success:", nodeHost, ", pid: ", pid)
|
||||||
|
if nodeInfo.ClusterInfo.ClusterUUID != clusterUUID {
|
||||||
|
log.Info("cluster uuid not match, cluster id: ", clusterID, ", cluster uuid: ", clusterUUID, ", node cluster uuid: ", nodeInfo.ClusterInfo.ClusterUUID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//enroll this node
|
||||||
|
item := BindingItem{
|
||||||
|
ClusterID: clusterID,
|
||||||
|
ClusterUUID: nodeInfo.ClusterInfo.ClusterUUID,
|
||||||
|
NodeUUID: nodeInfo.NodeUUID,
|
||||||
|
}
|
||||||
|
|
||||||
|
settings := NewNodeAgentSettings(instanceID, &item)
|
||||||
|
err := orm.Update(&orm.Context{
|
||||||
|
Refresh: "wait_for",
|
||||||
|
}, settings)
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
nodeInfo.ClusterID = clusterID
|
||||||
|
nodeInfo.Enrolled = true
|
||||||
|
}
|
||||||
|
return nodeInfo
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
func (h *APIHandler) getESNodeInfoViaProxy(esHost string, esSchema string, auth *model.BasicAuth, endpoint string) (success, tryAgain bool, info *elastic.LocalNodeInfo) {
|
func (h *APIHandler) getESNodeInfoViaProxy(esHost string, esSchema string, auth *model.BasicAuth, endpoint string) (success, tryAgain bool, info *elastic.LocalNodeInfo) {
|
||||||
esConfig := elastic.ElasticsearchConfig{Host: esHost, Schema: esSchema, BasicAuth: auth}
|
esConfig := elastic.ElasticsearchConfig{Host: esHost, Schema: esSchema, BasicAuth: auth}
|
||||||
return h.getESNodeInfoViaProxyWithConfig(&esConfig, auth, endpoint)
|
return h.getESNodeInfoViaProxyWithConfig(&esConfig, auth, endpoint)
|
||||||
|
|
|
@ -22,7 +22,7 @@ func GetAgentConfig() *model.AgentConfig {
|
||||||
}
|
}
|
||||||
_, err := env.ParseConfig("agent", agentCfg )
|
_, err := env.ParseConfig("agent", agentCfg )
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("agent config not found: %v", err)
|
log.Errorf("agent config not found: %v", err)
|
||||||
}
|
}
|
||||||
if agentCfg.Setup.CACertFile == "" && agentCfg.Setup.CAKeyFile == "" {
|
if agentCfg.Setup.CACertFile == "" && agentCfg.Setup.CAKeyFile == "" {
|
||||||
agentCfg.Setup.CACertFile, agentCfg.Setup.CAKeyFile, err = common.GetOrInitDefaultCaCerts()
|
agentCfg.Setup.CACertFile, agentCfg.Setup.CAKeyFile, err = common.GetOrInitDefaultCaCerts()
|
||||||
|
|
Loading…
Reference in New Issue