diff --git a/modules/agent/api/elasticsearch.go b/modules/agent/api/elasticsearch.go index 1034abb6..1926428a 100644 --- a/modules/agent/api/elasticsearch.go +++ b/modules/agent/api/elasticsearch.go @@ -7,16 +7,21 @@ package api import ( "bytes" "context" + "errors" "fmt" log "github.com/cihub/seelog" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" "infini.sh/framework/core/model" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" common2 "infini.sh/framework/modules/elastic/common" "infini.sh/framework/plugins/managed/common" + "infini.sh/framework/plugins/managed/server" "net/http" + "net/url" + "sync" "time" ) @@ -91,7 +96,6 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { } for nodeID, node := range nodesInfo.Nodes { - v, ok := enrolledNodesByAgent[nodeID] if ok { node.ClusterID = v.ClusterID @@ -99,188 +103,14 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { } } - ////not recognized by agent, need auth? - //for _, node := range nodesInfo.UnknownProcess{ - // for _, v := range node.ListenAddresses { - // //ask user to manual enroll this node - // //check local credentials, if it works, get node info - // } - //} - - // { - // //node was not recognized by agent, need auth? - // if node.HttpPort != "" { - // for _, v := range enrolledNodesByAgent { - // if v.PublishAddress != "" { - // if util.UnifyLocalAddress(v.PublishAddress) == util.UnifyLocalAddress(node.PublishAddress) { - // node.ClusterID = v.ClusterID - // node.ClusterName = v.ClusterName - // node.NodeUUID = v.NodeUUID - // node.ClusterUuid = v.ClusterUUID - // node.NodeName = v.NodeName - // node.Path.Home = v.PathHome - // node.Path.Logs = v.PathLogs - // node.AgentID = inst.ID - // //TODO verify node info if the node id really match, need to fetch the credentials for agent - // //or let manager sync configs to this agent, verify the node info after receiving the configs - // //report any error along with this agent and node info - // break - // } - // } - // } - // } - // - //} - return nodesInfo, nil } -type RemoteConfig struct { - orm.ORMObjectBase - Metadata model.Metadata `json:"metadata" elastic_mapping:"metadata: { type: object }"` - Payload common.ConfigFile `json:"payload" elastic_mapping:"payload: { type: object}"` -} - -func remoteConfigProvider(instance model.Instance) []*common.ConfigFile { - - //fetch configs from remote db - //fetch configs assigned to (instance=_all OR instance=$instance_id ) AND application.name=$application.name - - q := orm.Query{ - Size: 1000, - Conds: orm.And(orm.Eq("metadata.category", "app_settings"), - orm.Eq("metadata.name", instance.Application.Name), - orm.Eq("metadata.labels.instance", "_all"), - ), - } - - err, searchResult := orm.Search(RemoteConfig{}, &q) - if err != nil { - panic(err) - } - - result := []*common.ConfigFile{} - - for _, row := range searchResult.Result { - v, ok := row.(map[string]interface{}) - if ok { - x, ok := v["payload"] - if ok { - f, ok := x.(map[string]interface{}) - if ok { - name, ok := f["name"].(string) - if ok { - item := common.ConfigFile{} - item.Name = util.ToString(name) - item.Location = util.ToString(f["location"]) - item.Content = util.ToString(f["content"]) - item.Version,_ = util.ToInt64(util.ToString(f["version"])) - item.Size = int64(len(item.Content)) - item.Managed = true - t, ok := v["updated"] - if ok { - layout := "2006-01-02T15:04:05.999999-07:00" - t1, err := time.Parse(layout, util.ToString(t)) - if err == nil { - item.Updated = t1.Unix() - } - } - result=append(result,&item) - } - } - } - } - } - - return result -} - -func dynamicAgentConfigProvider(instance model.Instance) []*common.ConfigFile { - - //get config files from remote db - //get settings with this agent id - result := []*common.ConfigFile{} - ids, err := GetEnrolledNodesByAgent(&instance) - if err != nil { - panic(err) - } - - var latestTimestamp int64 - for _, v := range ids { - if v.Updated > latestTimestamp { - latestTimestamp = v.Updated - } - } - - if len(ids) > 0 { - - cfg := common.ConfigFile{} - cfg.Name = "generated_metrics_tasks.yml" - cfg.Location = "generated_metrics_tasks.yml" - cfg.Content = getConfigs(ids) - cfg.Size = int64(len(cfg.Content)) - cfg.Version = latestTimestamp - cfg.Managed = true - cfg.Updated = latestTimestamp - result = append(result, &cfg) - } - - return result -} - -func getConfigs(items map[string]BindingItem) string { - buffer := bytes.NewBuffer([]byte("configs.template:\n ")) - - for _, v := range items { - - if v.ClusterID == "" { - panic("cluster id is empty") - } - metadata := elastic.GetMetadata(v.ClusterID) - var clusterLevelEnabled = false - var nodeLevelEnabled = true - var clusterEndPoint = metadata.Config.GetAnyEndpoint() - credential, err := common2.GetCredential(metadata.Config.CredentialID) - if err != nil { - panic(err) - } - var dv interface{} - dv, err = credential.Decode() - if err != nil { - panic(err) - } - var username = "" - var password = "" - - if auth, ok := dv.(model.BasicAuth); ok { - username = auth.Username - password = auth.Password - } - - buffer.Write([]byte(fmt.Sprintf("- name: \"%v\"\n path: ./config/task_config.tpl\n "+ - "variable:\n "+ - "CLUSTER_ID: %v\n "+ - "CLUSTER_ENDPOINT: [\"%v\"]\n "+ - "CLUSTER_USERNAME: \"%v\"\n "+ - "CLUSTER_PASSWORD: \"%v\"\n "+ - "CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+ - "NODE_LEVEL_TASKS_ENABLED: %v\n "+ - "NODE_LOGS_PATH: \"%v\"\n\n\n"+ - "#MANAGED_CONFIG_VERSION: %v\n"+ - "#MANAGED: true", - v.NodeUUID, v.ClusterID, clusterEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs, v.Updated))) - } - - //password: $[[keystore.$[[CLUSTER_ID]]_password]] - - return buffer.String() -} - //get nodes info via agent func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance) (*elastic.DiscoveryResult, error) { req := &util.Request{ Method: http.MethodGet, - Path: "/elasticsearch/nodes/_discovery", + Path: "/elasticsearch/node/_discovery", Context: ctx, } @@ -293,37 +123,6 @@ func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance return &obj, nil } -func AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*model.ESNodeInfo, error) { - reqBody, err := util.ToJSONBytes(cfg) - if err != nil { - return nil, err - } - req := &util.Request{ - Method: http.MethodPost, - Path: "/elasticsearch/_auth", - Context: ctx, - Body: reqBody, - } - resBody := &model.ESNodeInfo{} - err = DoRequest(req, resBody) - if err != nil { - return nil, err - } - return resBody, nil -} - -func getNodeByPidOrUUID(nodes map[int]*model.ESNodeInfo, pid int, uuid string, port string) *model.ESNodeInfo { - if nodes[pid] != nil { - return nodes[pid] - } - for _, node := range nodes { - if node.NodeUUID != "" && node.NodeUUID == uuid { - return node - } - } - return nil -} - type BindingItem struct { ClusterName string `json:"cluster_name"` ClusterUUID string `json:"cluster_uuid"` @@ -338,50 +137,19 @@ type BindingItem struct { Updated int64 `json:"updated"` } -func getUnAssociateNodes() (map[string][]model.ESNodeInfo, error) { - query := util.MapStr{ - "size": 3000, - "query": util.MapStr{ - "bool": util.MapStr{ - "must_not": []util.MapStr{ - { - "exists": util.MapStr{ - "field": "cluster_id", - }, - }, - }, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - - err, result := orm.Search(model.ESNodeInfo{}, &q) - if err != nil { - return nil, err - } - nodesInfo := map[string][]model.ESNodeInfo{} - for _, row := range result.Result { - node := model.ESNodeInfo{} - buf := util.MustToJSONBytes(row) - util.MustFromJSONBytes(buf, &node) - nodesInfo[node.AgentID] = append(nodesInfo[node.AgentID], node) - } - return nodesInfo, nil -} - func GetElasticLogFiles(ctx context.Context, instance *model.Instance, logsPath string) (interface{}, error) { reqBody := util.MustToJSONBytes(util.MapStr{ "logs_path": logsPath, }) + req := &util.Request{ Method: http.MethodPost, Path: "/elasticsearch/logs/_list", Context: ctx, Body: reqBody, } + resBody := map[string]interface{}{} err := doRequest(instance, req, &resBody) if err != nil { @@ -391,6 +159,7 @@ func GetElasticLogFiles(ctx context.Context, instance *model.Instance, logsPath return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody) } return resBody["result"], nil + } func GetElasticLogFileContent(ctx context.Context, instance *model.Instance, body interface{}) (interface{}, error) { @@ -535,3 +304,281 @@ func getAgentByNodeID(nodeID string) (*model.Instance, string, error) { } return nil, "", nil } + +type ClusterInfo struct { + ClusterIDs []string `json:"cluster_id"` +} + +func (h *APIHandler) discoveryESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + id := ps.MustGetParameter("instance_id") + instance := model.Instance{} + instance.ID = id + exists, err := orm.Get(&instance) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + + nodes, err := refreshNodesInfo(&instance) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if len(nodes.UnknownProcess) > 0 { + + var discoveredPIDs map[int]*elastic.LocalNodeInfo = make(map[int]*elastic.LocalNodeInfo) + + if req.Method == "POST" { + bytes, err := h.GetRawBody(req) + if err != nil { + panic(err) + } + if len(bytes) > 0 { + clusterInfo := ClusterInfo{} + util.FromJSONBytes(bytes, &clusterInfo) + if len(clusterInfo.ClusterIDs) > 0 { + //try connect this node to cluster by using this cluster's agent credential + for _, clusterID := range clusterInfo.ClusterIDs { + meta := elastic.GetMetadata(clusterID) + if meta != nil { + if meta.Config.AgentCredentialID != "" { + auth, err := common2.GetAgentBasicAuth(meta.Config) + if err != nil { + panic(err) + } + if auth != nil { + //try connect + for _, node := range nodes.UnknownProcess { + for _, v := range node.ListenAddresses { + ip := v.IP + if util.ContainStr(v.IP, "::") { + ip = fmt.Sprintf("[%s]", v.IP) + } + nodeHost := fmt.Sprintf("%s:%d", ip, v.Port) + success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, instance) + if !success && tryAgain { + //try https again + success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, instance) + } + if success { + log.Error("connect to es node success:", nodeHost) + discoveredPIDs[node.PID] = nodeInfo + break + } + } + } + } + } + } + } + } + } + } + + newUnknownProcess := []model.ProcessInfo{} + if len(discoveredPIDs) > 0 { + for _, node := range nodes.UnknownProcess { + if item, ok := discoveredPIDs[node.PID]; !ok { + newUnknownProcess = append(newUnknownProcess, node) + } else { + nodes.Nodes[item.NodeUUID] = item + } + } + nodes.UnknownProcess = newUnknownProcess + } + } + + h.WriteJSON(w, nodes, http.StatusOK) +} + +func (h *APIHandler) getESNodeInfoViaProxy(host string, schema string, auth *model.BasicAuth, instance model.Instance) (success, tryAgain bool, info *elastic.LocalNodeInfo) { + esConfig := elastic.ElasticsearchConfig{Host: host, Schema: schema, BasicAuth: auth} + body := util.MustToJSONBytes(esConfig) + res, err := server.ProxyRequestToRuntimeInstance(instance.GetEndpoint(), "POST", "/elasticsearch/node/_info", + body, int64(len(body)), auth) + + if err != nil { + panic(err) + } + + if global.Env().IsDebug { + if res != nil { // && res.StatusCode==http.StatusOK + log.Debug(string(res.Body)) + } + } + + if res != nil && res.StatusCode == http.StatusForbidden { + return false, false, nil + } + + if res != nil && res.StatusCode == http.StatusOK { + node := elastic.LocalNodeInfo{} + err := util.FromJSONBytes(res.Body, &node) + if err != nil { + panic(err) + } + return true, false, &node + } + + return false, true, nil +} + +func NewClusterSettings(clusterID string) *model.Setting { + settings := model.Setting{ + Metadata: model.Metadata{ + Category: Cluster, + }, + } + settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, clusterID) + + settings.Metadata.Labels = util.MapStr{ + "cluster_id": clusterID, + } + + return &settings +} + +func NewNodeAgentSettings(instanceID string, item *BindingItem) *model.Setting { + + settings := model.Setting{ + Metadata: model.Metadata{ + Category: Node, + Name: "agent", + }, + } + settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, item.NodeUUID) + + settings.Metadata.Labels = util.MapStr{ + "agent_id": instanceID, + } + + settings.Payload = util.MapStr{ + "cluster_id": item.ClusterID, + "cluster_name": item.ClusterName, + "cluster_uuid": item.ClusterUUID, + "node_uuid": item.NodeUUID, + "publish_address": item.PublishAddress, + "node_name": item.NodeName, + "path_home": item.PathHome, + "path_logs": item.PathLogs, + } + + return &settings +} + +func NewIndexSettings(clusterID, nodeID, agentID, indexName, indexID string) *model.Setting { + + settings := model.Setting{ + Metadata: model.Metadata{ + Category: Index, + }, + } + settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, nodeID) + + settings.Metadata.Labels = util.MapStr{ + "cluster_id": clusterID, + "node_id": nodeID, + "agent_id": agentID, + "index_name": indexName, + "index_id": indexID, + } + + return &settings +} + +const Cluster = "cluster_settings" +const Node = "node_settings" +const Index = "index_settings" + +func (h *APIHandler) revokeESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + //agent id + instID := ps.MustGetParameter("instance_id") + item := BindingItem{} + err := h.DecodeJSON(req, &item) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + settings := NewNodeAgentSettings(instID, &item) + err = orm.Delete(&orm.Context{ + Refresh: "wait_for", + }, settings) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteAckOKJSON(w) +} + +func (h *APIHandler) enrollESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + //agent id + instID := ps.MustGetParameter("instance_id") + + //node id and cluster id + item := BindingItem{} + err := h.DecodeJSON(req, &item) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + //update node's setting + settings := NewNodeAgentSettings(instID, &item) + err = orm.Update(&orm.Context{ + Refresh: "wait_for", + }, settings) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + h.WriteAckOKJSON(w) +} + +var mTLSClient *http.Client //TODO get mTLSClient +var initOnce = sync.Once{} + +func doRequest(instance *model.Instance, req *util.Request, obj interface{}) error { + var err error + var res *util.Result + + initOnce.Do(func() { + if global.Env().SystemConfig.Configs.TLSConfig.TLSEnabled && global.Env().SystemConfig.Configs.TLSConfig.TLSCAFile != "" { + + //init client + hClient, err := util.NewMTLSClient( + global.Env().SystemConfig.Configs.TLSConfig.TLSCAFile, + global.Env().SystemConfig.Configs.TLSConfig.TLSCertFile, + global.Env().SystemConfig.Configs.TLSConfig.TLSKeyFile) + if err != nil { + panic(err) + } + mTLSClient = hClient + } + }) + + req.Url, err = url.JoinPath(instance.GetEndpoint(), req.Path) + res, err = util.ExecuteRequestWithCatchFlag(mTLSClient, req, true) + if err != nil || res.StatusCode != 200 { + body := "" + if res != nil { + body = string(res.Body) + } + return errors.New(fmt.Sprintf("request error: %v, %v", err, body)) + } + + if res != nil { + if res.Body != nil { + return util.FromJSONBytes(res.Body, obj) + } + } + + return nil +} diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go index d863e023..b1a25c7d 100644 --- a/modules/agent/api/init.go +++ b/modules/agent/api/init.go @@ -10,6 +10,10 @@ import ( "infini.sh/framework/plugins/managed/server" ) +type APIHandler struct { + api.Handler +} + func Init() { handler := APIHandler{} api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) @@ -18,23 +22,17 @@ func Init() { api.HandleAPIMethod(api.DELETE, "/host/:host_id", handler.deleteHost) //bind agent with nodes - api.HandleAPIMethod(api.GET, "/instance/:instance_id/node/_discovery", handler.RequirePermission(handler.getESNodesInfo, enum.PermissionAgentInstanceRead)) + api.HandleAPIMethod(api.GET, "/instance/:instance_id/node/_discovery", handler.RequirePermission(handler.discoveryESNodesInfo, enum.PermissionAgentInstanceRead)) + api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_discovery", handler.RequirePermission(handler.discoveryESNodesInfo, enum.PermissionAgentInstanceRead)) api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_enroll", handler.RequirePermission(handler.enrollESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_revoke", handler.RequirePermission(handler.revokeESNode, enum.PermissionAgentInstanceWrite)) - api.HandleAPIMethod(api.POST, "/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite)) - - //api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) - //api.HandleAPIMethod(api.DELETE, "/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite)) + //api.HandleAPIMethod(api.POST, "/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite)) //get elasticsearch node logs, direct fetch or via stored logs(TODO) api.HandleAPIMethod(api.GET, "/elasticsearch/:id/node/:node_id/logs/_list", handler.RequirePermission(handler.getLogFilesByNode, enum.PermissionAgentInstanceRead)) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/node/:node_id/logs/_read", handler.RequirePermission(handler.getLogFileContent, enum.PermissionAgentInstanceRead)) - //api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand)) - //api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript) - - server.RegisterConfigProvider(remoteConfigProvider) server.RegisterConfigProvider(dynamicAgentConfigProvider) } diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go deleted file mode 100644 index 563ce8cf..00000000 --- a/modules/agent/api/instance.go +++ /dev/null @@ -1,14 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package api - -import ( - "infini.sh/framework/core/api" -) - -type APIHandler struct { - api.Handler -} - diff --git a/modules/agent/api/remote_config.go b/modules/agent/api/remote_config.go new file mode 100644 index 00000000..e30fd7c2 --- /dev/null +++ b/modules/agent/api/remote_config.go @@ -0,0 +1,165 @@ +/* Copyright © INFINI LTD. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package api + +import ( + "bytes" + "fmt" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/model" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + common2 "infini.sh/framework/modules/elastic/common" + "infini.sh/framework/plugins/managed/common" + "time" +) + +type RemoteConfig struct { + orm.ORMObjectBase + Metadata model.Metadata `json:"metadata" elastic_mapping:"metadata: { type: object }"` + Payload common.ConfigFile `json:"payload" elastic_mapping:"payload: { type: object}"` +} + +func remoteConfigProvider(instance model.Instance) []*common.ConfigFile { + + //fetch configs from remote db + //fetch configs assigned to (instance=_all OR instance=$instance_id ) AND application.name=$application.name + + q := orm.Query{ + Size: 1000, + Conds: orm.And(orm.Eq("metadata.category", "app_settings"), + orm.Eq("metadata.name", instance.Application.Name), + orm.Eq("metadata.labels.instance", "_all"), + ), + } + + err, searchResult := orm.Search(RemoteConfig{}, &q) + if err != nil { + panic(err) + } + + result := []*common.ConfigFile{} + + for _, row := range searchResult.Result { + v, ok := row.(map[string]interface{}) + if ok { + x, ok := v["payload"] + if ok { + f, ok := x.(map[string]interface{}) + if ok { + name, ok := f["name"].(string) + if ok { + item := common.ConfigFile{} + item.Name = util.ToString(name) + item.Location = util.ToString(f["location"]) + item.Content = util.ToString(f["content"]) + item.Version, _ = util.ToInt64(util.ToString(f["version"])) + item.Size = int64(len(item.Content)) + item.Managed = true + t, ok := v["updated"] + if ok { + layout := "2006-01-02T15:04:05.999999-07:00" + t1, err := time.Parse(layout, util.ToString(t)) + if err == nil { + item.Updated = t1.Unix() + } + } + result = append(result, &item) + } + } + } + } + } + + return result +} + +func dynamicAgentConfigProvider(instance model.Instance) []*common.ConfigFile { + + //get config files from remote db + //get settings with this agent id + + result := []*common.ConfigFile{} + ids, err := GetEnrolledNodesByAgent(&instance) + if err != nil { + panic(err) + } + + var latestTimestamp int64 + for _, v := range ids { + if v.Updated > latestTimestamp { + latestTimestamp = v.Updated + } + } + + if len(ids) > 0 { + + cfg := common.ConfigFile{} + cfg.Name = "generated_metrics_tasks.yml" + cfg.Location = "generated_metrics_tasks.yml" + cfg.Content = getAgentIngestConfigs(ids) + cfg.Size = int64(len(cfg.Content)) + cfg.Version = latestTimestamp + cfg.Managed = true + cfg.Updated = latestTimestamp + result = append(result, &cfg) + } + + return result +} + +func getAgentIngestConfigs(items map[string]BindingItem) string { + + buffer := bytes.NewBuffer([]byte("configs.template:\n ")) + + for _, v := range items { + + if v.ClusterID == "" { + panic("cluster id is empty") + } + + metadata := elastic.GetMetadata(v.ClusterID) + var clusterLevelEnabled = false + var nodeLevelEnabled = true + var clusterEndPoint = metadata.Config.GetAnyEndpoint() + + var username = "" + var password = "" + + if metadata.Config.AgentCredentialID != "" { + credential, err := common2.GetCredential(metadata.Config.AgentCredentialID) + if err != nil { + panic(err) + } + var dv interface{} + dv, err = credential.Decode() + if err != nil { + panic(err) + } + if auth, ok := dv.(model.BasicAuth); ok { + username = auth.Username + password = auth.Password + } + } + + buffer.Write([]byte(fmt.Sprintf("- name: \"%v\"\n path: ./config/task_config.tpl\n "+ + "variable:\n "+ + "CLUSTER_ID: %v\n "+ + "CLUSTER_ENDPOINT: [\"%v\"]\n "+ + "CLUSTER_USERNAME: \"%v\"\n "+ + "CLUSTER_PASSWORD: \"%v\"\n "+ + "CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+ + "NODE_LEVEL_TASKS_ENABLED: %v\n "+ + "NODE_LOGS_PATH: \"%v\"\n\n\n"+ + "#MANAGED_CONFIG_VERSION: %v\n"+ + "#MANAGED: true", + v.NodeUUID, v.ClusterID, clusterEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs, v.Updated))) + } + + //password: $[[keystore.$[[CLUSTER_ID]]_password]] + + return buffer.String() +} + diff --git a/modules/agent/api/tod.go b/modules/agent/api/tod.go deleted file mode 100644 index da6c172f..00000000 --- a/modules/agent/api/tod.go +++ /dev/null @@ -1,882 +0,0 @@ -/* Copyright © INFINI LTD. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package api - -import ( - "context" - "errors" - "fmt" - log "github.com/cihub/seelog" - common2 "infini.sh/console/modules/agent/common" - model3 "infini.sh/console/modules/agent/model" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/global" - "infini.sh/framework/core/host" - "infini.sh/framework/core/model" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - "infini.sh/framework/modules/elastic/common" - "net/http" - "net/url" - "strings" - "sync" -) - -//func (h *APIHandler) updateInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { -// id := ps.MustGetParameter("instance_id") -// oldInst := model.Instance{} -// oldInst.ID = id -// _, err := orm.Get(&oldInst) -// if err != nil { -// if err == elastic2.ErrNotFound { -// h.WriteJSON(w, util.MapStr{ -// "_id": id, -// "result": "not_found", -// }, http.StatusNotFound) -// return -// } -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// log.Error(err) -// return -// } -// -// obj := model.Instance{} -// err = h.DecodeJSON(req, &obj) -// if err != nil { -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// log.Error(err) -// return -// } -// -// oldInst.Name = obj.Name -// oldInst.Endpoint = obj.Endpoint -// oldInst.Description = obj.Description -// oldInst.Tags = obj.Tags -// oldInst.BasicAuth = obj.BasicAuth -// err = orm.Update(&orm.Context{ -// Refresh: "wait_for", -// }, &oldInst) -// if err != nil { -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// log.Error(err) -// return -// } -// -// h.WriteJSON(w, util.MapStr{ -// "_id": obj.ID, -// "result": "updated", -// }, 200) -//} -// -//func (h *APIHandler) searchInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { -// -// var ( -// keyword = h.GetParameterOrDefault(req, "keyword", "") -// //queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` -// strSize = h.GetParameterOrDefault(req, "size", "20") -// strFrom = h.GetParameterOrDefault(req, "from", "0") -// ) -// -// var ( -// mustQ []interface{} -// ) -// -// if keyword != "" { -// mustQ = append(mustQ, util.MapStr{ -// "query_string": util.MapStr{ -// "default_field": "*", -// "query": keyword, -// }, -// }) -// } -// size, _ := strconv.Atoi(strSize) -// if size <= 0 { -// size = 20 -// } -// from, _ := strconv.Atoi(strFrom) -// if from < 0 { -// from = 0 -// } -// -// queryDSL := util.MapStr{ -// "size": size, -// "from": from, -// } -// if len(mustQ) > 0 { -// queryDSL["query"] = util.MapStr{ -// "bool": util.MapStr{ -// "must": mustQ, -// }, -// } -// } -// -// q := orm.Query{} -// q.RawQuery = util.MustToJSONBytes(queryDSL) -// -// err, res := orm.Search(&model.Instance{}, &q) -// if err != nil { -// log.Error(err) -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// return -// } -// -// h.Write(w, res.Raw) -//} - -func (h *APIHandler) getESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - - id := ps.MustGetParameter("instance_id") - obj := model.Instance{} - obj.ID = id - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - - nodes, err := refreshNodesInfo(&obj) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - h.WriteJSON(w, nodes, http.StatusOK) -} - -func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("instance_id") - inst := model.Instance{} - inst.ID = id - exists, err := orm.Get(&inst) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - reqBody := struct { - NodeID string `json:"node_id"` - ESConfig *elastic.ElasticsearchConfig `json:"es_config"` - }{} - err = h.DecodeJSON(req, &reqBody) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - //node id maybe is missing - if reqBody.NodeID != "" { - //verify the node id, if the node is is actually the node of the instance - oldNodeInfo := &model.ESNodeInfo{ - ID: reqBody.NodeID, - } - exists, err = orm.Get(oldNodeInfo) - if !exists || err != nil { - h.WriteJSON(w, fmt.Sprintf("node [%s] of agent [%s] was not found", oldNodeInfo.ID, inst.Name), http.StatusInternalServerError) - return - } - } else { - //find out the node id with credentials - cfg := reqBody.ESConfig - if cfg.Endpoint == "" { - cfg.Endpoint = cfg.GetAnyEndpoint() - } - basicAuth, err := common.GetBasicAuth(cfg) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - cfg.BasicAuth = basicAuth - nodeInfo, err := AuthESNode(context.Background(), inst.GetEndpoint(), *cfg) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - //host, port, err := net.SplitHostPort(nodeInfo.PublishAddress) - //if err != nil { - // log.Error(err) - // h.WriteError(w, err.Error(), http.StatusInternalServerError) - // return - //} - //if !util.StringInArray(inst.Network.IP, host) && !net.ParseIP(host).IsLoopback() { - // h.WriteError(w, fmt.Sprintf("got node host %s not match any ip of %v", host, inst.Network.IP), http.StatusInternalServerError) - // return - //} - //if oldNodeInfo.HttpPort != port { - // h.WriteError(w, fmt.Sprintf("port mismatch, got: %s,expected: %s", port, oldNodeInfo.HttpPort), http.StatusInternalServerError) - // return - //} - //if oldNodeInfo.ProcessInfo.PID != nodeInfo.ProcessInfo.PID { - // h.WriteError(w, fmt.Sprintf("process id mismatch, got: %d,expected: %d", nodeInfo.ProcessInfo.PID, oldNodeInfo.ProcessInfo.PID), http.StatusInternalServerError) - // return - //} - - reqBody.NodeID = nodeInfo.NodeUUID - } - - //nodeInfo:=elastic.NodeConfig{} - //nodeInfo.ID = reqBody.NodeID - //nodeInfo.AgentID = inst.ID - //err = orm.Update(nil, nodeInfo) //update node's info and agent_id - //if err != nil { - // log.Error(err) - // h.WriteError(w, err.Error(), http.StatusInternalServerError) - // return - //} - //h.WriteJSON(w, nodeInfo, http.StatusOK) -} - -func NewClusterSettings(clusterID string) *model.Setting { - settings := model.Setting{ - Metadata: model.Metadata{ - Category: Cluster, - }, - } - settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, clusterID) - - settings.Metadata.Labels = util.MapStr{ - "cluster_id": clusterID, - } - - return &settings -} - -func NewNodeAgentSettings(instanceID string, item *BindingItem) *model.Setting { - - settings := model.Setting{ - Metadata: model.Metadata{ - Category: Node, - Name: "agent", - }, - } - settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, item.NodeUUID) - - settings.Metadata.Labels = util.MapStr{ - "agent_id": instanceID, - } - - settings.Payload = util.MapStr{ - "cluster_id": item.ClusterID, - "cluster_name": item.ClusterName, - "cluster_uuid": item.ClusterUUID, - "node_uuid": item.NodeUUID, - "publish_address": item.PublishAddress, - "node_name": item.NodeName, - "path_home": item.PathHome, - "path_logs": item.PathLogs, - } - - return &settings -} - -func NewIndexSettings(clusterID, nodeID, agentID, indexName, indexID string) *model.Setting { - - settings := model.Setting{ - Metadata: model.Metadata{ - Category: Index, - }, - } - settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, nodeID) - - settings.Metadata.Labels = util.MapStr{ - "cluster_id": clusterID, - "node_id": nodeID, - "agent_id": agentID, - "index_name": indexName, - "index_id": indexID, - } - - return &settings -} - -const Cluster = "cluster_settings" -const Node = "node_settings" -const Index = "index_settings" - -func (h *APIHandler) revokeESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - //agent id - instID := ps.MustGetParameter("instance_id") - item := BindingItem{} - err := h.DecodeJSON(req, &item) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - settings := NewNodeAgentSettings(instID, &item) - err = orm.Delete(&orm.Context{ - Refresh: "wait_for", - }, settings) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - h.WriteAckOKJSON(w) -} - -func (h *APIHandler) enrollESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - - //agent id - instID := ps.MustGetParameter("instance_id") - - //node id and cluster id - item := BindingItem{} - err := h.DecodeJSON(req, &item) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - //update node's setting - settings := NewNodeAgentSettings(instID, &item) - err = orm.Update(&orm.Context{ - Refresh: "wait_for", - }, settings) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - h.WriteAckOKJSON(w) -} - -func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - reqBody := struct { - ClusterIDs []string `json:"cluster_ids"` - }{} - err := h.DecodeJSON(req, &reqBody) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - // query not associated nodes info - nodesM, err := getUnAssociateNodes() - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if len(nodesM) == 0 { - h.WriteAckOKJSON(w) - return - } - agentIds := make([]string, 0, len(nodesM)) - for agentID := range nodesM { - agentIds = append(agentIds, agentID) - } - agents, err := getAgentByIds(agentIds) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - for _, clusterID := range reqBody.ClusterIDs { - // query cluster basicauth - cfg := elastic.GetConfig(clusterID) - basicAuth, err := common.GetBasicAuth(cfg) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - taskSetting, err := getSettingsByClusterID(cfg.ID) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - for agentID, nodes := range nodesM { - var ( - inst *model.Instance - ok bool - ) - if inst, ok = agents[agentID]; !ok { - log.Warnf("agent [%v] was not found", agentID) - continue - } - settings, err := common2.GetAgentSettings(agentID, 0) - if err != nil { - log.Error(err) - continue - } - for _, v := range nodes { - host := v.PublishAddress - var endpoint string - if strings.HasPrefix(host, "::") { //for ipv6 - instURL, err := url.Parse(inst.Endpoint) - if err != nil { - log.Error(err) - continue - } - host = instURL.Hostname() - endpoint = fmt.Sprintf("%s://[%s]:%s", v.Schema, host, v.HttpPort) - } else { - endpoint = fmt.Sprintf("%s://%s", v.Schema, host) - } - escfg := elastic.ElasticsearchConfig{ - Endpoint: endpoint, - BasicAuth: basicAuth, - } - nodeInfo, err := AuthESNode(context.Background(), inst.GetEndpoint(), escfg) - if err != nil { - log.Warn(err) - continue - } - //matched - if nodeInfo.ClusterUuid == cfg.ClusterUUID { - //update node info - nodeInfo.ID = v.ID - nodeInfo.AgentID = inst.ID - nodeInfo.ClusterID = cfg.ID - err = orm.Save(nil, nodeInfo) - if err != nil { - log.Error(err) - continue - } - setting := pickAgentSettings(settings, v) - if setting == nil { - tsetting := model3.TaskSetting{ - NodeStats: &model3.NodeStatsTask{ - Enabled: true, - }, - Logs: &model3.LogsTask{ - Enabled: true, - LogsPath: nodeInfo.Path.Logs, - }, - } - if taskSetting.IndexStats != nil { - tsetting.IndexStats = taskSetting.IndexStats - taskSetting.IndexStats = nil - } - if taskSetting.ClusterHealth != nil { - tsetting.ClusterHealth = taskSetting.ClusterHealth - taskSetting.ClusterHealth = nil - } - if taskSetting.ClusterStats != nil { - tsetting.ClusterStats = taskSetting.ClusterStats - taskSetting.ClusterStats = nil - } - setting = &model.Setting{ - Metadata: model.Metadata{ - Category: "agent", - Name: "task", - Labels: util.MapStr{ - "agent_id": agentID, - "cluster_uuid": nodeInfo.ClusterUuid, - "cluster_id": nodeInfo.ClusterID, - "node_uuid": nodeInfo.NodeUUID, - "endpoint": fmt.Sprintf("%s://%s", nodeInfo.Schema, nodeInfo.PublishAddress), - }, - }, - Payload: util.MapStr{ - "task": tsetting, - }, - } - err = orm.Create(nil, setting) - if err != nil { - log.Error("save agent task setting error: ", err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - } - } - } - - } - } - h.WriteAckOKJSON(w) -} - -func getAgentByIds(agentIDs []string) (map[string]*model.Instance, error) { - query := util.MapStr{ - "size": len(agentIDs), - "query": util.MapStr{ - "terms": util.MapStr{ - "id": agentIDs, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - err, result := orm.Search(model.Instance{}, &q) - if err != nil { - return nil, err - } - agents := map[string]*model.Instance{} - for _, row := range result.Result { - inst := model.Instance{} - buf := util.MustToJSONBytes(row) - util.MustFromJSONBytes(buf, &inst) - agents[inst.ID] = &inst - } - return agents, nil -} - -func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("instance_id") - nodeIDs := []string{} - err := h.DecodeJSON(req, &nodeIDs) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if len(nodeIDs) > 0 { - q := util.MapStr{ - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "id": nodeIDs, - }, - }, - { - "term": util.MapStr{ - "agent_id": util.MapStr{ - "value": id, - }, - }, - }, - }, - }, - }, - } - err = orm.DeleteBy(model.ESNodeInfo{}, util.MustToJSONBytes(q)) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - q = util.MapStr{ - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "metadata.labels.node_uuid": nodeIDs, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.agent_id": util.MapStr{ - "value": id, - }, - }, - }, - }, - }, - }, - } - } - h.WriteAckOKJSON(w) -} - -// -//func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { -// var reqBody = struct { -// Endpoint string `json:"endpoint"` -// BasicAuth model.BasicAuth -// }{} -// err := h.DecodeJSON(req, &reqBody) -// if err != nil { -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// return -// } -// connectRes, err := client.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint) -// if err != nil { -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// return -// } -// h.WriteJSON(w, connectRes, http.StatusOK) -//} - -func pickAgentSettings(settings []model.Setting, nodeInfo model.ESNodeInfo) *model.Setting { - for _, setting := range settings { - if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID { - return &setting - } - } - return nil -} - -func getAgentTaskSetting(agentID string, v model.ESNodeInfo) (*model.Setting, error) { - taskSetting, err := getSettingsByClusterID(v.ClusterID) - if err != nil { - return nil, err - } - taskSetting.Logs = &model3.LogsTask{ - Enabled: true, - LogsPath: v.Path.Logs, - } - return &model.Setting{ - Metadata: model.Metadata{ - Category: "agent", - Name: "task", - Labels: util.MapStr{ - "agent_id": agentID, - "cluster_uuid": v.ClusterUuid, - "cluster_id": v.ClusterID, - "node_uuid": v.NodeUUID, - "endpoint": fmt.Sprintf("%s://%s", v.Schema, v.PublishAddress), - }, - }, - Payload: util.MapStr{ - "task": taskSetting, - }, - }, nil -} - -// getSettingsByClusterID query agent task settings with cluster id -func getSettingsByClusterID(clusterID string) (*model3.TaskSetting, error) { - err, result := querySettingsByClusterID(clusterID) - if err != nil { - return nil, err - } - - setting := &model3.TaskSetting{ - NodeStats: &model3.NodeStatsTask{ - Enabled: true, - }, - } - var ( - clusterStats = true - indexStats = true - clusterHealth = true - ) - keys := []string{"payload.task.cluster_stats.enabled", "payload.task.cluster_health.enabled", "payload.task.index_stats.enabled"} - for _, row := range result.Result { - if v, ok := row.(map[string]interface{}); ok { - vm := util.MapStr(v) - for _, key := range keys { - tv, _ := vm.GetValue(key) - if tv == true { - switch key { - case "payload.task.cluster_stats.enabled": - clusterStats = false - case "payload.task.index_stats.enabled": - indexStats = false - case "payload.task.cluster_health.enabled": - clusterHealth = false - } - } - } - } - } - if clusterStats { - setting.ClusterStats = &model3.ClusterStatsTask{ - Enabled: true, - } - } - if indexStats { - setting.IndexStats = &model3.IndexStatsTask{ - Enabled: true, - } - } - if clusterHealth { - setting.ClusterHealth = &model3.ClusterHealthTask{ - Enabled: true, - } - } - return setting, nil -} - -func querySettingsByClusterID(clusterID string) (error, orm.Result) { - queryDsl := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.cluster_id": util.MapStr{ - "value": clusterID, - }, - }, - }, - }, - "minimum_should_match": 1, - "should": []util.MapStr{ - { - "term": util.MapStr{ - "payload.task.cluster_health.enabled": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "payload.task.cluster_stats.enabled": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "payload.task.index_stats.enabled": util.MapStr{ - "value": true, - }, - }, - }, - }, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(queryDsl), - } - return orm.Search(model.Setting{}, &q) -} - -func GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) { - req := &util.Request{ - Method: http.MethodGet, - Path: "/agent/host/_basic", - Context: ctx, - } - resBody := struct { - Success bool `json:"success"` - Error string `json:"error"` - HostInfo *host.HostInfo `json:"result"` - }{} - - req.Body = util.MustToJSONBytes(resBody) - - err := DoRequest(req, &resBody) - if err != nil { - return nil, err - } - - if resBody.Success != true { - return nil, fmt.Errorf("enroll error from client: %v", resBody.Error) - } - return resBody.HostInfo, nil -} - -func GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string) (interface{}, error) { - req := &util.Request{ - Method: http.MethodGet, - Path: fmt.Sprintf("/elasticsearch/%s/process/_elastic", agentID), - Context: ctx, - } - resBody := map[string]interface{}{} - err := DoRequest(req, &resBody) - if err != nil { - return nil, err - } - if resBody["success"] != true { - return nil, fmt.Errorf("discover host callback error: %v", resBody["error"]) - } - return resBody["elastic_process"], nil -} - -func GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*model.Instance, error) { - req := &util.Request{ - Method: http.MethodGet, - Path: "/_info", - Context: ctx, - } - resBody := &model.Instance{} - err := DoRequest(req, &resBody) - return resBody, err -} - -func RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error { - reqBody, err := util.ToJSONBytes(cfgs) - if err != nil { - return err - } - req := &util.Request{ - Method: http.MethodPost, - Path: "/elasticsearch/_register", - Context: ctx, - Body: reqBody, - } - resBody := util.MapStr{} - err = DoRequest(req, &resBody) - if err != nil { - return err - } - if resBody["acknowledged"] != true { - return fmt.Errorf("%v", resBody["error"]) - } - return nil -} - -func CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error { - req := &util.Request{ - Method: http.MethodPost, - Path: "/pipeline/tasks/", - Body: body, - Context: ctx, - } - resBody := util.MapStr{} - return DoRequest(req, &resBody) -} - -func DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error { - req := &util.Request{ - Method: http.MethodDelete, - Path: fmt.Sprintf("/pipeline/task/%s", pipelineID), - Context: ctx, - } - return DoRequest(req, nil) -} - -func DoRequest(req *util.Request, obj interface{}) error { - panic("implement me") -} - -var mTLSClient *http.Client //TODO get mTLSClient -var initOnce = sync.Once{} - -func doRequest(instance *model.Instance, req *util.Request, obj interface{}) error { - var err error - var res *util.Result - - initOnce.Do(func() { - if global.Env().SystemConfig.Configs.TLSConfig.TLSEnabled && global.Env().SystemConfig.Configs.TLSConfig.TLSCAFile != "" { - - //init client - hClient, err := util.NewMTLSClient( - global.Env().SystemConfig.Configs.TLSConfig.TLSCAFile, - global.Env().SystemConfig.Configs.TLSConfig.TLSCertFile, - global.Env().SystemConfig.Configs.TLSConfig.TLSKeyFile) - if err != nil { - panic(err) - } - mTLSClient = hClient - } - }) - - req.Url, err = url.JoinPath(instance.GetEndpoint(), req.Path) - res, err = util.ExecuteRequestWithCatchFlag(mTLSClient, req, true) - if err != nil || res.StatusCode != 200 { - body := "" - if res != nil { - body = string(res.Body) - } - return errors.New(fmt.Sprintf("request error: %v, %v", err, body)) - } - - if res != nil { - if res.Body != nil { - return util.FromJSONBytes(res.Body, obj) - } - } - - return nil -} diff --git a/modules/agent/common/helper.go b/modules/agent/common/helper.go deleted file mode 100644 index 7baa7952..00000000 --- a/modules/agent/common/helper.go +++ /dev/null @@ -1,455 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package common - -import ( - "fmt" - log "github.com/cihub/seelog" - model2 "infini.sh/console/modules/agent/model" - "infini.sh/framework/core/credential" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/event" - "infini.sh/framework/core/global" - "infini.sh/framework/core/model" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - "strings" -) - -func ParseAgentSettings(settings []model.Setting)(*model2.ParseAgentSettingsResult, error){ - var clusterCfgs []elastic.ElasticsearchConfig - var ( - pipelines []util.MapStr - toDeletePipelineNames []string - ) - for _, setting := range settings { - if setting.Metadata.Labels == nil { - return nil, fmt.Errorf("empty metadata labels of setting [%s]", setting.ID) - } - var ( - clusterID string - ok bool - ) - nodeUUID := util.ToString(setting.Metadata.Labels["node_uuid"]) - if clusterID, ok = setting.Metadata.Labels["cluster_id"].(string); ok && clusterID != ""{ - cfg := elastic.GetConfig(clusterID) - newID := getClusterConfigReferenceName(clusterID, nodeUUID) - newCfg := elastic.ElasticsearchConfig{ - Enabled: true, - Name: newID, - BasicAuth: cfg.BasicAuth, - //todo get endpoint from agent node info - Endpoint: setting.Metadata.Labels["endpoint"].(string), - ClusterUUID: cfg.ClusterUUID, - } - newCfg.ID = newID - clusterCfgs = append(clusterCfgs, newCfg) - }else{ - return nil, fmt.Errorf("got wrong cluster id [%v] from metadata labels", setting.Metadata.Labels["cluster_id"]) - } - - taskCfg, err := util.MapStr(setting.Payload).GetValue("task") - if err != nil { - return nil, err - } - vBytes, err := util.ToJSONBytes(taskCfg) - if err != nil { - return nil, err - } - taskSetting := model2.TaskSetting{} - err = util.FromJSONBytes(vBytes, &taskSetting) - if err != nil { - return nil, err - } - partPipelines, partDeletePipelineNames, err := TransformSettingsToConfig(&taskSetting, clusterID, nodeUUID) - if err != nil { - return nil, err - } - pipelines = append(pipelines, partPipelines...) - toDeletePipelineNames = append(toDeletePipelineNames, partDeletePipelineNames...) - } - return &model2.ParseAgentSettingsResult{ - ClusterConfigs: clusterCfgs, - Pipelines: pipelines, - ToDeletePipelineNames: toDeletePipelineNames, - }, nil -} - -// GetAgentSettings query agent setting by agent id and updated timestamp, -// if there has any setting was updated, then return setting list includes settings not changed, -// otherwise return empty setting list -func GetAgentSettings(agentID string, timestamp int64) ([]model.Setting, error) { - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.category": util.MapStr{ - "value": "agent", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.name": util.MapStr{ - "value": "task", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.agent_id": util.MapStr{ - "value": agentID, - }, - }, - }, - //{ - // "range": util.MapStr{ - // "updated": util.MapStr{ - // "gt": timestamp, - // }, - // }, - //}, - }, - }, - } - queryDsl := util.MapStr{ - "size": 1000, - "query": query, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(queryDsl), - } - err, result := orm.Search(model.Setting{}, &q) - if err != nil { - return nil, fmt.Errorf("search settings error: %w", err) - } - if len(result.Result) == 0 { - return nil, nil - } - var ( - settings []model.Setting - hasUpdated bool - ) - for _, row := range result.Result { - setting := model.Setting{} - buf, err := util.ToJSONBytes(row) - if err != nil { - return nil, err - } - err = util.FromJSONBytes(buf, &setting) - if err != nil { - return nil, err - } - if setting.Updated != nil && setting.Updated.UnixMilli() > timestamp { - hasUpdated = true - } - settings = append(settings, setting) - } - if !hasUpdated { - return nil, nil - } - return settings, nil -} - -func getClusterConfigReferenceName(clusterID, nodeUUID string) string { - return fmt.Sprintf("%s_%s", clusterID, nodeUUID) -} - -func TransformSettingsToConfig(setting *model2.TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { - if setting == nil { - return nil, nil, fmt.Errorf("empty setting") - } - var ( - pipelines []util.MapStr - toDeletePipelineNames []string - ) - if setting.ClusterStats != nil { - var processorName = "es_cluster_stats" - if setting.ClusterStats.Enabled { - pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID, nodeUUID) - if err != nil { - return nil, nil, err - } - pipelines = append(pipelines, pipelineCfg) - }else{ - toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(clusterID, processorName)) - } - } - if setting.IndexStats != nil { - var processorName = "es_index_stats" - if setting.IndexStats.Enabled { - pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID, nodeUUID) - if err != nil { - return nil, nil, err - } - pipelines = append(pipelines, pipelineCfg) - }else{ - toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(clusterID, processorName)) - } - } - if setting.ClusterHealth != nil { - var processorName = "es_cluster_health" - if setting.ClusterHealth.Enabled { - pipelineCfg, err := newClusterMetricPipeline(processorName, clusterID, nodeUUID) - if err != nil { - return nil, nil, err - } - pipelines = append(pipelines, pipelineCfg) - }else{ - toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(clusterID, processorName)) - } - } - if setting.NodeStats != nil { - var processorName = "es_node_stats" - if setting.NodeStats.Enabled { - params := util.MapStr{ - "elasticsearch": getClusterConfigReferenceName(clusterID, nodeUUID), - "labels": util.MapStr{ - "cluster_id": clusterID, - }, - } - if len(setting.NodeStats.NodeIDs) > 0{ - params["node_uuids"] = setting.NodeStats.NodeIDs - } - cfg := util.MapStr{ - processorName: params, - } - enabled := true - pipelineCfg := util.MapStr{ - "enabled": &enabled, - "name": getMetricPipelineName(nodeUUID, processorName), - "auto_start": true, - "keep_running": true, - "retry_delay_in_ms": 10000, - "processor": []util.MapStr{cfg}, - } - pipelines = append(pipelines, pipelineCfg) - }else{ - toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(nodeUUID, processorName)) - } - } - if setting.Logs != nil { - var processorName = "es_logs_processor" - if setting.Logs.Enabled { - params := util.MapStr{ - "elasticsearch": getClusterConfigReferenceName(clusterID, nodeUUID), - "queue_name": "logs", - "labels": util.MapStr{ - "cluster_id": clusterID, - }, - } - if setting.Logs.LogsPath != "" { - params["logs_path"] = setting.Logs.LogsPath - } - cfg := util.MapStr{ - processorName: params, - } - enabled := true - pipelineCfg := util.MapStr{ - "enabled": &enabled, - "name": fmt.Sprintf("collect_%s_es_logs", nodeUUID), - "auto_start": true, - "keep_running": true, - "retry_delay_in_ms": 3000, - "processor": []util.MapStr{cfg}, - } - pipelines = append(pipelines, pipelineCfg) - } - } - return pipelines, toDeletePipelineNames, nil -} - - -func newClusterMetricPipeline(processorName string, clusterID string, nodeUUID string)(util.MapStr, error){ - referName := getClusterConfigReferenceName(clusterID, nodeUUID) - cfg := util.MapStr{ - processorName: util.MapStr{ - "elasticsearch": referName, - "labels": util.MapStr{ - "cluster_id": clusterID, - }, - }, - } - enabled := true - pipelineCfg := util.MapStr{ - "enabled": &enabled, - "name": getMetricPipelineName(clusterID, processorName), - "auto_start": true, - "keep_running": true, - "singleton": true, - "retry_delay_in_ms": 10000, - "processor": []util.MapStr{cfg}, - } - return pipelineCfg, nil -} - -func getMetricPipelineName(clusterID, processorName string) string{ - return fmt.Sprintf("collect_%s_%s", clusterID, processorName) -} - - -func LoadAgentsFromES(clusterID string) ([]model.Instance, error) { - q := orm.Query{ - Size: 1000, - } - if clusterID != "" { - q.Conds = orm.And(orm.Eq("id", clusterID)) - } - err, result := orm.Search(model.Instance{}, &q) - if err != nil { - return nil, fmt.Errorf("query agent error: %w", err) - } - - if len(result.Result) > 0 { - var agents = make([]model.Instance, 0, len(result.Result)) - for _, row := range result.Result { - ag := model.Instance{} - bytes := util.MustToJSONBytes(row) - err = util.FromJSONBytes(bytes, &ag) - if err != nil { - log.Errorf("got unexpected agent: %s, error: %v", string(bytes), err) - continue - } - agents = append(agents, ag) - } - return agents, nil - } - return nil, nil -} - -func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) { - q := orm.Query{ - WildcardIndex: true, - } - mustQ := []util.MapStr{ - { - "term": util.MapStr{ - "metadata.name": util.MapStr{ - "value": "agent", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.category": util.MapStr{ - "value": "instance", - }, - }, - }, - } - if len(agentIds) > 0 { - mustQ = append(mustQ, util.MapStr{ - "terms": util.MapStr{ - "agent.id": agentIds, - }, - }) - } - queryDSL := util.MapStr{ - "sort": []util.MapStr{ - { - "timestamp": util.MapStr{ - "order": "desc", - }, - }, - }, - "collapse": util.MapStr{ - "field": "agent.id", - }, - "query": util.MapStr{ - "bool": util.MapStr{ - "filter": []util.MapStr{ - { - "range": util.MapStr{ - "timestamp": util.MapStr{ - "gte": fmt.Sprintf("now-%ds", lastSeconds), - }, - }, - }, - }, - "must": mustQ, - }, - }, - } - if len(agentIds) == 0 { - queryDSL["size"] = 2000 - }else{ - queryDSL["size"] = len(agentIds) - } - q.RawQuery = util.MustToJSONBytes(queryDSL) - err, result := orm.Search(event.Event{}, &q) - if err != nil { - return nil, fmt.Errorf("query agent instance metric error: %w", err) - } - agentIDs := map[string]struct{}{} - if len(result.Result) > 0 { - searchRes := elastic.SearchResponse{} - err = util.FromJSONBytes(result.Raw, &searchRes) - if err != nil { - return nil, err - } - agentIDKeyPath := []string{"agent", "id"} - for _, hit := range searchRes.Hits.Hits { - agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source) - if v, ok := agentID.(string); ok { - agentIDs[v] = struct{}{} - } - } - } - return agentIDs, nil -} - -func GetAgentIngestConfig() (string, *model.BasicAuth, error) { - agCfg := GetAgentConfig() - var ( - endpoint string - ok bool - ) - emptyIngestClusterEndpoint := false - if agCfg.Setup.IngestClusterEndpoint == nil { - emptyIngestClusterEndpoint = true - } - if endpoint, ok = agCfg.Setup.IngestClusterEndpoint.(string);ok { - if endpoint = strings.TrimSpace(endpoint); endpoint == "" { - emptyIngestClusterEndpoint = true - } - } - if emptyIngestClusterEndpoint { - cfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) - endpoint = cfg.GetAnyEndpoint() - } - - var ( - basicAuth model.BasicAuth - ) - if agCfg.Setup.IngestClusterCredentialID != "" { - cred := credential.Credential{} - cred.ID = agCfg.Setup.IngestClusterCredentialID - _, err := orm.Get(&cred) - if err != nil { - return "", nil, fmt.Errorf("query credential [%s] error: %w", cred.ID, err) - } - info, err := cred.Decode() - if err != nil { - return "", nil, fmt.Errorf("decode credential [%s] error: %w", cred.ID, err) - } - if basicAuth, ok = info.(model.BasicAuth); !ok { - log.Debug("invalid credential: ", cred) - } - }else{ - cfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) - basicAuth = *cfg.BasicAuth - } - tpl := `configs.template: - - name: "ingest" - path: ./config/ingest_config.tpl - variable: - INGEST_CLUSTER_ID: "default_ingest_cluster" - INGEST_CLUSTER_ENDPOINT: ["%s"] - INGEST_CLUSTER_USERNAME: "%s" -` - tpl = fmt.Sprintf(tpl, endpoint, basicAuth.Username) - return tpl, &basicAuth, nil -} \ No newline at end of file diff --git a/modules/agent/model/config.go b/modules/agent/model/config.go index d731a3f8..d20526b3 100644 --- a/modules/agent/model/config.go +++ b/modules/agent/model/config.go @@ -6,9 +6,6 @@ package model type AgentConfig struct { Enabled bool `config:"enabled"` - StateManager struct { - Enabled bool `config:"enabled"` - } `config:"state_manager"` Setup *SetupConfig `config:"setup"` } @@ -18,7 +15,5 @@ type SetupConfig struct { CACertFile string `config:"ca_cert"` CAKeyFile string `config:"ca_key"` ConsoleEndpoint string `config:"console_endpoint"` - IngestClusterEndpoint interface{} `config:"ingest_cluster_endpoint"` - IngestClusterCredentialID string `config:"ingest_cluster_credential_id"` Port string `config:"port"` } diff --git a/modules/agent/model/task.go b/modules/agent/model/task.go deleted file mode 100644 index eed98ee4..00000000 --- a/modules/agent/model/task.go +++ /dev/null @@ -1,46 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package model - -import ( - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/util" -) - -type TaskSetting struct { - ClusterHealth *ClusterHealthTask `json:"cluster_health,omitempty"` - ClusterStats *ClusterStatsTask `json:"cluster_stats,omitempty"` - IndexStats *IndexStatsTask `json:"index_stats,omitempty"` - NodeStats *NodeStatsTask `json:"node_stats,omitempty"` - Logs *LogsTask `json:"logs,omitempty"` -} - -type ClusterHealthTask struct { - Enabled bool `json:"enabled"` -} - -type ClusterStatsTask struct { - Enabled bool `json:"enabled"` -} - -type IndexStatsTask struct { - Enabled bool `json:"enabled"` -} - -type NodeStatsTask struct { - Enabled bool `json:"enabled"` - NodeIDs []string `json:"node_ids,omitempty"` -} - -type LogsTask struct { - Enabled bool `json:"enabled"` - LogsPath string `json:"logs_path"` -} - -type ParseAgentSettingsResult struct { - ClusterConfigs []elastic.ElasticsearchConfig - Pipelines []util.MapStr - ToDeletePipelineNames []string -}