diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index b12585ac..2869adee 100644 --- a/plugin/api/gateway/instance.go +++ b/plugin/api/gateway/instance.go @@ -5,12 +5,14 @@ package gateway import ( + "context" "fmt" log "github.com/cihub/seelog" "github.com/segmentio/encoding/json" "infini.sh/console/model/gateway" "infini.sh/framework/core/agent" httprouter "infini.sh/framework/core/api/router" + elastic2 "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" "infini.sh/framework/core/proxy" "infini.sh/framework/core/util" @@ -18,6 +20,7 @@ import ( "net/http" "strconv" "strings" + "time" ) func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -351,75 +354,6 @@ func (h *GatewayAPI) tryConnect(w http.ResponseWriter, req *http.Request, ps htt } func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ - query := util.MapStr{ - "size": 1000, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "enrolled": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "status": util.MapStr{ - "value": "online", - }, - }, - }, - }, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - err, result := orm.Search(agent.Instance{}, &q) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - var nodes []util.MapStr - //nodes from agent - for _, row := range result.Result { - if rowM, ok := row.(map[string]interface{}); ok { - nodes = append(nodes, util.MapStr{ - "id": rowM["id"], - "name": rowM["name"], - "type": "agent", - }) - } - } - - q = orm.Query{ - Size: 1000, - } - err, result = orm.Search(gateway.Instance{}, &q) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - //nodes from gateway - for _, row := range result.Result { - if rowM, ok := row.(map[string]interface{}); ok { - nodes = append(nodes, util.MapStr{ - "id": rowM["id"], - "name": rowM["name"], - "type": "gateway", - }) - } - } - h.WriteJSON(w, nodes, http.StatusOK) -} - -func (h *GatewayAPI) getExecutionNodesNew(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ var ( keyword = h.GetParameterOrDefault(req, "keyword", "") strSize = h.GetParameterOrDefault(req, "size", "10") @@ -433,8 +367,63 @@ func (h *GatewayAPI) getExecutionNodesNew(w http.ResponseWriter, req *http.Reque if from < 0 { from = 0 } + agentIndexName := orm.GetIndexName(agent.Instance{}) + gatewayIndexName := orm.GetIndexName(gateway.Instance{}) + agentMust := []util.MapStr{ + { + "term": util.MapStr{ + "enrolled": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": "online", + }, + }, + }, + { + "term": util.MapStr{ + "_index": util.MapStr{ + "value": agentIndexName, + }, + }, + }, + } + + boolQ := util.MapStr{ + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "bool": util.MapStr{ + "must": agentMust, + }, + }, + { + "term": util.MapStr{ + "_index": util.MapStr{ + "value": gatewayIndexName, + }, + }, + }, + }, + } + if keyword != "" { + boolQ["must"] = []util.MapStr{ + { + "query_string": util.MapStr{ + "default_field":"*", + "query": keyword, + }, + }, + } + } + query := util.MapStr{ "size": size, + "from": from, "sort": []util.MapStr{ { "created": util.MapStr{ @@ -443,28 +432,78 @@ func (h *GatewayAPI) getExecutionNodesNew(w http.ResponseWriter, req *http.Reque }, }, "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "enrolled": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "status": util.MapStr{ - "value": "online", - }, - }, - }, - }, - }, + "bool": boolQ, }, } q := orm.Query{ + IndexName: fmt.Sprintf("%s,%s", gatewayIndexName, agentIndexName), RawQuery: util.MustToJSONBytes(query), } - _ = q + err, result := orm.Search(nil, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + searchRes := elastic2.SearchResponse{} + err = util.FromJSONBytes(result.Raw, &searchRes) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var nodes []util.MapStr + for _, hit := range searchRes.Hits.Hits { + var ( + endpoint string + ok bool + ) + node := util.MapStr{ + "id": hit.Source["id"], + "name": hit.Source["name"], + "available": false, + } + hasErr := false + if hit.Index == gatewayIndexName { + node["type"] = "gateway" + if endpoint, ok = hit.Source["endpoint"].(string); !ok { + log.Warnf("got unexpect endpoint type of gateway instance [%s]: %s", hit.ID, hit.Source["endpoint"]) + hasErr = true + } + }else if hit.Index == agentIndexName { + node["type"] = "agent" + endpoint = fmt.Sprintf("%s://%s:%v", hit.Source["schema"], hit.Source["remote_ip"], hit.Source["port"]) + } + if !hasErr { + available, err := isNodeAvailable(endpoint) + if err != nil { + log.Error(err) + } + node["available"] = available + } + nodes = append(nodes, node) + } + h.WriteJSON(w, nodes, http.StatusOK) +} + +func isNodeAvailable(endpoint string) (bool, error){ + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + rq := &util.Request{ + Method: http.MethodGet, + Url: fmt.Sprintf("%s%s", endpoint, "/pipeline/tasks/_dynamic"), + Context: ctx, + } + resp, err := util.ExecuteRequest(rq) + if err != nil { + return false, err + } + resBody := struct { + Success bool `json:"success"` + }{} + err = util.FromJSONBytes(resp.Body, &resBody) + if err != nil { + return false, err + } + return resBody.Success, nil } \ No newline at end of file