diff --git a/main.go b/main.go index 3145e2bf..8c31eadb 100644 --- a/main.go +++ b/main.go @@ -16,11 +16,12 @@ import ( _ "infini.sh/framework/core/log" "infini.sh/framework/core/module" "infini.sh/framework/core/orm" + task2 "infini.sh/framework/core/task" "infini.sh/framework/modules/agent" _ "infini.sh/framework/modules/api" elastic2 "infini.sh/framework/modules/elastic" - "infini.sh/framework/modules/filter" "infini.sh/framework/modules/metrics" + "infini.sh/framework/modules/migration" "infini.sh/framework/modules/pipeline" queue2 "infini.sh/framework/modules/queue/disk_queue" "infini.sh/framework/modules/redis" @@ -67,7 +68,6 @@ func main() { } //load core modules first - module.RegisterSystemModule(&filter.FilterModule{}) module.RegisterSystemModule(&elastic2.ElasticModule{}) module.RegisterSystemModule(&stats.SimpleStatsModule{}) module.RegisterSystemModule(&queue2.DiskQueue{}) @@ -76,6 +76,7 @@ func main() { module.RegisterSystemModule(&pipeline.PipeModule{}) module.RegisterSystemModule(&task.TaskModule{}) module.RegisterSystemModule(&agent.AgentModule{}) + module.RegisterSystemModule(&migration.MigrationModule{}) module.RegisterUserPlugin(&metrics.MetricsModule{}) @@ -118,6 +119,8 @@ func main() { orm.RegisterSchemaWithIndexName(alerting.Channel{}, "channel") orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") + orm.RegisterSchemaWithIndexName(task2.Task{}, "task") + orm.RegisterSchemaWithIndexName(task2.Log{}, "task-log") api.RegisterSchema() go func() { diff --git a/model/gateway/instance.go b/model/gateway/instance.go index 73977adb..59c32005 100644 --- a/model/gateway/instance.go +++ b/model/gateway/instance.go @@ -13,7 +13,7 @@ import ( type Instance struct { orm.ORMObjectBase - InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"` + //InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"` Name string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"` Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"` Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"` diff --git a/plugin/api/gateway/api.go b/plugin/api/gateway/api.go index d41a9524..23c6929c 100644 --- a/plugin/api/gateway/api.go +++ b/plugin/api/gateway/api.go @@ -24,4 +24,6 @@ func init() { api.HandleAPIMethod(api.POST, "/gateway/instance/status", gateway.RequirePermission(gateway.getInstanceStatus, enum.PermissionGatewayInstanceRead)) api.HandleAPIMethod(api.POST, "/gateway/instance/:instance_id/_proxy", gateway.RequirePermission(gateway.proxy, enum.PermissionGatewayInstanceRead)) + + api.HandleAPIMethod(api.GET, "/_platform/nodes", gateway.getExecutionNodes) } diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index 9e09f7f1..0c1423a2 100644 --- a/plugin/api/gateway/instance.go +++ b/plugin/api/gateway/instance.go @@ -14,6 +14,7 @@ import ( "infini.sh/framework/core/orm" "infini.sh/framework/core/proxy" "infini.sh/framework/core/util" + "infini.sh/framework/modules/elastic" "net/http" "strconv" "strings" @@ -28,6 +29,24 @@ func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps return } + res, err := h.doConnect(obj.Endpoint, obj.BasicAuth) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + obj.ID = res.ID + + exists, err := orm.Get(obj) + if err != nil && err != elastic.ErrNotFound { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if exists { + h.WriteError(w, "gateway instance already registered", http.StatusInternalServerError) + return + } err = orm.Create(obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -329,4 +348,73 @@ func (h *GatewayAPI) tryConnect(w http.ResponseWriter, req *http.Request, ps htt return } h.WriteJSON(w, connectRes, http.StatusOK) +} + +func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + query := util.MapStr{ + "size": 1000, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "enrolled": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": "online", + }, + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(agent.Instance{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + var nodes []util.MapStr + //nodes from agent + for _, row := range result.Result { + if rowM, ok := row.(map[string]interface{}); ok { + nodes = append(nodes, util.MapStr{ + "id": rowM["id"], + "name": rowM["name"], + "type": "agent", + }) + } + } + + q = orm.Query{ + Size: 1000, + } + err, result = orm.Search(gateway.Instance{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + //nodes from gateway + for _, row := range result.Result { + if rowM, ok := row.(map[string]interface{}); ok { + nodes = append(nodes, util.MapStr{ + "id": rowM["id"], + "name": rowM["name"], + "type": "gateway", + }) + } + } + h.WriteJSON(w, nodes, http.StatusOK) } \ No newline at end of file