Merge pull request 'migration' (#17) from migration into master

This commit is contained in:
silenceqi 2022-10-25 17:38:26 +08:00
commit c916a6a55e
6 changed files with 196 additions and 4 deletions

View File

@ -75,3 +75,11 @@ pipeline:
group: activity group: activity
when: when:
cluster_available: ["$[[CLUSTER_ID]]"] cluster_available: ["$[[CLUSTER_ID]]"]
- name: cluster_migration_split
auto_start: true
keep_running: true
processor:
- cluster_migration:
elasticsearch: "$[[CLUSTER_ID]]"
when:
cluster_available: ["$[[CLUSTER_ID]]"]

View File

@ -23,6 +23,7 @@ import (
_ "infini.sh/framework/modules/api" _ "infini.sh/framework/modules/api"
elastic2 "infini.sh/framework/modules/elastic" elastic2 "infini.sh/framework/modules/elastic"
"infini.sh/framework/modules/metrics" "infini.sh/framework/modules/metrics"
"infini.sh/framework/modules/migration"
"infini.sh/framework/modules/pipeline" "infini.sh/framework/modules/pipeline"
queue2 "infini.sh/framework/modules/queue/disk_queue" queue2 "infini.sh/framework/modules/queue/disk_queue"
"infini.sh/framework/modules/redis" "infini.sh/framework/modules/redis"
@ -70,6 +71,7 @@ func main() {
modules=append(modules,&agent.AgentModule{}) modules=append(modules,&agent.AgentModule{})
modules=append(modules,&metrics.MetricsModule{}) modules=append(modules,&metrics.MetricsModule{})
modules=append(modules,&security.Module{}) modules=append(modules,&security.Module{})
modules=append(modules,&migration.MigrationModule{})
uiModule:=&ui.UIModule{} uiModule:=&ui.UIModule{}
@ -89,6 +91,7 @@ func main() {
module.RegisterSystemModule(&agent.AgentModule{}) module.RegisterSystemModule(&agent.AgentModule{})
module.RegisterSystemModule(&metrics.MetricsModule{}) module.RegisterSystemModule(&metrics.MetricsModule{})
module.RegisterSystemModule(&security.Module{}) module.RegisterSystemModule(&security.Module{})
module.RegisterSystemModule(&migration.MigrationModule{})
}else{ }else{
for _, v := range modules { for _, v := range modules {
v.Setup() v.Setup()
@ -137,6 +140,8 @@ func main() {
orm.RegisterSchemaWithIndexName(alerting.Channel{}, "channel") orm.RegisterSchemaWithIndexName(alerting.Channel{}, "channel")
orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization")
orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard")
orm.RegisterSchemaWithIndexName(task1.Task{}, "task")
orm.RegisterSchemaWithIndexName(task1.Log{}, "task-log")
api.RegisterSchema() api.RegisterSchema()
if global.Env().SetupRequired() { if global.Env().SetupRequired() {

View File

@ -13,7 +13,7 @@ import (
type Instance struct { type Instance struct {
orm.ORMObjectBase orm.ORMObjectBase
InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"` //InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"`
Name string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"` Name string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"`
Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"` Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"`
Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"` Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"`

View File

@ -24,4 +24,6 @@ func InitAPI() {
api.HandleAPIMethod(api.POST, "/gateway/instance/status", gateway.RequirePermission(gateway.getInstanceStatus, enum.PermissionGatewayInstanceRead)) api.HandleAPIMethod(api.POST, "/gateway/instance/status", gateway.RequirePermission(gateway.getInstanceStatus, enum.PermissionGatewayInstanceRead))
api.HandleAPIMethod(api.POST, "/gateway/instance/:instance_id/_proxy", gateway.RequirePermission(gateway.proxy, enum.PermissionGatewayInstanceRead)) api.HandleAPIMethod(api.POST, "/gateway/instance/:instance_id/_proxy", gateway.RequirePermission(gateway.proxy, enum.PermissionGatewayInstanceRead))
api.HandleAPIMethod(api.GET, "/_platform/nodes", gateway.getExecutionNodes)
} }

View File

@ -5,18 +5,22 @@
package gateway package gateway
import ( import (
"context"
"fmt" "fmt"
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
"github.com/segmentio/encoding/json" "github.com/segmentio/encoding/json"
"infini.sh/console/model/gateway" "infini.sh/console/model/gateway"
"infini.sh/framework/core/agent" "infini.sh/framework/core/agent"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
elastic2 "infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/proxy" "infini.sh/framework/core/proxy"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
"infini.sh/framework/modules/elastic"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"time"
) )
func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@ -28,6 +32,24 @@ func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps
return return
} }
res, err := h.doConnect(obj.Endpoint, obj.BasicAuth)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
obj.ID = res.ID
exists, err := orm.Get(obj)
if err != nil && err != elastic.ErrNotFound {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
if exists {
h.WriteError(w, "gateway instance already registered", http.StatusInternalServerError)
return
}
err = orm.Create(obj) err = orm.Create(obj)
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
@ -330,3 +352,158 @@ func (h *GatewayAPI) tryConnect(w http.ResponseWriter, req *http.Request, ps htt
} }
h.WriteJSON(w, connectRes, http.StatusOK) h.WriteJSON(w, connectRes, http.StatusOK)
} }
func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
var (
keyword = h.GetParameterOrDefault(req, "keyword", "")
strSize = h.GetParameterOrDefault(req, "size", "10")
strFrom = h.GetParameterOrDefault(req, "from", "0")
)
size, _ := strconv.Atoi(strSize)
if size <= 0 {
size = 10
}
from, _ := strconv.Atoi(strFrom)
if from < 0 {
from = 0
}
agentIndexName := orm.GetIndexName(agent.Instance{})
gatewayIndexName := orm.GetIndexName(gateway.Instance{})
agentMust := []util.MapStr{
{
"term": util.MapStr{
"enrolled": util.MapStr{
"value": true,
},
},
},
{
"term": util.MapStr{
"status": util.MapStr{
"value": "online",
},
},
},
{
"term": util.MapStr{
"_index": util.MapStr{
"value": agentIndexName,
},
},
},
}
boolQ := util.MapStr{
"minimum_should_match": 1,
"should": []util.MapStr{
{
"bool": util.MapStr{
"must": agentMust,
},
},
{
"term": util.MapStr{
"_index": util.MapStr{
"value": gatewayIndexName,
},
},
},
},
}
if keyword != "" {
boolQ["must"] = []util.MapStr{
{
"query_string": util.MapStr{
"default_field":"*",
"query": keyword,
},
},
}
}
query := util.MapStr{
"size": size,
"from": from,
"sort": []util.MapStr{
{
"created": util.MapStr{
"order": "desc",
},
},
},
"query": util.MapStr{
"bool": boolQ,
},
}
q := orm.Query{
IndexName: fmt.Sprintf("%s,%s", gatewayIndexName, agentIndexName),
RawQuery: util.MustToJSONBytes(query),
}
err, result := orm.Search(nil, &q)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
searchRes := elastic2.SearchResponse{}
err = util.FromJSONBytes(result.Raw, &searchRes)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
var nodes []util.MapStr
for _, hit := range searchRes.Hits.Hits {
var (
endpoint string
ok bool
)
node := util.MapStr{
"id": hit.Source["id"],
"name": hit.Source["name"],
"available": false,
}
hasErr := false
if hit.Index == gatewayIndexName {
node["type"] = "gateway"
if endpoint, ok = hit.Source["endpoint"].(string); !ok {
log.Warnf("got unexpect endpoint type of gateway instance [%s]: %s", hit.ID, hit.Source["endpoint"])
hasErr = true
}
}else if hit.Index == agentIndexName {
node["type"] = "agent"
endpoint = fmt.Sprintf("%s://%s:%v", hit.Source["schema"], hit.Source["remote_ip"], hit.Source["port"])
}
if !hasErr {
available, err := isNodeAvailable(endpoint)
if err != nil {
log.Error(err)
}
node["available"] = available
}
nodes = append(nodes, node)
}
h.WriteJSON(w, nodes, http.StatusOK)
}
func isNodeAvailable(endpoint string) (bool, error){
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
rq := &util.Request{
Method: http.MethodGet,
Url: fmt.Sprintf("%s%s", endpoint, "/pipeline/tasks/_dynamic"),
Context: ctx,
}
resp, err := util.ExecuteRequest(rq)
if err != nil {
return false, err
}
resBody := struct {
Success bool `json:"success"`
}{}
err = util.FromJSONBytes(resp.Body, &resBody)
if err != nil {
return false, err
}
return resBody.Success, nil
}

6
ui.go
View File

@ -6,11 +6,11 @@ import (
"net/http" "net/http"
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
"infini.sh/console/config"
uiapi "infini.sh/console/plugin/api"
"infini.sh/framework/core/api" "infini.sh/framework/core/api"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
"infini.sh/framework/core/vfs" "infini.sh/framework/core/vfs"
"infini.sh/console/config"
uiapi "infini.sh/console/plugin/api"
) )
type UI struct { type UI struct {
@ -31,7 +31,7 @@ func (h UI) InitUI() {
// //
//api.HandleUIFunc("/config", func(w http.ResponseWriter, req *http.Request){ //api.HandleUIFunc("/config", func(w http.ResponseWriter, req *http.Request){
// if(strings.TrimSpace(apiEndpoint) == ""){ // if(strings.TrimSpace(apiEndpoint) == ""){
// hostParts := strings.Split(req.Host, ":") // hostParts := strings.Split(req.RemoteIP, ":")
// apiEndpoint = fmt.Sprintf("%s//%s:%s", apiConfig.GetSchema(), hostParts[0], apiConfig.NetworkConfig.GetBindingPort()) // apiEndpoint = fmt.Sprintf("%s//%s:%s", apiConfig.GetSchema(), hostParts[0], apiConfig.NetworkConfig.GetBindingPort())
// } // }
// buf, _ := json.Marshal(util.MapStr{ // buf, _ := json.Marshal(util.MapStr{