diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index 9514c8e0..50aba75f 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -21,7 +21,6 @@ import ( "infini.sh/framework/core/api" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" - "infini.sh/framework/core/event" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" elastic2 "infini.sh/framework/modules/elastic" @@ -110,10 +109,6 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps log.Error(err) return } - _, err = refreshNodesInfo(obj) - if err != nil { - log.Error(err) - } err = client.GetClient().SaveIngestConfig(context.Background(), obj.GetEndpoint()) if err != nil { log.Error(err) @@ -221,50 +216,17 @@ func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, h.WriteJSON(w, util.MapStr{}, http.StatusOK) return } - q := orm.Query{ - WildcardIndex: true, - } + q := orm.Query{} queryDSL := util.MapStr{ - "sort": []util.MapStr{ - { - "timestamp": util.MapStr{ - "order": "desc", - }, - }, - }, - "collapse": util.MapStr{ - "field": "agent.id", - }, "query": util.MapStr{ - "bool": util.MapStr{ - "filter": []util.MapStr{ - { - "range": util.MapStr{ - "timestamp": util.MapStr{ - "gte": "now-1m", - }, - }, - }, - }, - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.name": util.MapStr{ - "value": "agent", - }, - }, - }, { - "terms": util.MapStr{ - "agent.id": instanceIDs, - }, - }, - }, + "terms": util.MapStr{ + "_id": instanceIDs, }, }, } q.RawQuery = util.MustToJSONBytes(queryDSL) - err, res := orm.Search(event.Event{}, &q) + err, res := orm.Search(&agent.Instance{}, &q) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -272,26 +234,31 @@ func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, } result := util.MapStr{} for _, item := range res.Result { - if itemV, ok := item.(map[string]interface{}); ok { - if agentID, ok := util.GetMapValueByKeys([]string{"agent", "id"}, itemV); ok { - if v, ok := agentID.(string); ok { - if ab, ok := util.GetMapValueByKeys([]string{"payload", "instance", "system"}, itemV); ok { - if abV, ok := ab.(map[string]interface{}); ok { - result[v] = util.MapStr{ - "timestamp": itemV["timestamp"], - "system": util.MapStr{ - "cpu": abV["cpu"], - "mem": abV["mem"], - "uptime_in_ms": abV["uptime_in_ms"], - "status": "online", - }, - } - } - } - } - } + instBytes, err := util.ToJSONBytes(item) + if err != nil { + log.Error(err) + continue } + instance := agent.Instance{} + err = util.FromJSONBytes(instBytes, &instance) + if err != nil { + log.Error(err) + continue + } + agReq := &util.Request{ + Method: http.MethodGet, + Url: fmt.Sprintf("%s/stats", instance.GetEndpoint()), + } + var resMap = util.MapStr{} + err = client.GetClient().DoRequest(agReq, &resMap) + + if err != nil { + log.Error(err) + result[instance.ID] = util.MapStr{} + continue + } + result[instance.ID] = resMap } h.WriteJSON(w, result, http.StatusOK) } @@ -409,16 +376,12 @@ func (h *APIHandler) getESNodesInfo(w http.ResponseWriter, req *http.Request, ps }, http.StatusNotFound) return } - nodesM, err := getNodesInfoFromES(obj.ID) + nodes, err := refreshNodesInfo(&obj) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - var nodes []*agent.ESNodeInfo - for _, node := range nodesM { - nodes = append(nodes, node) - } h.WriteJSON(w, nodes, http.StatusOK) } @@ -656,14 +619,21 @@ func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps htt } func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { - nodesInfo, err := client.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) - if err != nil { - return nil, fmt.Errorf("get elasticsearch nodes error: %w", err) - } oldNodesInfo, err := getNodesInfoFromES(inst.ID) if err != nil { return nil, fmt.Errorf("get elasticsearch nodes info from es error: %w", err) } + nodesInfo, err := client.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) + if err != nil { + log.Errorf("get elasticsearch nodes error: %v", err) + //return nodes info from es after failed to get nodes info from agent + var nodes = []agent.ESNodeInfo{} + for _, nodeInfo := range oldNodesInfo { + nodes = append(nodes, *nodeInfo) + } + return nodes, nil + } + oldPids := map[int]struct{}{} var resultNodes []agent.ESNodeInfo if err != nil { diff --git a/modules/agent/client/client.go b/modules/agent/client/client.go index 46f80e42..586dc67f 100644 --- a/modules/agent/client/client.go +++ b/modules/agent/client/client.go @@ -41,6 +41,7 @@ type ClientAPI interface { SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error SaveIngestConfig(ctx context.Context, agentBaseURL string) error + DoRequest(req *util.Request, respObj interface{}) error } type Client struct { @@ -59,7 +60,7 @@ func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*ho Error string `json:"error"` HostInfo *host.HostInfo `json:"result"` }{} - err := client.doRequest(req, &resBody) + err := client.DoRequest(req, &resBody) if err != nil { return nil, err } @@ -76,7 +77,7 @@ func (client *Client) GetElasticProcess(ctx context.Context, agentBaseURL string Context: ctx, } resBody := map[string]interface{}{} - err := client.doRequest(req, &resBody) + err := client.DoRequest(req, &resBody) if err != nil { return nil, err } @@ -97,7 +98,7 @@ func (client *Client) GetElasticLogFiles(ctx context.Context, agentBaseURL strin Body: reqBody, } resBody := map[string]interface{}{} - err := client.doRequest(req, &resBody) + err := client.DoRequest(req, &resBody) if err != nil { return nil, err } @@ -115,7 +116,7 @@ func (client *Client) GetElasticLogFileContent(ctx context.Context, agentBaseURL Body: util.MustToJSONBytes(body), } resBody := map[string]interface{}{} - err := client.doRequest(req, &resBody) + err := client.DoRequest(req, &resBody) if err != nil { return nil, err } @@ -139,7 +140,7 @@ func (client *Client) GetInstanceBasicInfo(ctx context.Context, agentBaseURL str Context: ctx, } resBody := &agent.Instance{} - err := client.doRequest(req, &resBody) + err := client.DoRequest(req, &resBody) return resBody, err } @@ -155,7 +156,7 @@ func (client *Client) RegisterElasticsearch(ctx context.Context, agentBaseURL st Body: reqBody, } resBody := util.MapStr{} - err = client.doRequest(req, &resBody) + err = client.DoRequest(req, &resBody) if err != nil { return err } @@ -172,7 +173,7 @@ func (client *Client) GetElasticsearchNodes(ctx context.Context, agentBaseURL st Context: ctx, } resBody := []agent.ESNodeInfo{} - err := client.doRequest(req, &resBody) + err := client.DoRequest(req, &resBody) if err != nil { return nil, err } @@ -192,7 +193,7 @@ func (client *Client) AuthESNode(ctx context.Context, agentBaseURL string, cfg e Body: reqBody, } resBody := &agent.ESNodeInfo{} - err = client.doRequest(req, resBody) + err = client.DoRequest(req, resBody) if err != nil { return nil, err } @@ -207,7 +208,7 @@ func (client *Client) CreatePipeline(ctx context.Context, agentBaseURL string, b Context: ctx, } resBody := util.MapStr{} - return client.doRequest(req, &resBody) + return client.DoRequest(req, &resBody) } func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error{ @@ -216,7 +217,7 @@ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipeline Url: fmt.Sprintf("%s/pipeline/task/%s", agentBaseURL, pipelineID), Context: ctx, } - return client.doRequest(req, nil) + return client.DoRequest(req, nil) } func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error{ @@ -230,7 +231,7 @@ func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, Context: ctx, Body: util.MustToJSONBytes(body), } - return client.doRequest(req, nil) + return client.DoRequest(req, nil) } func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error{ @@ -245,7 +246,7 @@ func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string Context: ctx, Body: util.MustToJSONBytes(body), } - return client.doRequest(req, nil) + return client.DoRequest(req, nil) } func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string) error { @@ -267,7 +268,7 @@ func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string) } -func (client *Client) doRequest(req *util.Request, respObj interface{}) error { +func (client *Client) DoRequest(req *util.Request, respObj interface{}) error { return client.Executor.DoRequest(req, respObj) }