diff --git a/config/install_agent.tpl b/config/install_agent.tpl new file mode 100644 index 00000000..e9fa4676 --- /dev/null +++ b/config/install_agent.tpl @@ -0,0 +1,250 @@ +#!/bin/bash +# Agent install script for UNIX-like OS +# Author: INFINI +# BASE_URL : need, download server address,eg: https://release.infinilabs.com/agent/stable +# AGENT_VER : need, Agent version, eg: 0.4.0-126 +# INSTALL_PATH : option, download path. eg: /home/user/infini default: /opt +# ES_NAME +# ES_PWD + +printf "\n* _ ___ __ __ _____ " +printf "\n* /_\\ / _ \\ /__\\/\\ \\ \\/__ \\" +printf "\n* //_\\\\ / /_\\//_\\ / \\/ / / /\\/" +printf "\n* / _ \\/ /_\\\\//__/ /\\ / / / " +printf "\n* \\_/ \\_/\\____/\\__/\\_\\ \\/ \\/ \n\n" +# detect root user +if [ "$(echo "$UID")" = "0" ]; then + sudo_cmd='' +else + sudo_cmd='sudo' +fi + +################## +# colors +################## +RED="\033[31m" +CLR="\033[0m" +GREEN="\033[32m" + +################## +# validate os & arch +################## + +arch= +case $(uname -m) in + + "x86_64") + arch="amd64" + ;; + + "i386" | "i686") + arch="386" + ;; + + "aarch64") + arch="arm64" + ;; + + "arm" | "armv7l") + arch="arm" + ;; + + "arm64") + arch="arm64" + ;; + + *) + # shellcheck disable=SC2059 + printf "${RED}[E] Unsupport arch $(uname -m) ${CLR}\n" + exit 1 + ;; +esac + +os="linux" + +if [[ "$OSTYPE" == "darwin"* ]]; then + if [[ $arch != "amd64" ]] && [[ $arch != "arm64" ]]; then # Darwin only support amd64 and arm64 + # shellcheck disable=SC2059 + printf "${RED}[E] Darwin only support amd64/arm64.${CLR}\n" + exit 1; + fi + + os="mac" + + # # NOTE: under darwin, for arm64 and amd64, both use amd64 + # arch="arm" +fi + +################## +# validate params +################## + +base_url="{{base_url}}" +if [ -n "$BASE_URL" ]; then + base_url=$BASE_URL +fi + +agent_ver="{{agent_version}}" +if [ -n "$AGENT_VER" ]; then + agent_ver=$AGENT_VER +fi + +ca_crt="{{ca_crt}}" +client_crt="{{client_crt}}" +client_key="{{client_key}}" + +################## +# download agent +################## + +suffix="tar.gz" +if [[ "$os" == "mac" ]]; then + suffix="zip" +fi + +download_url="${base_url}/agent-${agent_ver}-${os}-${arch}.${suffix}" + +install_path="/opt" +if [ -n "$INSTALL_PATH" ]; then + install_path=$INSTALL_PATH +fi + +file_name="agent-${agent_ver}-${os}-${arch}.${suffix}" #agent在服务器上的文件名 +agent="${install_path}/agent/${file_name}" #agent下载后保存的文件 +agent_exc="${install_path}/agent/agent-${os}-${arch}" #agent可执行文件 + +agent_exsit="true" +if [ ! -d "${install_path}/agent" ]; then + printf "\n* mkdir ${install_path}/agent" + $sudo_cmd mkdir "${install_path}/agent" + agent_exsit="false" +fi + +if [ $? -ne 0 ]; then + exit 1 +fi + +printf "\n* downloading ${download_url}\n" + +printf "\n* save to : ${agent}\n" + +cd "$install_path/agent" + +sudo curl -O --progress-bar $download_url + +if [ $? -ne 0 ]; then + exit 1 +fi + +printf "\n* downloaded: ${agent}" + +################## +# install agent +################## + +printf "\n* start install" + +if [[ "${suffix}" == "zip" ]]; then + printf "\n* uzip ${agent}\n" + $sudo_cmd unzip $agent +else + printf "\n* tar -xzvf ${agent}\n" + $sudo_cmd tar -xzvf $agent +fi + +if [ $? -ne 0 ]; then + exit 1 +fi + +rm -f ${agent} + +################## +# save cert +################## +$sudo_cmd mkdir config +$sudo_cmd sh -c "echo '${ca_crt}' > ./config/ca.crt" +$sudo_cmd sh -c "echo '${client_crt}' > ./config/client.crt" +$sudo_cmd sh -c "echo '${client_key}' > ./config/client.key" +if [ $? -ne 0 ]; then + exit 1 +fi + +port="{{port}}" + +## generate agent.yml +agent_config="path.configs: "config" +configs.auto_reload: true +env: + API_BINDING: "0.0.0.0:${port}" + +path.data: data +path.logs: log + +api: + enabled: true + tls: + enabled: true + cert_file: "${install_path}/agent/config/client.crt" + key_file: "${install_path}/agent/config/client.key" + ca_file: "${install_path}/agent/config/ca.crt" + skip_insecure_verify: false + network: + binding: \$[[env.API_BINDING]] + +badger: + value_log_max_entries: 1000000 + value_log_file_size: 104857600 + value_threshold: 1024 +agent: + major_ip_pattern: "192.*" +" + +agent_yml_path="${install_path}/agent/agent.yml" + +$sudo_cmd rm $agent_yml_path +$sudo_cmd touch $agent_yml_path +$sudo_cmd sh -c "echo '${agent_config}' > $agent_yml_path" + +if [ $? -ne 0 ]; then + exit 1 +fi + +$sudo_cmd chmod +x $agent_exc + +#try to stop and uninstall service +if [[ "$agent_exsit" == "true" ]]; then + printf "\n* stop && uninstall service\n" + $sudo_cmd $agent_exc -service stop + $sudo_cmd $agent_exc -service uninstall +fi + +printf "\n* start install service\n" +$sudo_cmd $agent_exc -service install + +if [ $? -ne 0 ]; then + exit 1 +fi + +printf "\n* service installed\n" +printf "\n* service starting >>>>>>\n" +$sudo_cmd $agent_exc -service start + +if [ $? -ne 0 ]; then + exit 1 +fi + +printf "\n* agent service started" +console_endpoint="{{console_endpoint}}" +sleep 3 +printf "\n* start register\n" +token={{token}} +curl -X POST ${console_endpoint}/agent/instance?token=${token} + +if [ $? -ne 0 ]; then + exit 1 +fi +printf "\n* agent registered\n" + +printf "\n* ${GREEN}Congratulations, install success!${CLR}\n\n" + + diff --git a/modules/agent/agent.go b/modules/agent/agent.go index 1461856e..9e8c3ee5 100644 --- a/modules/agent/agent.go +++ b/modules/agent/agent.go @@ -7,7 +7,10 @@ package agent import ( log "github.com/cihub/seelog" "infini.sh/console/modules/agent/api" + "infini.sh/console/modules/agent/client" "infini.sh/console/modules/agent/common" + "infini.sh/console/modules/agent/model" + "infini.sh/console/modules/agent/state" "infini.sh/framework/core/agent" "infini.sh/framework/core/env" "infini.sh/framework/core/host" @@ -38,7 +41,22 @@ func (module *AgentModule) Start() error { orm.RegisterSchemaWithIndexName(agent.ESNodeInfo{}, "agent-node") orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host") orm.RegisterSchemaWithIndexName(agent.Setting{}, "agent-setting") - common.RegisterClient(&common.Client{}) + var ( + executor client.Executor + err error + ) + if module.AgentConfig.Setup == nil { + executor = &client.HttpExecutor{} + }else{ + executor, err = client.NewMTLSExecutor(module.AgentConfig.Setup.CACertFile, module.AgentConfig.Setup.CAKeyFile) + if err != nil { + panic(err) + } + } + agClient := &client.Client{ + Executor: executor, + } + client.RegisterClient(agClient) if module.AgentConfig.StateManager.Enabled { onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, 60) @@ -56,8 +74,8 @@ func (module *AgentModule) Start() error { } } - sm := common.NewStateManager(time.Second*30, "agent_state", agentIds) - common.RegisterStateManager(sm) + sm := state.NewStateManager(time.Second*30, "agent_state", agentIds, agClient) + state.RegisterStateManager(sm) go sm.LoopState() } return nil @@ -69,12 +87,12 @@ func (module *AgentModule) Stop() error { } log.Info("start to stop agent module") if module.AgentConfig.StateManager.Enabled { - common.GetStateManager().Stop() + state.GetStateManager().Stop() } log.Info("agent module was stopped") return nil } type AgentModule struct { - common.AgentConfig + model.AgentConfig } diff --git a/modules/agent/api/host.go b/modules/agent/api/host.go index c873501c..3d1b15a1 100644 --- a/modules/agent/api/host.go +++ b/modules/agent/api/host.go @@ -7,13 +7,13 @@ package api import ( "context" "fmt" - common2 "infini.sh/console/modules/agent/common" + log "github.com/cihub/seelog" + "infini.sh/console/modules/agent/state" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/host" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "net/http" - log "github.com/cihub/seelog" "time" ) @@ -107,7 +107,7 @@ func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request, return } - sm := common2.GetStateManager() + sm := state.GetStateManager() ag, err := sm.GetAgent(hostInfo.AgentID) if err != nil { log.Error(err) @@ -159,7 +159,7 @@ func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Requ h.WriteJSON(w, util.MapStr{}, http.StatusOK) return } - sm := common2.GetStateManager() + sm := state.GetStateManager() ag, err := sm.GetAgent(hostInfo.AgentID) if err != nil { log.Error(err) @@ -193,7 +193,7 @@ func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Requ } func enrollHostFromAgent(agentID string) (*host.HostInfo, error){ - sm := common2.GetStateManager() + sm := state.GetStateManager() ag, err := sm.GetAgent(agentID) if err != nil { return nil, err diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go index 24b02e94..7a5048fd 100644 --- a/modules/agent/api/init.go +++ b/modules/agent/api/init.go @@ -23,8 +23,13 @@ func Init() { api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) + api.HandleAPIMethod(api.POST, "/agent/instance/try_connect", handler.RequireLogin(handler.tryConnect)) api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo) api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess) + + + api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand)) + api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript) } diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index 63e935f9..eab6e74f 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -8,20 +8,22 @@ import ( "context" "fmt" log "github.com/cihub/seelog" + "infini.sh/console/modules/agent/client" + common2 "infini.sh/console/modules/agent/common" + "infini.sh/console/modules/agent/model" + "infini.sh/console/modules/agent/state" "infini.sh/framework/core/agent" "infini.sh/framework/core/api" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/event" - "infini.sh/framework/core/host" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" - common2 "infini.sh/console/modules/agent/common" elastic2 "infini.sh/framework/modules/elastic" - "infini.sh/framework/modules/elastic/adapter" "infini.sh/framework/modules/elastic/common" "net/http" "strconv" + "time" ) type APIHandler struct { @@ -36,24 +38,32 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps log.Error(err) return } - - oldInst := &agent.Instance{} - oldInst.ID = obj.ID - exists, err := orm.Get(oldInst) - - if err != nil && err != elastic2.ErrNotFound { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - if exists { - errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID) - h.WriteError(w, errMsg, http.StatusInternalServerError) - log.Error(errMsg) - return + //validate token for auto register + token := h.GetParameter(req, "token") + if token != "" { + if v, ok := tokens.Load(token); !ok { + h.WriteError(w, "token is invalid", http.StatusUnauthorized) + return + }else{ + if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) { + tokens.Delete(token) + h.WriteError(w, "token was expired", http.StatusUnauthorized) + return + } + } + remoteIP := util.ClientIP(req) + agCfg := common2.GetAgentConfig() + port := agCfg.Setup.Port + if port == "" { + port = "8080" + } + obj.Endpoint = fmt.Sprintf("https://%s:%s", remoteIP, port) + obj.Tags = append(obj.Tags, "mtls") + obj.Tags = append(obj.Tags, "auto") } + //fetch more information of agent instance - res, err := common2.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint()) + res, err := client.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint()) if err != nil { errStr := fmt.Sprintf("get agent instance basic info error: %s", err.Error()) h.WriteError(w,errStr , http.StatusInternalServerError) @@ -71,9 +81,27 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps obj.MajorIP = res.MajorIP obj.Host = res.Host obj.IPS = res.IPS + if obj.Name == "" { + obj.Name = res.Name + } + } + oldInst := &agent.Instance{} + oldInst.ID = obj.ID + exists, err := orm.Get(oldInst) + + if err != nil && err != elastic2.ErrNotFound { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if exists { + errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID) + h.WriteError(w, errMsg, http.StatusInternalServerError) + log.Error(errMsg) + return } - obj.Status = common2.StatusOnline + obj.Status = model.StatusOnline err = orm.Create(nil, obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -84,40 +112,30 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps if err != nil { log.Error(err) } + err = pushIngestConfigToAgent(obj) + if err != nil { + log.Error(err) + } h.WriteCreatedOKJSON(w, obj.ID) } -func bindAgentToHostByIP(ag *agent.Instance) error{ - err, result := orm.GetBy("ip", ag.MajorIP, host.HostInfo{}) +func pushIngestConfigToAgent(inst *agent.Instance) error{ + ingestCfg, basicAuth, err := common2.GetAgentIngestConfig() if err != nil { return err } - if len(result.Result) > 0 { - buf := util.MustToJSONBytes(result.Result[0]) - hostInfo := &host.HostInfo{} - err = util.FromJSONBytes(buf, hostInfo) + if basicAuth != nil && basicAuth.Password != "" { + err = client.GetClient().SetKeystoreValue(context.Background(), inst.GetEndpoint(), "ingest_cluster_password", basicAuth.Password) if err != nil { - return err - } - sm := common2.GetStateManager() - if ag.Status == "" { - _, err1 := sm.GetAgentClient().GetHostInfo(nil, ag.GetEndpoint()) - if err1 == nil { - ag.Status = "online" - }else{ - ag.Status = "offline" - } - } - - hostInfo.AgentStatus = ag.Status - hostInfo.AgentID = ag.ID - err = orm.Update(nil, hostInfo) - if err != nil { - return err + return fmt.Errorf("set keystore value to agent error: %w", err) } } + err = client.GetClient().SaveDynamicConfig(context.Background(), inst.GetEndpoint(), "ingest", ingestCfg ) + if err != nil { + fmt.Errorf("save dynamic config to agent error: %w", err) + } return nil } @@ -169,14 +187,42 @@ func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps log.Error(err) return } - if sm := common2.GetStateManager(); sm != nil { + if sm := state.GetStateManager(); sm != nil { sm.DeleteAgent(obj.ID) } + queryDsl := util.MapStr{ + "query": util.MapStr{ + "term": util.MapStr{ + "agent_id": util.MapStr{ + "value": id, + }, + }, + }, + } + err = orm.DeleteBy(agent.ESNodeInfo{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error("delete node info error: ", err) + return + } - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "deleted", - }, 200) + queryDsl = util.MapStr{ + "query": util.MapStr{ + "term": util.MapStr{ + "metadata.labels.agent_id": util.MapStr{ + "value": id, + }, + }, + }, + } + err = orm.DeleteBy(agent.Setting{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error("delete agent settings error: ", err) + return + } + + h.WriteDeletedOKJSON(w, id) } func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -410,7 +456,7 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt return } cfg.BasicAuth = &basicAuth - nodeInfo, err := common2.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg) + nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -519,12 +565,50 @@ func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps h h.WriteError(w, err.Error(), http.StatusInternalServerError) return } + q = util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "metadata.labels.node_uuid": nodeIDs, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.agent_id": util.MapStr{ + "value": id, + }, + }, + }, + }, + }, + }, + } } h.WriteAckOKJSON(w) } +func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var reqBody = struct { + Endpoint string `json:"endpoint"` + BasicAuth agent.BasicAuth + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + connectRes, err := client.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteJSON(w, connectRes, http.StatusOK) +} + func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { - nodesInfo, err := common2.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) + nodesInfo, err := client.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) if err != nil { return nil, fmt.Errorf("get elasticsearch nodes error: %w", err) } @@ -534,7 +618,6 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { } oldPids := map[int]struct{}{} var resultNodes []agent.ESNodeInfo - //settings, err := common2.GetAgentSettings(inst.ID, 0) if err != nil { return nil, err } @@ -555,24 +638,6 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { if oldNode != nil && oldNode.ClusterID != "" { node.ClusterID = oldNode.ClusterID } - //else{ - // if cfg := clusterCfgs[node.ClusterUuid]; cfg != nil { - // node.ClusterID = cfg.ID - // setting := pickAgentSettings(settings, node) - // if setting == nil { - // setting, err = getAgentTaskSetting(inst.ID, node) - // if err != nil { - // log.Error() - // } - // err = orm.Create(nil, setting) - // if err != nil { - // log.Error("save agent task setting error: ", err) - // } - // } - // }else{ - // //cluster not registered in console - // } - //} } node.Status = "online" @@ -626,6 +691,7 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){ }, }, }, + } q := orm.Query{ RawQuery: util.MustToJSONBytes(query), @@ -645,26 +711,6 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){ return nodesInfo, nil } -func getClusterConfigs() map[string]*elastic.ElasticsearchConfig { - cfgs := map[string]*elastic.ElasticsearchConfig{} - elastic.WalkConfigs(func(key, value interface{}) bool { - if cfg, ok := value.(*elastic.ElasticsearchConfig); ok { - clusterUUID := cfg.ClusterUUID - if cfg.ClusterUUID == "" { - verInfo, err := adapter.ClusterVersion(elastic.GetMetadata(cfg.ID)) - if err != nil { - log.Error(err) - return true - } - clusterUUID = verInfo.ClusterUUID - } - cfgs[clusterUUID] = cfg - } - return true - }) - return cfgs -} - func pickAgentSettings(settings []agent.Setting, nodeInfo agent.ESNodeInfo) *agent.Setting { for _, setting := range settings { if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID { @@ -698,7 +744,7 @@ func getAgentTaskSetting(agentID string, node agent.ESNodeInfo) (*agent.Setting, } // getSettingsByClusterID query agent task settings with cluster id -func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) { +func getSettingsByClusterID(clusterID string) (*model.TaskSetting, error) { queryDsl := util.MapStr{ "size": 200, "query": util.MapStr{ @@ -746,8 +792,8 @@ func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) { return nil, err } - setting := &common2.TaskSetting{ - NodeStats: &common2.NodeStatsTask{ + setting := &model.TaskSetting{ + NodeStats: &model.NodeStatsTask{ Enabled: true, }, } @@ -776,17 +822,17 @@ func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) { } } if clusterStats { - setting.ClusterStats = &common2.ClusterStatsTask{ + setting.ClusterStats = &model.ClusterStatsTask{ Enabled: true, } } if indexStats { - setting.IndexStats = &common2.IndexStatsTask{ + setting.IndexStats = &model.IndexStatsTask{ Enabled: true, } } if clusterHealth { - setting.ClusterHealth = &common2.ClusterHealthTask{ + setting.ClusterHealth = &model.ClusterHealthTask{ Enabled: true, } } diff --git a/modules/agent/api/log.go b/modules/agent/api/log.go index d564f521..0164fc5d 100644 --- a/modules/agent/api/log.go +++ b/modules/agent/api/log.go @@ -7,7 +7,8 @@ package api import ( "fmt" log "github.com/cihub/seelog" - "infini.sh/console/modules/agent/common" + "infini.sh/console/modules/agent/client" + "infini.sh/console/modules/agent/state" "infini.sh/framework/core/agent" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/orm" @@ -31,7 +32,7 @@ func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, }, http.StatusOK) return } - logFiles, err := common.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs) + logFiles, err := client.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -68,7 +69,7 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, return } reqBody.LogsPath = node.Path.Logs - sm := common.GetStateManager() + sm := state.GetStateManager() res, err := sm.GetAgentClient().GetElasticLogFileContent(nil, inst.GetEndpoint(), reqBody) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -88,6 +89,13 @@ func getAgentByNodeID(nodeID string) (*agent.Instance, *agent.ESNodeInfo, error) }, }, }, + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "desc", + }, + }, + }, } q := &orm.Query{ RawQuery: util.MustToJSONBytes(queryDsl), diff --git a/modules/agent/api/setup.go b/modules/agent/api/setup.go new file mode 100644 index 00000000..bcda07b5 --- /dev/null +++ b/modules/agent/api/setup.go @@ -0,0 +1,140 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package api + +import ( + "fmt" + log "github.com/cihub/seelog" + "github.com/valyala/fasttemplate" + "infini.sh/console/modules/agent/common" + "infini.sh/framework/core/api/rbac" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/global" + "infini.sh/framework/core/util" + "os" + + "net/http" + "path" + "strings" + "sync" + "time" +) + +var tokens = sync.Map{} +type Token struct { + CreatedAt time.Time + UserID string +} + +const ExpiredIn = time.Millisecond * 1000 * 60 * 20 +func (h *APIHandler) generateInstallCommand(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + claims, ok := req.Context().Value("user").(*rbac.UserClaims) + if !ok { + h.WriteError(w, "user not found", http.StatusInternalServerError) + return + } + var ( + t *Token + tokenStr string + ) + tokens.Range(func(key, value any) bool { + if v, ok := value.(*Token); ok && claims.UserId == v.UserID { + t = v + tokenStr = key.(string) + return false + } + return true + }) + + if t == nil { + tokenStr = util.GetUUID() + t = &Token{ + CreatedAt: time.Now(), + UserID: claims.UserId, + } + }else{ + if t.CreatedAt.Add(ExpiredIn).Before(time.Now()){ + tokens.Delete(tokenStr) + tokenStr = util.GetUUID() + t = &Token{ + CreatedAt: time.Now(), + UserID: claims.UserId, + } + }else{ + t.CreatedAt = time.Now() + } + } + tokens.Store(tokenStr, t) + agCfg := common.GetAgentConfig() + consoleEndpoint := agCfg.Setup.ConsoleEndpoint + if consoleEndpoint == "" { + scheme := "http" + if req.TLS != nil { + scheme = "https" + } + consoleEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host) + } + h.WriteJSON(w, util.MapStr{ + "script": fmt.Sprintf(`sudo bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, consoleEndpoint, tokenStr), + "token": tokenStr, + "expired_at": t.CreatedAt.Add(ExpiredIn), + }, http.StatusOK) +} + +func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + tokenStr := h.GetParameter(req, "token") + if strings.TrimSpace(tokenStr) == "" { + h.WriteError(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return + } + if v, ok := tokens.Load(tokenStr); !ok { + h.WriteError(w, "token is invalid", http.StatusUnauthorized) + return + }else{ + if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) { + tokens.Delete(tokenStr) + h.WriteError(w, "token was expired", http.StatusUnauthorized) + return + } + } + agCfg := common.GetAgentConfig() + caCert, clientCertPEM, clientKeyPEM, err := common.GenerateServerCert(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + scriptTplPath := path.Join(global.Env().GetConfigDir(), "install_agent.tpl") + buf, err := os.ReadFile(scriptTplPath) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + tpl := fasttemplate.New(string(buf), "{{", "}}") + downloadURL := agCfg.Setup.DownloadURL + if downloadURL == "" { + downloadURL = "https://release.infinilabs.com/agent/stable/" + } + port := agCfg.Setup.Port + if port == "" { + port = "8080" + } + _, err = tpl.Execute(w, map[string]interface{}{ + "base_url": agCfg.Setup.DownloadURL, + "agent_version": agCfg.Setup.Version, + "console_endpoint": agCfg.Setup.ConsoleEndpoint, + "client_crt": clientCertPEM, + "client_key": clientKeyPEM, + "ca_crt": caCert, + "port": port, + "token": tokenStr, + }) + if err != nil { + log.Error(err) + } +} + diff --git a/modules/agent/common/client.go b/modules/agent/client/client.go similarity index 70% rename from modules/agent/common/client.go rename to modules/agent/client/client.go index 704fdda2..9f35870b 100644 --- a/modules/agent/common/client.go +++ b/modules/agent/client/client.go @@ -2,7 +2,7 @@ * Web: https://infinilabs.com * Email: hello#infini.ltd */ -package common +package client import ( "context" @@ -14,9 +14,38 @@ import ( "net/http" ) -type Client struct { +var defaultClient ClientAPI + +func GetClient() ClientAPI { + if defaultClient == nil { + panic("agent client not init") + } + return defaultClient } +func RegisterClient(client ClientAPI) { + defaultClient = client +} +type ClientAPI interface { + GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) + GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) + GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) + GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) + GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error) + RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error + GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error) + AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) + CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error + DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error + SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error + SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error +} + +type Client struct { + Executor Executor +} + + func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) { req := &util.Request{ Method: http.MethodGet, @@ -188,16 +217,37 @@ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipeline return client.doRequest(req, nil) } -func (client *Client) doRequest(req *util.Request, respObj interface{}) error { - result, err := util.ExecuteRequest(req) - if err != nil { - return err +func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error{ + body := util.MapStr{ + "key": key, + "value": value, } - if result.StatusCode != 200 { - return fmt.Errorf(string(result.Body)) + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/_framework/keystore", agentBaseURL), + Context: ctx, + Body: util.MustToJSONBytes(body), } - if respObj == nil { - return nil - } - return util.FromJSONBytes(result.Body, respObj) + return client.doRequest(req, nil) } + +func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error{ + body := util.MapStr{ + "configs": util.MapStr{ + name: content, + }, + } + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/agent/config", agentBaseURL), + Context: ctx, + Body: util.MustToJSONBytes(body), + } + return client.doRequest(req, nil) +} + + +func (client *Client) doRequest(req *util.Request, respObj interface{}) error { + return client.Executor.DoRequest(req, respObj) +} + diff --git a/modules/agent/client/executor.go b/modules/agent/client/executor.go new file mode 100644 index 00000000..f756f3a1 --- /dev/null +++ b/modules/agent/client/executor.go @@ -0,0 +1,100 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package client + +import ( + "bytes" + "fmt" + "infini.sh/console/modules/agent/common" + "infini.sh/framework/core/util" + "io" + "net/http" +) + +type Executor interface { + DoRequest(req *util.Request, respObj interface{}) error +} + +type HttpExecutor struct { +} + +func (executor *HttpExecutor) DoRequest(req *util.Request, respObj interface{}) error { + result, err := util.ExecuteRequest(req) + if err != nil { + return err + } + if result.StatusCode != 200 { + return fmt.Errorf(string(result.Body)) + } + if respObj == nil { + return nil + } + return util.FromJSONBytes(result.Body, respObj) +} + +func NewMTLSExecutor(caCertFile, caKeyFile string) (*MTLSExecutor, error){ + var ( + instanceCrt string + instanceKey string + ) + instanceCrt, instanceKey, err := common.GetAgentInstanceCerts(caCertFile, caKeyFile) + if err != nil { + return nil, fmt.Errorf("generate tls cert error: %w", err) + } + hClient, err := util.NewMTLSClient(caCertFile, instanceCrt, instanceKey) + if err != nil { + return nil, err + } + return &MTLSExecutor{ + CaCertFile: caCertFile, + CAKeyFile: caKeyFile, + client: hClient, + }, nil +} + +type MTLSExecutor struct { + CaCertFile string + CAKeyFile string + client *http.Client +} + + +func (executor *MTLSExecutor) DoRequest(req *util.Request, respObj interface{}) error { + var reader io.Reader + if len(req.Body) > 0 { + reader = bytes.NewReader(req.Body) + } + var ( + hr *http.Request + err error + ) + if req.Context == nil { + hr, err = http.NewRequest(req.Method, req.Url, reader) + }else{ + hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader) + } + if err != nil { + return err + } + res, err := executor.client.Do(hr) + if err != nil { + return err + } + defer res.Body.Close() + buf, err := io.ReadAll(res.Body) + if err != nil { + return err + } + if res.StatusCode != 200 { + return fmt.Errorf(string(buf)) + } + if respObj != nil { + err = util.FromJSONBytes(buf, respObj) + if err != nil { + return err + } + } + return nil +} \ No newline at end of file diff --git a/modules/agent/common/cert.go b/modules/agent/common/cert.go new file mode 100644 index 00000000..61521509 --- /dev/null +++ b/modules/agent/common/cert.go @@ -0,0 +1,97 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "crypto/rsa" + "crypto/x509" + "encoding/pem" + "infini.sh/framework/core/global" + "infini.sh/framework/core/util" + "os" + "path" +) + +func GenerateClientCert(caFile, caKey string) (caCert, clientCertPEM, clientKeyPEM []byte, err error){ + return generateCert(caFile, caKey, false) +} + +func GenerateServerCert(caFile, caKey string) (caCert, serverCertPEM, serverKeyPEM []byte, err error){ + return generateCert(caFile, caKey, true) +} + +func generateCert(caFile, caKey string, isServer bool)(caCert, instanceCertPEM, instanceKeyPEM []byte, err error){ + pool := x509.NewCertPool() + if caFile == "" { + caFile = path.Join(global.Env().GetConfigDir(), "certs", "ca.crt") + } + caCert, err = os.ReadFile(caFile) + if err != nil { + return + } + pool.AppendCertsFromPEM(caCert) + b, _ := pem.Decode(caCert) + var rootCert *x509.Certificate + caCertBytes := b.Bytes + rootCert, err = x509.ParseCertificate(b.Bytes) + if err != nil { + return + } + if caKey == "" { + caKey = path.Join(global.Env().GetConfigDir(), "certs", "ca.key") + } + var keyBytes []byte + keyBytes, err = os.ReadFile(caKey) + if err != nil { + return + } + b, _ = pem.Decode(keyBytes) + certKey, err := util.ParsePrivateKey(b.Bytes) + if err != nil { + return + } + if isServer{ + b = &pem.Block{Type: "CERTIFICATE", Bytes: caCertBytes} + certPEM := pem.EncodeToMemory(b) + instanceCertPEM, instanceKeyPEM, err = util.GenerateServerCert(rootCert, certKey.(*rsa.PrivateKey), certPEM, nil) + }else{ + _, instanceCertPEM, instanceKeyPEM = util.GetClientCert(rootCert, certKey) + } + return caCert, instanceCertPEM, instanceKeyPEM, nil +} + +func GetAgentInstanceCerts(caFile, caKey string) (string, string, error) { + dataDir := global.Env().GetDataDir() + instanceCrt := path.Join(dataDir, "certs/agent/instance.crt") + instanceKey := path.Join(dataDir, "certs/agent/instance.key") + var ( + err error + clientCertPEM []byte + clientKeyPEM []byte + ) + if util.FileExists(instanceCrt) && util.FileExists(instanceKey) { + return instanceCrt, instanceKey, nil + } + _, clientCertPEM, clientKeyPEM, err = GenerateClientCert(caFile, caKey) + if err != nil { + return "", "", err + } + baseDir := path.Join(dataDir, "certs/agent") + if !util.IsExist(baseDir){ + err = os.MkdirAll(baseDir, 0775) + if err != nil { + return "", "", err + } + } + _, err = util.FilePutContentWithByte(instanceCrt, clientCertPEM) + if err != nil { + return "", "", err + } + _, err = util.FilePutContentWithByte(instanceKey, clientKeyPEM) + if err != nil { + return "", "", err + } + return instanceCrt, instanceKey, nil +} \ No newline at end of file diff --git a/modules/agent/common/config.go b/modules/agent/common/config.go index 36b5568c..ac7e255e 100644 --- a/modules/agent/common/config.go +++ b/modules/agent/common/config.go @@ -4,18 +4,17 @@ package common -type AgentConfig struct { - Enabled bool `config:"enabled"` - StateManager struct{ - Enabled bool `config:"enabled"` - } `config:"state_manager"` - Setup SetupConfig `config:"setup"` -} +import ( + "infini.sh/console/modules/agent/model" + "infini.sh/framework/core/env" +) -type SetupConfig struct { - DownloadURL string `config:"download_url"` - Version string `config:"version"` - CACertFile string `config:"ca_cert"` - CAKeyFile string `config:"ca_key"` - ScriptEndpoint string `config:"script_endpoint"` + +func GetAgentConfig() *model.AgentConfig { + agentCfg := &model.AgentConfig{} + _, err := env.ParseConfig("agent", agentCfg ) + if err != nil { + panic(err) + } + return agentCfg } \ No newline at end of file diff --git a/modules/agent/common/helper.go b/modules/agent/common/helper.go index 5763fa7c..cc7ea55d 100644 --- a/modules/agent/common/helper.go +++ b/modules/agent/common/helper.go @@ -6,13 +6,18 @@ package common import ( "fmt" + "infini.sh/console/modules/agent/model" "infini.sh/framework/core/agent" + "infini.sh/framework/core/credential" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/event" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" + log "src/github.com/cihub/seelog" + "strings" ) -func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, error){ +func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResult, error){ var clusterCfgs []elastic.ElasticsearchConfig var ( pipelines []util.MapStr @@ -49,7 +54,7 @@ func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, err if err != nil { return nil, err } - taskSetting := TaskSetting{} + taskSetting := model.TaskSetting{} err = util.FromJSONBytes(vBytes, &taskSetting) if err != nil { return nil, err @@ -61,51 +66,55 @@ func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, err pipelines = append(pipelines, partPipelines...) toDeletePipelineNames = append(toDeletePipelineNames, partDeletePipelineNames...) } - return &ParseAgentSettingsResult{ + return &model.ParseAgentSettingsResult{ ClusterConfigs: clusterCfgs, Pipelines: pipelines, ToDeletePipelineNames: toDeletePipelineNames, }, nil } +// GetAgentSettings query agent setting by agent id and updated timestamp, +// if there has any setting was updated, then return setting list includes settings not changed, +// otherwise return empty setting list func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) { - queryDsl := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.category": util.MapStr{ - "value": "agent", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.name": util.MapStr{ - "value": "task", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.agent_id": util.MapStr{ - "value": agentID, - }, - }, - }, - { - "range": util.MapStr{ - "updated": util.MapStr{ - "gt": timestamp, - }, + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.category": util.MapStr{ + "value": "agent", }, }, }, + { + "term": util.MapStr{ + "metadata.name": util.MapStr{ + "value": "task", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.agent_id": util.MapStr{ + "value": agentID, + }, + }, + }, + //{ + // "range": util.MapStr{ + // "updated": util.MapStr{ + // "gt": timestamp, + // }, + // }, + //}, }, }, } + queryDsl := util.MapStr{ + "size": 500, + "query": query, + } q := orm.Query{ RawQuery: util.MustToJSONBytes(queryDsl), } @@ -116,7 +125,10 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) if len(result.Result) == 0 { return nil, nil } - var settings []agent.Setting + var ( + settings []agent.Setting + hasUpdated bool + ) for _, row := range result.Result { setting := agent.Setting{} buf, err := util.ToJSONBytes(row) @@ -127,12 +139,18 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) if err != nil { return nil, err } + if setting.Updated != nil && setting.Updated.UnixMilli() > timestamp { + hasUpdated = true + } settings = append(settings, setting) } + if !hasUpdated { + return nil, nil + } return settings, nil } -func TransformSettingsToConfig(setting *TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { +func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { if setting == nil { return nil, nil, fmt.Errorf("empty setting") } @@ -227,3 +245,237 @@ func getMetricPipelineName(clusterID, processorName string) string{ return fmt.Sprintf("collect_%s_%s", clusterID, processorName) } + +func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) { + q := orm.Query{ + Size: 1000, + } + if clusterID != "" { + q.Conds = orm.And(orm.Eq("id", clusterID)) + } + err, result := orm.Search(agent.Instance{}, &q) + if err != nil { + return nil, fmt.Errorf("query agent error: %w", err) + } + + if len(result.Result) > 0 { + var agents = make([]agent.Instance, 0, len(result.Result)) + for _, row := range result.Result { + ag := agent.Instance{} + bytes := util.MustToJSONBytes(row) + err = util.FromJSONBytes(bytes, &ag) + if err != nil { + log.Errorf("got unexpected agent: %s, error: %v", string(bytes), err) + continue + } + agents = append(agents, ag) + } + return agents, nil + } + return nil, nil +} + +func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) { + q := orm.Query{ + WildcardIndex: true, + } + mustQ := []util.MapStr{ + { + "term": util.MapStr{ + "metadata.name": util.MapStr{ + "value": "agent", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.category": util.MapStr{ + "value": "instance", + }, + }, + }, + } + if len(agentIds) > 0 { + mustQ = append(mustQ, util.MapStr{ + "terms": util.MapStr{ + "agent.id": agentIds, + }, + }) + } + queryDSL := util.MapStr{ + "_source": "agent.id", + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "desc", + }, + }, + }, + "collapse": util.MapStr{ + "field": "agent.id", + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "filter": []util.MapStr{ + { + "range": util.MapStr{ + "timestamp": util.MapStr{ + "gte": fmt.Sprintf("now-%ds", lastSeconds), + }, + }, + }, + }, + "must": mustQ, + }, + }, + } + q.RawQuery = util.MustToJSONBytes(queryDSL) + err, result := orm.Search(event.Event{}, &q) + if err != nil { + return nil, fmt.Errorf("query agent instance metric error: %w", err) + } + agentIDs := map[string]struct{}{} + if len(result.Result) > 0 { + searchRes := elastic.SearchResponse{} + err = util.FromJSONBytes(result.Raw, &searchRes) + if err != nil { + return nil, err + } + agentIDKeyPath := []string{"agent", "id"} + for _, hit := range searchRes.Hits.Hits { + agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source) + if v, ok := agentID.(string); ok { + agentIDs[v] = struct{}{} + } + } + } + return agentIDs, nil +} + +func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) { + agCfg := GetAgentConfig() + var ( + endpoint string + ok bool + ) + if endpoint, ok = agCfg.Setup.IngestClusterEndpoint.(string);ok { + if endpoint = strings.TrimSpace(endpoint); endpoint == "" { + return "", nil, fmt.Errorf("config ingest_cluster_endpoint must not be empty") + } + } + var ( + basicAuth elastic.BasicAuth + ) + if agCfg.Setup.IngestClusterCredentialID != "" { + cred := credential.Credential{} + cred.ID = agCfg.Setup.IngestClusterCredentialID + _, err := orm.Get(&cred) + if err != nil { + return "", nil, fmt.Errorf("query credential [%s] error: %w", cred.ID, err) + } + info, err := cred.Decode() + if err != nil { + return "", nil, fmt.Errorf("decode credential [%s] error: %w", cred.ID, err) + } + if basicAuth, ok = info.(elastic.BasicAuth); !ok { + log.Debug("invalid credential: ", cred) + } + } + tpl := `elasticsearch: + - name: default + enabled: true + endpoint: %s + discovery: + enabled: true + basic_auth: + username: %s + password: $[[keystore.ingest_cluster_password]] +metrics: + enabled: true + queue: metrics + network: + enabled: true + summary: true + details: true + memory: + metrics: + - swap + - memory + disk: + metrics: + - iops + - usage + cpu: + metrics: + - idle + - system + - user + - iowait + - load + instance: + enabled: true +pipeline: + - name: logs_indexing_merge + auto_start: true + keep_running: true + processor: + - indexing_merge: + index_name: ".infini_logs" + elasticsearch: "default" + input_queue: "logs" + idle_timeout_in_seconds: 10 + output_queue: + name: "logs_requests" + label: + tag: "logs" + worker_size: 1 + bulk_size_in_mb: 10 + - name: ingest_logs + auto_start: true + keep_running: true + processor: + - bulk_indexing: + bulk: + compress: true + batch_size_in_mb: 5 + batch_size_in_docs: 5000 + consumer: + fetch_max_messages: 100 + queues: + type: indexing_merge + tag: "logs" + when: + cluster_available: ["default"] + - name: metrics_indexing_merge + auto_start: true + keep_running: true + processor: + - indexing_merge: + elasticsearch: "default" + index_name: ".infini_metrics" + input_queue: "metrics" + output_queue: + name: "metrics_requests" + label: + tag: "metrics" + worker_size: 1 + bulk_size_in_mb: 5 + - name: ingest_metrics + auto_start: true + keep_running: true + processor: + - bulk_indexing: + bulk: + compress: true + batch_size_in_mb: 5 + batch_size_in_docs: 5000 + consumer: + fetch_max_messages: 100 + queues: + type: indexing_merge + tag: "metrics" + when: + cluster_available: ["default"]` + tpl = fmt.Sprintf(tpl, endpoint, basicAuth.Username) + return tpl, &basicAuth, nil +} \ No newline at end of file diff --git a/modules/agent/common/interface.go b/modules/agent/common/interface.go deleted file mode 100644 index 6407358f..00000000 --- a/modules/agent/common/interface.go +++ /dev/null @@ -1,65 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package common - -import ( - "context" - "infini.sh/framework/core/agent" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/host" -) - -var defaultClient ClientAPI - -func GetClient() ClientAPI { - if defaultClient == nil { - panic("agent client not init") - } - return defaultClient -} - -func RegisterClient(client ClientAPI) { - defaultClient = client -} -type ClientAPI interface { - GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) - GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) - GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) - GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) - GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error) - RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error - GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error) - AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) - CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error - DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error -} - - -var stateManager IStateManager - -func GetStateManager() IStateManager { - if stateManager == nil { - panic("agent state manager not init") - } - return stateManager -} - -func RegisterStateManager(sm IStateManager) { - stateManager = sm -} - -func IsEnabled() bool { - return stateManager != nil -} - -type IStateManager interface { - GetAgent(ID string) (*agent.Instance, error) - UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error) - GetTaskAgent(clusterID string) (*agent.Instance, error) - DeleteAgent(agentID string) error - LoopState() - Stop() - GetAgentClient() ClientAPI -} \ No newline at end of file diff --git a/modules/agent/model/config.go b/modules/agent/model/config.go new file mode 100644 index 00000000..f4bb4a2d --- /dev/null +++ b/modules/agent/model/config.go @@ -0,0 +1,24 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package model + +type AgentConfig struct { + Enabled bool `config:"enabled"` + StateManager struct{ + Enabled bool `config:"enabled"` + } `config:"state_manager"` + Setup *SetupConfig `config:"setup"` +} + +type SetupConfig struct { + DownloadURL string `config:"download_url"` + Version string `config:"version"` + CACertFile string `config:"ca_cert"` + CAKeyFile string `config:"ca_key"` + ConsoleEndpoint string `config:"console_endpoint"` + IngestClusterEndpoint interface{} `config:"ingest_cluster_endpoint"` + IngestClusterCredentialID string `config:"ingest_cluster_credential_id"` + Port string `config:"port"` +} diff --git a/modules/agent/model/const.go b/modules/agent/model/const.go new file mode 100644 index 00000000..6115f841 --- /dev/null +++ b/modules/agent/model/const.go @@ -0,0 +1,10 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package model + +const ( + StatusOnline string = "online" + StatusOffline = "offline" +) diff --git a/modules/agent/common/model.go b/modules/agent/model/task.go similarity index 98% rename from modules/agent/common/model.go rename to modules/agent/model/task.go index e46452be..d8b42006 100644 --- a/modules/agent/common/model.go +++ b/modules/agent/model/task.go @@ -2,7 +2,7 @@ * Web: https://infinilabs.com * Email: hello#infini.ltd */ -package common +package model import ( "infini.sh/framework/core/elastic" diff --git a/modules/agent/common/state.go b/modules/agent/state/state.go similarity index 55% rename from modules/agent/common/state.go rename to modules/agent/state/state.go index de26a0ee..5a6b986c 100644 --- a/modules/agent/common/state.go +++ b/modules/agent/state/state.go @@ -2,51 +2,75 @@ * Web: https://infinilabs.com * Email: hello#infini.ltd */ -package common +package state import ( "context" "fmt" "github.com/buger/jsonparser" log "github.com/cihub/seelog" + "infini.sh/console/modules/agent/client" + "infini.sh/console/modules/agent/common" + "infini.sh/console/modules/agent/model" "infini.sh/framework/core/agent" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/event" "infini.sh/framework/core/host" "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" + "infini.sh/framework/modules/elastic" "runtime" "runtime/debug" - "strings" + "src/gopkg.in/yaml.v2" "sync" "time" ) -const ( - StatusOnline string = "online" - StatusOffline = "offline" -) +var stateManager IStateManager + +func GetStateManager() IStateManager { + if stateManager == nil { + panic("agent state manager not init") + } + return stateManager +} + +func RegisterStateManager(sm IStateManager) { + stateManager = sm +} + +func IsEnabled() bool { + return stateManager != nil +} + +type IStateManager interface { + GetAgent(ID string) (*agent.Instance, error) + UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error) + GetTaskAgent(clusterID string) (*agent.Instance, error) + DeleteAgent(agentID string) error + LoopState() + Stop() + GetAgentClient() client.ClientAPI +} type StateManager struct { TTL time.Duration // kv ttl KVKey string stopC chan struct{} stopCompleteC chan struct{} - agentClient *Client + agentClient *client.Client agentIds map[string]string agentMutex sync.Mutex workerChan chan struct{} timestamps map[string]int64 } -func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string) *StateManager { +func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string, agentClient *client.Client) *StateManager { return &StateManager{ TTL: TTL, KVKey: kvKey, stopC: make(chan struct{}), stopCompleteC: make(chan struct{}), - agentClient: &Client{}, + agentClient: agentClient, agentIds: agentIds, workerChan: make(chan struct{}, runtime.NumCPU()), timestamps: map[string]int64{}, @@ -54,7 +78,7 @@ func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string } func (sm *StateManager) checkAgentStatus() { - onlineAgentIDs, err := GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds())) + onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds())) if err != nil { log.Error(err) return @@ -64,32 +88,32 @@ func (sm *StateManager) checkAgentStatus() { for agentID := range onlineAgentIDs { if _, ok := sm.agentIds[agentID]; !ok { log.Infof("status of agent [%s] changed to online", agentID) - sm.agentIds[agentID] = StatusOnline + sm.agentIds[agentID] = model.StatusOnline } } sm.agentMutex.Unlock() for agentID, status := range sm.agentIds { if _, ok := onlineAgentIDs[agentID]; ok { sm.syncSettings(agentID) - host.UpdateHostAgentStatus(agentID, StatusOnline) - if status == StatusOnline { + host.UpdateHostAgentStatus(agentID, model.StatusOnline) + if status == model.StatusOnline { continue } // status change to online - sm.agentIds[agentID] = StatusOnline + sm.agentIds[agentID] = model.StatusOnline log.Infof("status of agent [%s] changed to online", agentID) //set timestamp equals 0 to create pipeline sm.timestamps[agentID] = 0 continue }else{ // already offline - if status == StatusOffline { + if status == model.StatusOffline { continue } } // status change to offline // todo validate whether agent is offline - sm.agentIds[agentID] = StatusOffline + sm.agentIds[agentID] = model.StatusOffline sm.workerChan <- struct{}{} go func(agentID string) { defer func() { @@ -101,10 +125,12 @@ func (sm *StateManager) checkAgentStatus() { }() ag, err := sm.GetAgent(agentID) if err != nil { - log.Error(err) + if err != elastic.ErrNotFound { + log.Error(err) + } return } - ag.Status = StatusOffline + ag.Status = model.StatusOffline log.Infof("agent [%s] is offline", ag.Endpoint) _, err = sm.UpdateAgent(ag, true) if err != nil { @@ -112,15 +138,22 @@ func (sm *StateManager) checkAgentStatus() { return } //update host agent status - host.UpdateHostAgentStatus(ag.ID, StatusOffline) + host.UpdateHostAgentStatus(ag.ID, model.StatusOffline) }(agentID) } } func (sm *StateManager) syncSettings(agentID string) { + ag, err := sm.GetAgent(agentID) + if err != nil { + if err != elastic.ErrNotFound { + log.Errorf("get agent error: %v", err) + } + return + } newTimestamp := time.Now().UnixMilli() - settings, err := GetAgentSettings(agentID, sm.timestamps[agentID]) + settings, err := common.GetAgentSettings(agentID, sm.timestamps[agentID]) if err != nil { log.Errorf("query agent settings error: %v", err) return @@ -129,46 +162,52 @@ func (sm *StateManager) syncSettings(agentID string) { log.Debugf("got no settings of agent [%s]", agentID) return } - parseResult, err := ParseAgentSettings(settings) + parseResult, err := common.ParseAgentSettings(settings) if err != nil { log.Errorf("parse agent settings error: %v", err) return } - ag, err := sm.GetAgent(agentID) + agClient := sm.GetAgentClient() + var clusterCfgs []util.MapStr + if len(parseResult.ClusterConfigs) > 0 { + for _, cfg := range parseResult.ClusterConfigs { + clusterCfg := util.MapStr{ + "name": cfg.ID, + "enabled": true, + "endpoint": cfg.Endpoint, + } + if cfg.BasicAuth != nil && cfg.BasicAuth.Password != ""{ + err = agClient.SetKeystoreValue(context.Background(), ag.GetEndpoint(), fmt.Sprintf("%s_password", cfg.ID), cfg.BasicAuth.Password) + if err != nil { + log.Errorf("set keystore value error: %v", err) + continue + } + clusterCfg["basic_auth"] = util.MapStr{ + "username": cfg.BasicAuth.Username, + "password": fmt.Sprintf("$[[keystore.%s_password]]", cfg.ID), + } + } + clusterCfgs = append(clusterCfgs, clusterCfg) + } + } + var dynamicCfg = util.MapStr{} + if len(clusterCfgs) > 0 { + dynamicCfg["elasticsearch"] = clusterCfgs + } + if len(parseResult.Pipelines) > 0 { + dynamicCfg["pipeline"] = parseResult.Pipelines + } + cfgBytes, err := yaml.Marshal(dynamicCfg) if err != nil { - log.Errorf("get agent error: %v", err) + log.Error("serialize config to yaml error: ", err) return } - agClient := sm.GetAgentClient() - if len(parseResult.ClusterConfigs) > 0 { - err = agClient.RegisterElasticsearch(nil, ag.GetEndpoint(), parseResult.ClusterConfigs) - if err != nil { - log.Errorf("register elasticsearch config error: %v", err) - return - } - } - for _, pipelineID := range parseResult.ToDeletePipelineNames { - err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID) - if err != nil { - if !strings.Contains(err.Error(), "not found") { - log.Errorf("delete pipeline error: %v", err) - continue - } - } - //todo update delete pipeline state - } - for _, pipeline := range parseResult.Pipelines { - err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline)) - if err != nil { - log.Errorf("create pipeline error: %v", err) - return - } - } + err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes)) sm.timestamps[agentID] = newTimestamp } func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) { - agents, err := LoadAgentsFromES(clusterID) + agents, err := common.LoadAgentsFromES(clusterID) if err != nil { return nil, err } @@ -216,7 +255,7 @@ func (sm *StateManager) GetAgent(ID string) (*agent.Instance, error) { if time.Since(timestamp) > sm.TTL { exists, err := orm.Get(inst) if err != nil { - return nil, fmt.Errorf("get agent [%s] error: %w", ID, err) + return nil, err } if !exists { return nil, fmt.Errorf("can not found agent [%s]", ID) @@ -261,112 +300,6 @@ func (sm *StateManager) DeleteAgent(agentID string) error { return kv.DeleteKey(sm.KVKey, []byte(agentID)) } -func (sm *StateManager) GetAgentClient() ClientAPI { +func (sm *StateManager) GetAgentClient() client.ClientAPI { return sm.agentClient } - -func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) { - q := orm.Query{ - Size: 1000, - } - if clusterID != "" { - q.Conds = orm.And(orm.Eq("id", clusterID)) - } - err, result := orm.Search(agent.Instance{}, &q) - if err != nil { - return nil, fmt.Errorf("query agent error: %w", err) - } - - if len(result.Result) > 0 { - var agents = make([]agent.Instance, 0, len(result.Result)) - for _, row := range result.Result { - ag := agent.Instance{} - bytes := util.MustToJSONBytes(row) - err = util.FromJSONBytes(bytes, &ag) - if err != nil { - log.Errorf("got unexpected agent: %s, error: %v", string(bytes), err) - continue - } - agents = append(agents, ag) - } - return agents, nil - } - return nil, nil -} - -func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) { - q := orm.Query{ - WildcardIndex: true, - } - mustQ := []util.MapStr{ - { - "term": util.MapStr{ - "metadata.name": util.MapStr{ - "value": "agent", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.category": util.MapStr{ - "value": "instance", - }, - }, - }, - } - if len(agentIds) > 0 { - mustQ = append(mustQ, util.MapStr{ - "terms": util.MapStr{ - "agent.id": agentIds, - }, - }) - } - queryDSL := util.MapStr{ - "_source": "agent.id", - "sort": []util.MapStr{ - { - "timestamp": util.MapStr{ - "order": "desc", - }, - }, - }, - "collapse": util.MapStr{ - "field": "agent.id", - }, - "query": util.MapStr{ - "bool": util.MapStr{ - "filter": []util.MapStr{ - { - "range": util.MapStr{ - "timestamp": util.MapStr{ - "gte": fmt.Sprintf("now-%ds", lastSeconds), - }, - }, - }, - }, - "must": mustQ, - }, - }, - } - q.RawQuery = util.MustToJSONBytes(queryDSL) - err, result := orm.Search(event.Event{}, &q) - if err != nil { - return nil, fmt.Errorf("query agent instance metric error: %w", err) - } - agentIDs := map[string]struct{}{} - if len(result.Result) > 0 { - searchRes := elastic.SearchResponse{} - err = util.FromJSONBytes(result.Raw, &searchRes) - if err != nil { - return nil, err - } - agentIDKeyPath := []string{"agent", "id"} - for _, hit := range searchRes.Hits.Hits { - agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source) - if v, ok := agentID.(string); ok { - agentIDs[v] = struct{}{} - } - } - } - return agentIDs, nil -} \ No newline at end of file