update getExecutionNodes api

This commit is contained in:
liugq 2022-10-25 17:20:28 +08:00
commit 0922db5024
1 changed files with 127 additions and 88 deletions

View File

@ -5,12 +5,14 @@
package gateway package gateway
import ( import (
"context"
"fmt" "fmt"
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
"github.com/segmentio/encoding/json" "github.com/segmentio/encoding/json"
"infini.sh/console/model/gateway" "infini.sh/console/model/gateway"
"infini.sh/framework/core/agent" "infini.sh/framework/core/agent"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
elastic2 "infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/proxy" "infini.sh/framework/core/proxy"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
@ -18,6 +20,7 @@ import (
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"time"
) )
func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@ -351,75 +354,6 @@ func (h *GatewayAPI) tryConnect(w http.ResponseWriter, req *http.Request, ps htt
} }
func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
query := util.MapStr{
"size": 1000,
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"enrolled": util.MapStr{
"value": true,
},
},
},
{
"term": util.MapStr{
"status": util.MapStr{
"value": "online",
},
},
},
},
},
},
}
q := orm.Query{
RawQuery: util.MustToJSONBytes(query),
}
err, result := orm.Search(agent.Instance{}, &q)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
var nodes []util.MapStr
//nodes from agent
for _, row := range result.Result {
if rowM, ok := row.(map[string]interface{}); ok {
nodes = append(nodes, util.MapStr{
"id": rowM["id"],
"name": rowM["name"],
"type": "agent",
})
}
}
q = orm.Query{
Size: 1000,
}
err, result = orm.Search(gateway.Instance{}, &q)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
//nodes from gateway
for _, row := range result.Result {
if rowM, ok := row.(map[string]interface{}); ok {
nodes = append(nodes, util.MapStr{
"id": rowM["id"],
"name": rowM["name"],
"type": "gateway",
})
}
}
h.WriteJSON(w, nodes, http.StatusOK)
}
func (h *GatewayAPI) getExecutionNodesNew(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
var ( var (
keyword = h.GetParameterOrDefault(req, "keyword", "") keyword = h.GetParameterOrDefault(req, "keyword", "")
strSize = h.GetParameterOrDefault(req, "size", "10") strSize = h.GetParameterOrDefault(req, "size", "10")
@ -433,18 +367,9 @@ func (h *GatewayAPI) getExecutionNodesNew(w http.ResponseWriter, req *http.Reque
if from < 0 { if from < 0 {
from = 0 from = 0
} }
query := util.MapStr{ agentIndexName := orm.GetIndexName(agent.Instance{})
"size": size, gatewayIndexName := orm.GetIndexName(gateway.Instance{})
"sort": []util.MapStr{ agentMust := []util.MapStr{
{
"created": util.MapStr{
"order": "desc",
},
},
},
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{ {
"term": util.MapStr{ "term": util.MapStr{
"enrolled": util.MapStr{ "enrolled": util.MapStr{
@ -459,12 +384,126 @@ func (h *GatewayAPI) getExecutionNodesNew(w http.ResponseWriter, req *http.Reque
}, },
}, },
}, },
{
"term": util.MapStr{
"_index": util.MapStr{
"value": agentIndexName,
}, },
}, },
}, },
} }
boolQ := util.MapStr{
"minimum_should_match": 1,
"should": []util.MapStr{
{
"bool": util.MapStr{
"must": agentMust,
},
},
{
"term": util.MapStr{
"_index": util.MapStr{
"value": gatewayIndexName,
},
},
},
},
}
if keyword != "" {
boolQ["must"] = []util.MapStr{
{
"query_string": util.MapStr{
"default_field":"*",
"query": keyword,
},
},
}
}
query := util.MapStr{
"size": size,
"from": from,
"sort": []util.MapStr{
{
"created": util.MapStr{
"order": "desc",
},
},
},
"query": util.MapStr{
"bool": boolQ,
},
}
q := orm.Query{ q := orm.Query{
IndexName: fmt.Sprintf("%s,%s", gatewayIndexName, agentIndexName),
RawQuery: util.MustToJSONBytes(query), RawQuery: util.MustToJSONBytes(query),
} }
_ = q err, result := orm.Search(nil, &q)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
searchRes := elastic2.SearchResponse{}
err = util.FromJSONBytes(result.Raw, &searchRes)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
var nodes []util.MapStr
for _, hit := range searchRes.Hits.Hits {
var (
endpoint string
ok bool
)
node := util.MapStr{
"id": hit.Source["id"],
"name": hit.Source["name"],
"available": false,
}
hasErr := false
if hit.Index == gatewayIndexName {
node["type"] = "gateway"
if endpoint, ok = hit.Source["endpoint"].(string); !ok {
log.Warnf("got unexpect endpoint type of gateway instance [%s]: %s", hit.ID, hit.Source["endpoint"])
hasErr = true
}
}else if hit.Index == agentIndexName {
node["type"] = "agent"
endpoint = fmt.Sprintf("%s://%s:%v", hit.Source["schema"], hit.Source["remote_ip"], hit.Source["port"])
}
if !hasErr {
available, err := isNodeAvailable(endpoint)
if err != nil {
log.Error(err)
}
node["available"] = available
}
nodes = append(nodes, node)
}
h.WriteJSON(w, nodes, http.StatusOK)
}
func isNodeAvailable(endpoint string) (bool, error){
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
rq := &util.Request{
Method: http.MethodGet,
Url: fmt.Sprintf("%s%s", endpoint, "/pipeline/tasks/_dynamic"),
Context: ctx,
}
resp, err := util.ExecuteRequest(rq)
if err != nil {
return false, err
}
resBody := struct {
Success bool `json:"success"`
}{}
err = util.FromJSONBytes(resp.Body, &resBody)
if err != nil {
return false, err
}
return resBody.Success, nil
} }