diff --git a/config/system_config.tpl b/config/system_config.tpl index 0f8b93f8..694ccdbc 100644 --- a/config/system_config.tpl +++ b/config/system_config.tpl @@ -75,3 +75,11 @@ pipeline: group: activity when: cluster_available: ["$[[CLUSTER_ID]]"] + - name: cluster_migration_split + auto_start: true + keep_running: true + processor: + - cluster_migration: + elasticsearch: "$[[CLUSTER_ID]]" + when: + cluster_available: ["$[[CLUSTER_ID]]"] \ No newline at end of file diff --git a/main.go b/main.go index d70418fe..d5b0fc09 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ import ( _ "infini.sh/framework/modules/api" elastic2 "infini.sh/framework/modules/elastic" "infini.sh/framework/modules/metrics" + "infini.sh/framework/modules/migration" "infini.sh/framework/modules/pipeline" queue2 "infini.sh/framework/modules/queue/disk_queue" "infini.sh/framework/modules/redis" @@ -70,6 +71,7 @@ func main() { modules=append(modules,&agent.AgentModule{}) modules=append(modules,&metrics.MetricsModule{}) modules=append(modules,&security.Module{}) + modules=append(modules,&migration.MigrationModule{}) uiModule:=&ui.UIModule{} @@ -89,6 +91,7 @@ func main() { module.RegisterSystemModule(&agent.AgentModule{}) module.RegisterSystemModule(&metrics.MetricsModule{}) module.RegisterSystemModule(&security.Module{}) + module.RegisterSystemModule(&migration.MigrationModule{}) }else{ for _, v := range modules { v.Setup() @@ -137,6 +140,8 @@ func main() { orm.RegisterSchemaWithIndexName(alerting.Channel{}, "channel") orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") + orm.RegisterSchemaWithIndexName(task1.Task{}, "task") + orm.RegisterSchemaWithIndexName(task1.Log{}, "task-log") api.RegisterSchema() if global.Env().SetupRequired() { diff --git a/model/gateway/instance.go b/model/gateway/instance.go index 73977adb..59c32005 100644 --- a/model/gateway/instance.go +++ b/model/gateway/instance.go @@ -13,7 +13,7 @@ import ( type Instance struct { orm.ORMObjectBase - InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"` + //InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"` Name string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"` Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"` Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"` diff --git a/plugin/api/gateway/api.go b/plugin/api/gateway/api.go index e37d444a..3034a3a8 100644 --- a/plugin/api/gateway/api.go +++ b/plugin/api/gateway/api.go @@ -24,4 +24,6 @@ func InitAPI() { api.HandleAPIMethod(api.POST, "/gateway/instance/status", gateway.RequirePermission(gateway.getInstanceStatus, enum.PermissionGatewayInstanceRead)) api.HandleAPIMethod(api.POST, "/gateway/instance/:instance_id/_proxy", gateway.RequirePermission(gateway.proxy, enum.PermissionGatewayInstanceRead)) + + api.HandleAPIMethod(api.GET, "/_platform/nodes", gateway.getExecutionNodes) } diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index 9e09f7f1..2869adee 100644 --- a/plugin/api/gateway/instance.go +++ b/plugin/api/gateway/instance.go @@ -5,18 +5,22 @@ package gateway import ( + "context" "fmt" log "github.com/cihub/seelog" "github.com/segmentio/encoding/json" "infini.sh/console/model/gateway" "infini.sh/framework/core/agent" httprouter "infini.sh/framework/core/api/router" + elastic2 "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" "infini.sh/framework/core/proxy" "infini.sh/framework/core/util" + "infini.sh/framework/modules/elastic" "net/http" "strconv" "strings" + "time" ) func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -28,6 +32,24 @@ func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps return } + res, err := h.doConnect(obj.Endpoint, obj.BasicAuth) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + obj.ID = res.ID + + exists, err := orm.Get(obj) + if err != nil && err != elastic.ErrNotFound { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if exists { + h.WriteError(w, "gateway instance already registered", http.StatusInternalServerError) + return + } err = orm.Create(obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -329,4 +351,159 @@ func (h *GatewayAPI) tryConnect(w http.ResponseWriter, req *http.Request, ps htt return } h.WriteJSON(w, connectRes, http.StatusOK) +} + +func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + var ( + keyword = h.GetParameterOrDefault(req, "keyword", "") + strSize = h.GetParameterOrDefault(req, "size", "10") + strFrom = h.GetParameterOrDefault(req, "from", "0") + ) + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 10 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + agentIndexName := orm.GetIndexName(agent.Instance{}) + gatewayIndexName := orm.GetIndexName(gateway.Instance{}) + agentMust := []util.MapStr{ + { + "term": util.MapStr{ + "enrolled": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": "online", + }, + }, + }, + { + "term": util.MapStr{ + "_index": util.MapStr{ + "value": agentIndexName, + }, + }, + }, + } + + boolQ := util.MapStr{ + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "bool": util.MapStr{ + "must": agentMust, + }, + }, + { + "term": util.MapStr{ + "_index": util.MapStr{ + "value": gatewayIndexName, + }, + }, + }, + }, + } + if keyword != "" { + boolQ["must"] = []util.MapStr{ + { + "query_string": util.MapStr{ + "default_field":"*", + "query": keyword, + }, + }, + } + } + + query := util.MapStr{ + "size": size, + "from": from, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "query": util.MapStr{ + "bool": boolQ, + }, + } + q := orm.Query{ + IndexName: fmt.Sprintf("%s,%s", gatewayIndexName, agentIndexName), + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(nil, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + searchRes := elastic2.SearchResponse{} + err = util.FromJSONBytes(result.Raw, &searchRes) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var nodes []util.MapStr + for _, hit := range searchRes.Hits.Hits { + var ( + endpoint string + ok bool + ) + node := util.MapStr{ + "id": hit.Source["id"], + "name": hit.Source["name"], + "available": false, + } + hasErr := false + if hit.Index == gatewayIndexName { + node["type"] = "gateway" + if endpoint, ok = hit.Source["endpoint"].(string); !ok { + log.Warnf("got unexpect endpoint type of gateway instance [%s]: %s", hit.ID, hit.Source["endpoint"]) + hasErr = true + } + }else if hit.Index == agentIndexName { + node["type"] = "agent" + endpoint = fmt.Sprintf("%s://%s:%v", hit.Source["schema"], hit.Source["remote_ip"], hit.Source["port"]) + } + if !hasErr { + available, err := isNodeAvailable(endpoint) + if err != nil { + log.Error(err) + } + node["available"] = available + } + nodes = append(nodes, node) + } + h.WriteJSON(w, nodes, http.StatusOK) +} + +func isNodeAvailable(endpoint string) (bool, error){ + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + rq := &util.Request{ + Method: http.MethodGet, + Url: fmt.Sprintf("%s%s", endpoint, "/pipeline/tasks/_dynamic"), + Context: ctx, + } + resp, err := util.ExecuteRequest(rq) + if err != nil { + return false, err + } + resBody := struct { + Success bool `json:"success"` + }{} + err = util.FromJSONBytes(resp.Body, &resBody) + if err != nil { + return false, err + } + return resBody.Success, nil } \ No newline at end of file diff --git a/ui.go b/ui.go index 95b7a9cd..92524c93 100644 --- a/ui.go +++ b/ui.go @@ -6,11 +6,11 @@ import ( "net/http" log "github.com/cihub/seelog" + "infini.sh/console/config" + uiapi "infini.sh/console/plugin/api" "infini.sh/framework/core/api" "infini.sh/framework/core/util" "infini.sh/framework/core/vfs" - "infini.sh/console/config" - uiapi "infini.sh/console/plugin/api" ) type UI struct { @@ -31,7 +31,7 @@ func (h UI) InitUI() { // //api.HandleUIFunc("/config", func(w http.ResponseWriter, req *http.Request){ // if(strings.TrimSpace(apiEndpoint) == ""){ - // hostParts := strings.Split(req.Host, ":") + // hostParts := strings.Split(req.RemoteIP, ":") // apiEndpoint = fmt.Sprintf("%s//%s:%s", apiConfig.GetSchema(), hostParts[0], apiConfig.NetworkConfig.GetBindingPort()) // } // buf, _ := json.Marshal(util.MapStr{