realtime access agent status and node processes

This commit is contained in:
liugq 2023-06-10 19:42:01 +08:00
parent 796b78abec
commit ef8fe41089
2 changed files with 53 additions and 82 deletions

View File

@ -21,7 +21,6 @@ import (
"infini.sh/framework/core/api" "infini.sh/framework/core/api"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic" "infini.sh/framework/core/elastic"
"infini.sh/framework/core/event"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
elastic2 "infini.sh/framework/modules/elastic" elastic2 "infini.sh/framework/modules/elastic"
@ -110,10 +109,6 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
log.Error(err) log.Error(err)
return return
} }
_, err = refreshNodesInfo(obj)
if err != nil {
log.Error(err)
}
err = client.GetClient().SaveIngestConfig(context.Background(), obj.GetEndpoint()) err = client.GetClient().SaveIngestConfig(context.Background(), obj.GetEndpoint())
if err != nil { if err != nil {
log.Error(err) log.Error(err)
@ -221,50 +216,17 @@ func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request,
h.WriteJSON(w, util.MapStr{}, http.StatusOK) h.WriteJSON(w, util.MapStr{}, http.StatusOK)
return return
} }
q := orm.Query{ q := orm.Query{}
WildcardIndex: true,
}
queryDSL := util.MapStr{ queryDSL := util.MapStr{
"sort": []util.MapStr{
{
"timestamp": util.MapStr{
"order": "desc",
},
},
},
"collapse": util.MapStr{
"field": "agent.id",
},
"query": util.MapStr{ "query": util.MapStr{
"bool": util.MapStr{
"filter": []util.MapStr{
{
"range": util.MapStr{
"timestamp": util.MapStr{
"gte": "now-1m",
},
},
},
},
"must": []util.MapStr{
{
"term": util.MapStr{
"metadata.name": util.MapStr{
"value": "agent",
},
},
}, {
"terms": util.MapStr{ "terms": util.MapStr{
"agent.id": instanceIDs, "_id": instanceIDs,
},
},
},
}, },
}, },
} }
q.RawQuery = util.MustToJSONBytes(queryDSL) q.RawQuery = util.MustToJSONBytes(queryDSL)
err, res := orm.Search(event.Event{}, &q) err, res := orm.Search(&agent.Instance{}, &q)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
@ -272,27 +234,32 @@ func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request,
} }
result := util.MapStr{} result := util.MapStr{}
for _, item := range res.Result { for _, item := range res.Result {
if itemV, ok := item.(map[string]interface{}); ok { instBytes, err := util.ToJSONBytes(item)
if agentID, ok := util.GetMapValueByKeys([]string{"agent", "id"}, itemV); ok { if err != nil {
if v, ok := agentID.(string); ok { log.Error(err)
if ab, ok := util.GetMapValueByKeys([]string{"payload", "instance", "system"}, itemV); ok { continue
if abV, ok := ab.(map[string]interface{}); ok {
result[v] = util.MapStr{
"timestamp": itemV["timestamp"],
"system": util.MapStr{
"cpu": abV["cpu"],
"mem": abV["mem"],
"uptime_in_ms": abV["uptime_in_ms"],
"status": "online",
},
}
}
}
}
} }
instance := agent.Instance{}
err = util.FromJSONBytes(instBytes, &instance)
if err != nil {
log.Error(err)
continue
} }
agReq := &util.Request{
Method: http.MethodGet,
Url: fmt.Sprintf("%s/stats", instance.GetEndpoint()),
} }
var resMap = util.MapStr{}
err = client.GetClient().DoRequest(agReq, &resMap)
if err != nil {
log.Error(err)
result[instance.ID] = util.MapStr{}
continue
}
result[instance.ID] = resMap
}
h.WriteJSON(w, result, http.StatusOK) h.WriteJSON(w, result, http.StatusOK)
} }
@ -409,16 +376,12 @@ func (h *APIHandler) getESNodesInfo(w http.ResponseWriter, req *http.Request, ps
}, http.StatusNotFound) }, http.StatusNotFound)
return return
} }
nodesM, err := getNodesInfoFromES(obj.ID) nodes, err := refreshNodesInfo(&obj)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
return return
} }
var nodes []*agent.ESNodeInfo
for _, node := range nodesM {
nodes = append(nodes, node)
}
h.WriteJSON(w, nodes, http.StatusOK) h.WriteJSON(w, nodes, http.StatusOK)
} }
@ -656,14 +619,21 @@ func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps htt
} }
func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
nodesInfo, err := client.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint())
if err != nil {
return nil, fmt.Errorf("get elasticsearch nodes error: %w", err)
}
oldNodesInfo, err := getNodesInfoFromES(inst.ID) oldNodesInfo, err := getNodesInfoFromES(inst.ID)
if err != nil { if err != nil {
return nil, fmt.Errorf("get elasticsearch nodes info from es error: %w", err) return nil, fmt.Errorf("get elasticsearch nodes info from es error: %w", err)
} }
nodesInfo, err := client.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint())
if err != nil {
log.Errorf("get elasticsearch nodes error: %v", err)
//return nodes info from es after failed to get nodes info from agent
var nodes = []agent.ESNodeInfo{}
for _, nodeInfo := range oldNodesInfo {
nodes = append(nodes, *nodeInfo)
}
return nodes, nil
}
oldPids := map[int]struct{}{} oldPids := map[int]struct{}{}
var resultNodes []agent.ESNodeInfo var resultNodes []agent.ESNodeInfo
if err != nil { if err != nil {

View File

@ -41,6 +41,7 @@ type ClientAPI interface {
SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error
SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error
SaveIngestConfig(ctx context.Context, agentBaseURL string) error SaveIngestConfig(ctx context.Context, agentBaseURL string) error
DoRequest(req *util.Request, respObj interface{}) error
} }
type Client struct { type Client struct {
@ -59,7 +60,7 @@ func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*ho
Error string `json:"error"` Error string `json:"error"`
HostInfo *host.HostInfo `json:"result"` HostInfo *host.HostInfo `json:"result"`
}{} }{}
err := client.doRequest(req, &resBody) err := client.DoRequest(req, &resBody)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -76,7 +77,7 @@ func (client *Client) GetElasticProcess(ctx context.Context, agentBaseURL string
Context: ctx, Context: ctx,
} }
resBody := map[string]interface{}{} resBody := map[string]interface{}{}
err := client.doRequest(req, &resBody) err := client.DoRequest(req, &resBody)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -97,7 +98,7 @@ func (client *Client) GetElasticLogFiles(ctx context.Context, agentBaseURL strin
Body: reqBody, Body: reqBody,
} }
resBody := map[string]interface{}{} resBody := map[string]interface{}{}
err := client.doRequest(req, &resBody) err := client.DoRequest(req, &resBody)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -115,7 +116,7 @@ func (client *Client) GetElasticLogFileContent(ctx context.Context, agentBaseURL
Body: util.MustToJSONBytes(body), Body: util.MustToJSONBytes(body),
} }
resBody := map[string]interface{}{} resBody := map[string]interface{}{}
err := client.doRequest(req, &resBody) err := client.DoRequest(req, &resBody)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -139,7 +140,7 @@ func (client *Client) GetInstanceBasicInfo(ctx context.Context, agentBaseURL str
Context: ctx, Context: ctx,
} }
resBody := &agent.Instance{} resBody := &agent.Instance{}
err := client.doRequest(req, &resBody) err := client.DoRequest(req, &resBody)
return resBody, err return resBody, err
} }
@ -155,7 +156,7 @@ func (client *Client) RegisterElasticsearch(ctx context.Context, agentBaseURL st
Body: reqBody, Body: reqBody,
} }
resBody := util.MapStr{} resBody := util.MapStr{}
err = client.doRequest(req, &resBody) err = client.DoRequest(req, &resBody)
if err != nil { if err != nil {
return err return err
} }
@ -172,7 +173,7 @@ func (client *Client) GetElasticsearchNodes(ctx context.Context, agentBaseURL st
Context: ctx, Context: ctx,
} }
resBody := []agent.ESNodeInfo{} resBody := []agent.ESNodeInfo{}
err := client.doRequest(req, &resBody) err := client.DoRequest(req, &resBody)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -192,7 +193,7 @@ func (client *Client) AuthESNode(ctx context.Context, agentBaseURL string, cfg e
Body: reqBody, Body: reqBody,
} }
resBody := &agent.ESNodeInfo{} resBody := &agent.ESNodeInfo{}
err = client.doRequest(req, resBody) err = client.DoRequest(req, resBody)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -207,7 +208,7 @@ func (client *Client) CreatePipeline(ctx context.Context, agentBaseURL string, b
Context: ctx, Context: ctx,
} }
resBody := util.MapStr{} resBody := util.MapStr{}
return client.doRequest(req, &resBody) return client.DoRequest(req, &resBody)
} }
func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error{ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error{
@ -216,7 +217,7 @@ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipeline
Url: fmt.Sprintf("%s/pipeline/task/%s", agentBaseURL, pipelineID), Url: fmt.Sprintf("%s/pipeline/task/%s", agentBaseURL, pipelineID),
Context: ctx, Context: ctx,
} }
return client.doRequest(req, nil) return client.DoRequest(req, nil)
} }
func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error{ func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error{
@ -230,7 +231,7 @@ func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string,
Context: ctx, Context: ctx,
Body: util.MustToJSONBytes(body), Body: util.MustToJSONBytes(body),
} }
return client.doRequest(req, nil) return client.DoRequest(req, nil)
} }
func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error{ func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error{
@ -245,7 +246,7 @@ func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string
Context: ctx, Context: ctx,
Body: util.MustToJSONBytes(body), Body: util.MustToJSONBytes(body),
} }
return client.doRequest(req, nil) return client.DoRequest(req, nil)
} }
func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string) error { func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string) error {
@ -267,7 +268,7 @@ func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string)
} }
func (client *Client) doRequest(req *util.Request, respObj interface{}) error { func (client *Client) DoRequest(req *util.Request, respObj interface{}) error {
return client.Executor.DoRequest(req, respObj) return client.Executor.DoRequest(req, respObj)
} }