Merge pull request 'realtime access agent status and node processes' (#126) from agent_realtime into master
This commit is contained in:
commit
f0bfe79310
|
@ -21,7 +21,6 @@ import (
|
||||||
"infini.sh/framework/core/api"
|
"infini.sh/framework/core/api"
|
||||||
httprouter "infini.sh/framework/core/api/router"
|
httprouter "infini.sh/framework/core/api/router"
|
||||||
"infini.sh/framework/core/elastic"
|
"infini.sh/framework/core/elastic"
|
||||||
"infini.sh/framework/core/event"
|
|
||||||
"infini.sh/framework/core/orm"
|
"infini.sh/framework/core/orm"
|
||||||
"infini.sh/framework/core/util"
|
"infini.sh/framework/core/util"
|
||||||
elastic2 "infini.sh/framework/modules/elastic"
|
elastic2 "infini.sh/framework/modules/elastic"
|
||||||
|
@ -110,10 +109,6 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_, err = refreshNodesInfo(obj)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
}
|
|
||||||
err = client.GetClient().SaveIngestConfig(context.Background(), obj.GetEndpoint())
|
err = client.GetClient().SaveIngestConfig(context.Background(), obj.GetEndpoint())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
|
@ -221,50 +216,17 @@ func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request,
|
||||||
h.WriteJSON(w, util.MapStr{}, http.StatusOK)
|
h.WriteJSON(w, util.MapStr{}, http.StatusOK)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
q := orm.Query{
|
q := orm.Query{}
|
||||||
WildcardIndex: true,
|
|
||||||
}
|
|
||||||
queryDSL := util.MapStr{
|
queryDSL := util.MapStr{
|
||||||
"sort": []util.MapStr{
|
|
||||||
{
|
|
||||||
"timestamp": util.MapStr{
|
|
||||||
"order": "desc",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"collapse": util.MapStr{
|
|
||||||
"field": "agent.id",
|
|
||||||
},
|
|
||||||
"query": util.MapStr{
|
"query": util.MapStr{
|
||||||
"bool": util.MapStr{
|
|
||||||
"filter": []util.MapStr{
|
|
||||||
{
|
|
||||||
"range": util.MapStr{
|
|
||||||
"timestamp": util.MapStr{
|
|
||||||
"gte": "now-1m",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"must": []util.MapStr{
|
|
||||||
{
|
|
||||||
"term": util.MapStr{
|
|
||||||
"metadata.name": util.MapStr{
|
|
||||||
"value": "agent",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}, {
|
|
||||||
"terms": util.MapStr{
|
"terms": util.MapStr{
|
||||||
"agent.id": instanceIDs,
|
"_id": instanceIDs,
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
q.RawQuery = util.MustToJSONBytes(queryDSL)
|
q.RawQuery = util.MustToJSONBytes(queryDSL)
|
||||||
|
|
||||||
err, res := orm.Search(event.Event{}, &q)
|
err, res := orm.Search(&agent.Instance{}, &q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
@ -272,27 +234,32 @@ func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request,
|
||||||
}
|
}
|
||||||
result := util.MapStr{}
|
result := util.MapStr{}
|
||||||
for _, item := range res.Result {
|
for _, item := range res.Result {
|
||||||
if itemV, ok := item.(map[string]interface{}); ok {
|
instBytes, err := util.ToJSONBytes(item)
|
||||||
if agentID, ok := util.GetMapValueByKeys([]string{"agent", "id"}, itemV); ok {
|
if err != nil {
|
||||||
if v, ok := agentID.(string); ok {
|
log.Error(err)
|
||||||
if ab, ok := util.GetMapValueByKeys([]string{"payload", "instance", "system"}, itemV); ok {
|
continue
|
||||||
if abV, ok := ab.(map[string]interface{}); ok {
|
|
||||||
result[v] = util.MapStr{
|
|
||||||
"timestamp": itemV["timestamp"],
|
|
||||||
"system": util.MapStr{
|
|
||||||
"cpu": abV["cpu"],
|
|
||||||
"mem": abV["mem"],
|
|
||||||
"uptime_in_ms": abV["uptime_in_ms"],
|
|
||||||
"status": "online",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
instance := agent.Instance{}
|
||||||
|
err = util.FromJSONBytes(instBytes, &instance)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
agReq := &util.Request{
|
||||||
|
Method: http.MethodGet,
|
||||||
|
Url: fmt.Sprintf("%s/stats", instance.GetEndpoint()),
|
||||||
|
|
||||||
}
|
}
|
||||||
|
var resMap = util.MapStr{}
|
||||||
|
err = client.GetClient().DoRequest(agReq, &resMap)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
result[instance.ID] = util.MapStr{}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
result[instance.ID] = resMap
|
||||||
|
}
|
||||||
h.WriteJSON(w, result, http.StatusOK)
|
h.WriteJSON(w, result, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -409,16 +376,12 @@ func (h *APIHandler) getESNodesInfo(w http.ResponseWriter, req *http.Request, ps
|
||||||
}, http.StatusNotFound)
|
}, http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
nodesM, err := getNodesInfoFromES(obj.ID)
|
nodes, err := refreshNodesInfo(&obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var nodes []*agent.ESNodeInfo
|
|
||||||
for _, node := range nodesM {
|
|
||||||
nodes = append(nodes, node)
|
|
||||||
}
|
|
||||||
h.WriteJSON(w, nodes, http.StatusOK)
|
h.WriteJSON(w, nodes, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -656,14 +619,21 @@ func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps htt
|
||||||
}
|
}
|
||||||
|
|
||||||
func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
|
func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
|
||||||
nodesInfo, err := client.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint())
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("get elasticsearch nodes error: %w", err)
|
|
||||||
}
|
|
||||||
oldNodesInfo, err := getNodesInfoFromES(inst.ID)
|
oldNodesInfo, err := getNodesInfoFromES(inst.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("get elasticsearch nodes info from es error: %w", err)
|
return nil, fmt.Errorf("get elasticsearch nodes info from es error: %w", err)
|
||||||
}
|
}
|
||||||
|
nodesInfo, err := client.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("get elasticsearch nodes error: %v", err)
|
||||||
|
//return nodes info from es after failed to get nodes info from agent
|
||||||
|
var nodes = []agent.ESNodeInfo{}
|
||||||
|
for _, nodeInfo := range oldNodesInfo {
|
||||||
|
nodes = append(nodes, *nodeInfo)
|
||||||
|
}
|
||||||
|
return nodes, nil
|
||||||
|
}
|
||||||
|
|
||||||
oldPids := map[int]struct{}{}
|
oldPids := map[int]struct{}{}
|
||||||
var resultNodes []agent.ESNodeInfo
|
var resultNodes []agent.ESNodeInfo
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -41,6 +41,7 @@ type ClientAPI interface {
|
||||||
SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error
|
SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error
|
||||||
SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error
|
SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error
|
||||||
SaveIngestConfig(ctx context.Context, agentBaseURL string) error
|
SaveIngestConfig(ctx context.Context, agentBaseURL string) error
|
||||||
|
DoRequest(req *util.Request, respObj interface{}) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
|
@ -59,7 +60,7 @@ func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*ho
|
||||||
Error string `json:"error"`
|
Error string `json:"error"`
|
||||||
HostInfo *host.HostInfo `json:"result"`
|
HostInfo *host.HostInfo `json:"result"`
|
||||||
}{}
|
}{}
|
||||||
err := client.doRequest(req, &resBody)
|
err := client.DoRequest(req, &resBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -76,7 +77,7 @@ func (client *Client) GetElasticProcess(ctx context.Context, agentBaseURL string
|
||||||
Context: ctx,
|
Context: ctx,
|
||||||
}
|
}
|
||||||
resBody := map[string]interface{}{}
|
resBody := map[string]interface{}{}
|
||||||
err := client.doRequest(req, &resBody)
|
err := client.DoRequest(req, &resBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -97,7 +98,7 @@ func (client *Client) GetElasticLogFiles(ctx context.Context, agentBaseURL strin
|
||||||
Body: reqBody,
|
Body: reqBody,
|
||||||
}
|
}
|
||||||
resBody := map[string]interface{}{}
|
resBody := map[string]interface{}{}
|
||||||
err := client.doRequest(req, &resBody)
|
err := client.DoRequest(req, &resBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -115,7 +116,7 @@ func (client *Client) GetElasticLogFileContent(ctx context.Context, agentBaseURL
|
||||||
Body: util.MustToJSONBytes(body),
|
Body: util.MustToJSONBytes(body),
|
||||||
}
|
}
|
||||||
resBody := map[string]interface{}{}
|
resBody := map[string]interface{}{}
|
||||||
err := client.doRequest(req, &resBody)
|
err := client.DoRequest(req, &resBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -139,7 +140,7 @@ func (client *Client) GetInstanceBasicInfo(ctx context.Context, agentBaseURL str
|
||||||
Context: ctx,
|
Context: ctx,
|
||||||
}
|
}
|
||||||
resBody := &agent.Instance{}
|
resBody := &agent.Instance{}
|
||||||
err := client.doRequest(req, &resBody)
|
err := client.DoRequest(req, &resBody)
|
||||||
return resBody, err
|
return resBody, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,7 +156,7 @@ func (client *Client) RegisterElasticsearch(ctx context.Context, agentBaseURL st
|
||||||
Body: reqBody,
|
Body: reqBody,
|
||||||
}
|
}
|
||||||
resBody := util.MapStr{}
|
resBody := util.MapStr{}
|
||||||
err = client.doRequest(req, &resBody)
|
err = client.DoRequest(req, &resBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -172,7 +173,7 @@ func (client *Client) GetElasticsearchNodes(ctx context.Context, agentBaseURL st
|
||||||
Context: ctx,
|
Context: ctx,
|
||||||
}
|
}
|
||||||
resBody := []agent.ESNodeInfo{}
|
resBody := []agent.ESNodeInfo{}
|
||||||
err := client.doRequest(req, &resBody)
|
err := client.DoRequest(req, &resBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -192,7 +193,7 @@ func (client *Client) AuthESNode(ctx context.Context, agentBaseURL string, cfg e
|
||||||
Body: reqBody,
|
Body: reqBody,
|
||||||
}
|
}
|
||||||
resBody := &agent.ESNodeInfo{}
|
resBody := &agent.ESNodeInfo{}
|
||||||
err = client.doRequest(req, resBody)
|
err = client.DoRequest(req, resBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -207,7 +208,7 @@ func (client *Client) CreatePipeline(ctx context.Context, agentBaseURL string, b
|
||||||
Context: ctx,
|
Context: ctx,
|
||||||
}
|
}
|
||||||
resBody := util.MapStr{}
|
resBody := util.MapStr{}
|
||||||
return client.doRequest(req, &resBody)
|
return client.DoRequest(req, &resBody)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error{
|
func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error{
|
||||||
|
@ -216,7 +217,7 @@ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipeline
|
||||||
Url: fmt.Sprintf("%s/pipeline/task/%s", agentBaseURL, pipelineID),
|
Url: fmt.Sprintf("%s/pipeline/task/%s", agentBaseURL, pipelineID),
|
||||||
Context: ctx,
|
Context: ctx,
|
||||||
}
|
}
|
||||||
return client.doRequest(req, nil)
|
return client.DoRequest(req, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error{
|
func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error{
|
||||||
|
@ -230,7 +231,7 @@ func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string,
|
||||||
Context: ctx,
|
Context: ctx,
|
||||||
Body: util.MustToJSONBytes(body),
|
Body: util.MustToJSONBytes(body),
|
||||||
}
|
}
|
||||||
return client.doRequest(req, nil)
|
return client.DoRequest(req, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error{
|
func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error{
|
||||||
|
@ -245,7 +246,7 @@ func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string
|
||||||
Context: ctx,
|
Context: ctx,
|
||||||
Body: util.MustToJSONBytes(body),
|
Body: util.MustToJSONBytes(body),
|
||||||
}
|
}
|
||||||
return client.doRequest(req, nil)
|
return client.DoRequest(req, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string) error {
|
func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string) error {
|
||||||
|
@ -267,7 +268,7 @@ func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (client *Client) doRequest(req *util.Request, respObj interface{}) error {
|
func (client *Client) DoRequest(req *util.Request, respObj interface{}) error {
|
||||||
return client.Executor.DoRequest(req, respObj)
|
return client.Executor.DoRequest(req, respObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue