split refresh es process list and associate it with registered cluster in console

This commit is contained in:
liugq 2023-05-24 14:23:50 +08:00
parent d788f9f418
commit 2b77b42512
4 changed files with 127 additions and 42 deletions

View File

@ -21,6 +21,8 @@ func Init() {
api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.getESNodesInfo, enum.PermissionAgentInstanceRead))
api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/_nodes/_refresh", handler.RequirePermission(handler.refreshESNodesInfo, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost)
api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo)

View File

@ -418,27 +418,6 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt
}
nodeInfo.ID = oldNodeInfo.ID
nodeInfo.AgentID = inst.ID
clusterCfgs := getClusterConfigs()
if nodeInfo.ClusterUuid != "" && clusterCfgs[nodeInfo.ClusterUuid] != nil {
nodeInfo.ClusterID = clusterCfgs[nodeInfo.ClusterUuid].ID
settings, err := common2.GetAgentSettings(inst.ID, 0)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
setting := pickAgentSettings(settings, *nodeInfo)
if setting == nil {
setting, err = getAgentTaskSetting(inst.ID, *nodeInfo)
if err != nil {
log.Error("get agent task setting error: ", err)
}
err = orm.Create(nil, setting)
if err != nil {
log.Error("save agent task setting error: ", err)
}
}
}
err = orm.Save(nil, nodeInfo)
if err != nil {
log.Error(err)
@ -448,6 +427,102 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt
h.WriteJSON(w, nodeInfo, http.StatusOK)
}
func (h *APIHandler) associateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
instID := ps.MustGetParameter("instance_id")
reqBody := struct {
ID string `json:"id"`
ClusterID string `json:"cluster_id"`
}{}
err := h.DecodeJSON(req, &reqBody)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
node := agent.ESNodeInfo{
ID: reqBody.ID,
}
_, err = orm.Get(&node)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if node.AgentID != instID {
errStr := fmt.Sprintf("agent id not match: %s, %s", node.AgentID, instID)
log.Error(errStr)
h.WriteError(w, errStr, http.StatusInternalServerError)
return
}
node.ClusterID = reqBody.ClusterID
err = orm.Save(&orm.Context{
Refresh: "wait_for",
}, node)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
settings, err := common2.GetAgentSettings(instID, 0)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
setting := pickAgentSettings(settings, node)
if setting == nil {
setting, err = getAgentTaskSetting(instID, node)
if err != nil {
log.Error("get agent task setting error: ", err)
}
err = orm.Create(nil, setting)
if err != nil {
log.Error("save agent task setting error: ", err)
}
}
h.WriteAckOKJSON(w)
}
func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("instance_id")
nodeIDs := []string{}
err := h.DecodeJSON(req, &nodeIDs)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if len(nodeIDs) > 0 {
q := util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"terms": util.MapStr{
"id": nodeIDs,
},
},
{
"term": util.MapStr{
"agent_id": util.MapStr{
"value": id,
},
},
},
},
},
},
}
err = orm.DeleteBy(agent.ESNodeInfo{}, util.MustToJSONBytes(q))
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
}
h.WriteAckOKJSON(w)
}
func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
nodesInfo, err := common2.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint())
if err != nil {
@ -457,10 +532,9 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
if err != nil {
return nil, fmt.Errorf("get elasticsearch nodes info from es error: %w", err)
}
clusterCfgs := getClusterConfigs()
oldPids := map[int]struct{}{}
var resultNodes []agent.ESNodeInfo
settings, err := common2.GetAgentSettings(inst.ID, 0)
//settings, err := common2.GetAgentSettings(inst.ID, 0)
if err != nil {
return nil, err
}
@ -480,24 +554,25 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
if node.ClusterUuid != "" {
if oldNode != nil && oldNode.ClusterID != "" {
node.ClusterID = oldNode.ClusterID
}else{
if cfg := clusterCfgs[node.ClusterUuid]; cfg != nil {
node.ClusterID = cfg.ID
setting := pickAgentSettings(settings, node)
if setting == nil {
setting, err = getAgentTaskSetting(inst.ID, node)
if err != nil {
log.Error()
}
err = orm.Create(nil, setting)
if err != nil {
log.Error("save agent task setting error: ", err)
}
}
}else{
//cluster not registered in console
}
}
//else{
// if cfg := clusterCfgs[node.ClusterUuid]; cfg != nil {
// node.ClusterID = cfg.ID
// setting := pickAgentSettings(settings, node)
// if setting == nil {
// setting, err = getAgentTaskSetting(inst.ID, node)
// if err != nil {
// log.Error()
// }
// err = orm.Create(nil, setting)
// if err != nil {
// log.Error("save agent task setting error: ", err)
// }
// }
// }else{
// //cluster not registered in console
// }
//}
}
node.Status = "online"

View File

@ -196,5 +196,8 @@ func (client *Client) doRequest(req *util.Request, respObj interface{}) error {
if result.StatusCode != 200 {
return fmt.Errorf(string(result.Body))
}
if respObj == nil {
return nil
}
return util.FromJSONBytes(result.Body, respObj)
}

View File

@ -18,6 +18,7 @@ import (
"infini.sh/framework/core/util"
"runtime"
"runtime/debug"
"strings"
"sync"
"time"
)
@ -87,6 +88,7 @@ func (sm *StateManager) checkAgentStatus() {
}
}
// status change to offline
// todo validate whether agent is offline
sm.agentIds[agentID] = StatusOffline
sm.workerChan <- struct{}{}
go func(agentID string) {
@ -148,9 +150,12 @@ func (sm *StateManager) syncSettings(agentID string) {
for _, pipelineID := range parseResult.ToDeletePipelineNames {
err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID)
if err != nil {
log.Errorf("delete pipeline error: %v", err)
continue
if !strings.Contains(err.Error(), "not found") {
log.Errorf("delete pipeline error: %v", err)
continue
}
}
//todo update delete pipeline state
}
for _, pipeline := range parseResult.Pipelines {
err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline))