auto associate support multi cluster

This commit is contained in:
liugq 2023-07-19 10:20:20 +08:00
parent f92b448333
commit 503b201eb2
1 changed files with 102 additions and 100 deletions

View File

@ -625,7 +625,7 @@ func (h *APIHandler) associateESNode(w http.ResponseWriter, req *http.Request, p
func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
reqBody := struct {
ClusterID string `json:"cluster_id"`
ClusterIDs []string `json:"cluster_ids"`
}{}
err := h.DecodeJSON(req, &reqBody)
if err != nil {
@ -633,14 +633,6 @@ func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Reques
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
// query cluster basicauth
cfg := elastic.GetConfig(reqBody.ClusterID)
basicAuth, err := common.GetBasicAuth(cfg)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
// query not associated nodes info
nodesM, err := getUnAssociateNodes()
if err != nil {
@ -662,109 +654,119 @@ func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Reques
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
taskSetting, err := getSettingsByClusterID(cfg.ID)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
for agentID, nodes := range nodesM {
var (
inst *agent.Instance
ok bool
)
if inst, ok = agents[agentID]; !ok {
log.Warnf("agent [%v] was not found", agentID)
continue
}
settings, err := common2.GetAgentSettings(agentID, 0)
for _, clusterID := range reqBody.ClusterIDs {
// query cluster basicauth
cfg := elastic.GetConfig(clusterID)
basicAuth, err := common.GetBasicAuth(cfg)
if err != nil {
log.Error(err)
continue
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
for _, node := range nodes {
host := node.PublishAddress
var endpoint string
if strings.HasPrefix( host, "::"){
instURL, err := url.Parse(inst.Endpoint)
if err != nil {
log.Error(err)
continue
}
host = instURL.Hostname()
endpoint = fmt.Sprintf("%s://%s:%s", node.Schema, host, node.HttpPort)
}else{
endpoint = fmt.Sprintf("%s://%s", node.Schema, host)
}
escfg := elastic.ElasticsearchConfig{
Endpoint: endpoint,
BasicAuth: &basicAuth,
}
nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), escfg)
if err != nil {
log.Warn(err)
taskSetting, err := getSettingsByClusterID(cfg.ID)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
for agentID, nodes := range nodesM {
var (
inst *agent.Instance
ok bool
)
if inst, ok = agents[agentID]; !ok {
log.Warnf("agent [%v] was not found", agentID)
continue
}
//matched
if nodeInfo.ClusterUuid == cfg.ClusterUUID {
//update node info
nodeInfo.ID = node.ID
nodeInfo.AgentID = inst.ID
nodeInfo.ClusterID = cfg.ID
err = orm.Save(nil, nodeInfo)
settings, err := common2.GetAgentSettings(agentID, 0)
if err != nil {
log.Error(err)
continue
}
for _, node := range nodes {
host := node.PublishAddress
var endpoint string
if strings.HasPrefix(host, "::") {
instURL, err := url.Parse(inst.Endpoint)
if err != nil {
log.Error(err)
continue
}
host = instURL.Hostname()
endpoint = fmt.Sprintf("%s://%s:%s", node.Schema, host, node.HttpPort)
} else {
endpoint = fmt.Sprintf("%s://%s", node.Schema, host)
}
escfg := elastic.ElasticsearchConfig{
Endpoint: endpoint,
BasicAuth: &basicAuth,
}
nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), escfg)
if err != nil {
log.Error(err)
log.Warn(err)
continue
}
setting := pickAgentSettings(settings, node)
if setting == nil {
tsetting := model.TaskSetting{
NodeStats: &model.NodeStatsTask{
Enabled: true,
},
Logs: &model.LogsTask{
Enabled: true,
LogsPath: nodeInfo.Path.Logs,
},
}
if taskSetting.IndexStats != nil {
tsetting.IndexStats = taskSetting.IndexStats
taskSetting.IndexStats = nil
}
if taskSetting.ClusterHealth != nil {
tsetting.ClusterHealth = taskSetting.ClusterHealth
taskSetting.ClusterHealth = nil
}
if taskSetting.ClusterStats != nil {
tsetting.ClusterStats = taskSetting.ClusterStats
taskSetting.ClusterStats = nil
}
setting = &agent.Setting{
Metadata: agent.SettingsMetadata{
Category: "agent",
Name: "task",
Labels: util.MapStr{
"agent_id": agentID,
"cluster_uuid": nodeInfo.ClusterUuid,
"cluster_id": nodeInfo.ClusterID,
"node_uuid": nodeInfo.NodeUUID,
"endpoint": fmt.Sprintf("%s://%s", nodeInfo.Schema, nodeInfo.PublishAddress),
},
},
Payload: util.MapStr{
"task": tsetting,
},
}
err = orm.Create(nil, setting)
//matched
if nodeInfo.ClusterUuid == cfg.ClusterUUID {
//update node info
nodeInfo.ID = node.ID
nodeInfo.AgentID = inst.ID
nodeInfo.ClusterID = cfg.ID
err = orm.Save(nil, nodeInfo)
if err != nil {
log.Error("save agent task setting error: ", err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
log.Error(err)
continue
}
setting := pickAgentSettings(settings, node)
if setting == nil {
tsetting := model.TaskSetting{
NodeStats: &model.NodeStatsTask{
Enabled: true,
},
Logs: &model.LogsTask{
Enabled: true,
LogsPath: nodeInfo.Path.Logs,
},
}
if taskSetting.IndexStats != nil {
tsetting.IndexStats = taskSetting.IndexStats
taskSetting.IndexStats = nil
}
if taskSetting.ClusterHealth != nil {
tsetting.ClusterHealth = taskSetting.ClusterHealth
taskSetting.ClusterHealth = nil
}
if taskSetting.ClusterStats != nil {
tsetting.ClusterStats = taskSetting.ClusterStats
taskSetting.ClusterStats = nil
}
setting = &agent.Setting{
Metadata: agent.SettingsMetadata{
Category: "agent",
Name: "task",
Labels: util.MapStr{
"agent_id": agentID,
"cluster_uuid": nodeInfo.ClusterUuid,
"cluster_id": nodeInfo.ClusterID,
"node_uuid": nodeInfo.NodeUUID,
"endpoint": fmt.Sprintf("%s://%s", nodeInfo.Schema, nodeInfo.PublishAddress),
},
},
Payload: util.MapStr{
"task": tsetting,
},
}
err = orm.Create(nil, setting)
if err != nil {
log.Error("save agent task setting error: ", err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
}
}
}
}
h.WriteAckOKJSON(w)
}
@ -985,7 +987,7 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error) {
func getUnAssociateNodes() (map[string][]agent.ESNodeInfo, error){
query := util.MapStr{
"size": 1200,
"size": 3000,
"query": util.MapStr{
"bool": util.MapStr{
"must_not": []util.MapStr{