From e15073e6f58b6a408b3949d8f36655580e155691 Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 30 May 2023 11:30:11 +0800 Subject: [PATCH 1/6] init --- modules/agent/api/init.go | 5 ++ modules/agent/api/instance.go | 114 +++++++++----------------- modules/agent/api/log.go | 7 ++ modules/agent/api/setup.go | 144 +++++++++++++++++++++++++++++++++ modules/agent/common/cert.go | 63 +++++++++++++++ modules/agent/common/client.go | 102 +++++++++++++++++++++-- modules/agent/common/config.go | 19 +++++ 7 files changed, 370 insertions(+), 84 deletions(-) create mode 100644 modules/agent/api/setup.go create mode 100644 modules/agent/common/cert.go diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go index 24b02e94..7a5048fd 100644 --- a/modules/agent/api/init.go +++ b/modules/agent/api/init.go @@ -23,8 +23,13 @@ func Init() { api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) + api.HandleAPIMethod(api.POST, "/agent/instance/try_connect", handler.RequireLogin(handler.tryConnect)) api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo) api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess) + + + api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand)) + api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript) } diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index 63e935f9..b85480ca 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -8,17 +8,15 @@ import ( "context" "fmt" log "github.com/cihub/seelog" + common2 "infini.sh/console/modules/agent/common" "infini.sh/framework/core/agent" "infini.sh/framework/core/api" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/event" - "infini.sh/framework/core/host" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" - common2 "infini.sh/console/modules/agent/common" elastic2 "infini.sh/framework/modules/elastic" - "infini.sh/framework/modules/elastic/adapter" "infini.sh/framework/modules/elastic/common" "net/http" "strconv" @@ -89,38 +87,6 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps } -func bindAgentToHostByIP(ag *agent.Instance) error{ - err, result := orm.GetBy("ip", ag.MajorIP, host.HostInfo{}) - if err != nil { - return err - } - if len(result.Result) > 0 { - buf := util.MustToJSONBytes(result.Result[0]) - hostInfo := &host.HostInfo{} - err = util.FromJSONBytes(buf, hostInfo) - if err != nil { - return err - } - sm := common2.GetStateManager() - if ag.Status == "" { - _, err1 := sm.GetAgentClient().GetHostInfo(nil, ag.GetEndpoint()) - if err1 == nil { - ag.Status = "online" - }else{ - ag.Status = "offline" - } - } - - hostInfo.AgentStatus = ag.Status - hostInfo.AgentID = ag.ID - err = orm.Update(nil, hostInfo) - if err != nil { - return err - } - } - return nil -} - func (h *APIHandler) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("instance_id") @@ -172,11 +138,23 @@ func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps if sm := common2.GetStateManager(); sm != nil { sm.DeleteAgent(obj.ID) } + queryDsl := util.MapStr{ + "query": util.MapStr{ + "term": util.MapStr{ + "agent_id": util.MapStr{ + "value": id, + }, + }, + }, + } + err = orm.DeleteBy(agent.ESNodeInfo{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error("delete node info error: ", err) + return + } - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "deleted", - }, 200) + h.WriteDeletedOKJSON(w, id) } func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -523,6 +501,24 @@ func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps h h.WriteAckOKJSON(w) } +func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var reqBody = struct { + Endpoint string `json:"endpoint"` + BasicAuth agent.BasicAuth + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + connectRes, err := common2.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteJSON(w, connectRes, http.StatusOK) +} + func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { nodesInfo, err := common2.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) if err != nil { @@ -534,7 +530,6 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { } oldPids := map[int]struct{}{} var resultNodes []agent.ESNodeInfo - //settings, err := common2.GetAgentSettings(inst.ID, 0) if err != nil { return nil, err } @@ -555,24 +550,6 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { if oldNode != nil && oldNode.ClusterID != "" { node.ClusterID = oldNode.ClusterID } - //else{ - // if cfg := clusterCfgs[node.ClusterUuid]; cfg != nil { - // node.ClusterID = cfg.ID - // setting := pickAgentSettings(settings, node) - // if setting == nil { - // setting, err = getAgentTaskSetting(inst.ID, node) - // if err != nil { - // log.Error() - // } - // err = orm.Create(nil, setting) - // if err != nil { - // log.Error("save agent task setting error: ", err) - // } - // } - // }else{ - // //cluster not registered in console - // } - //} } node.Status = "online" @@ -626,6 +603,7 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){ }, }, }, + } q := orm.Query{ RawQuery: util.MustToJSONBytes(query), @@ -645,26 +623,6 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){ return nodesInfo, nil } -func getClusterConfigs() map[string]*elastic.ElasticsearchConfig { - cfgs := map[string]*elastic.ElasticsearchConfig{} - elastic.WalkConfigs(func(key, value interface{}) bool { - if cfg, ok := value.(*elastic.ElasticsearchConfig); ok { - clusterUUID := cfg.ClusterUUID - if cfg.ClusterUUID == "" { - verInfo, err := adapter.ClusterVersion(elastic.GetMetadata(cfg.ID)) - if err != nil { - log.Error(err) - return true - } - clusterUUID = verInfo.ClusterUUID - } - cfgs[clusterUUID] = cfg - } - return true - }) - return cfgs -} - func pickAgentSettings(settings []agent.Setting, nodeInfo agent.ESNodeInfo) *agent.Setting { for _, setting := range settings { if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID { diff --git a/modules/agent/api/log.go b/modules/agent/api/log.go index d564f521..f36e2be0 100644 --- a/modules/agent/api/log.go +++ b/modules/agent/api/log.go @@ -88,6 +88,13 @@ func getAgentByNodeID(nodeID string) (*agent.Instance, *agent.ESNodeInfo, error) }, }, }, + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "desc", + }, + }, + }, } q := &orm.Query{ RawQuery: util.MustToJSONBytes(queryDsl), diff --git a/modules/agent/api/setup.go b/modules/agent/api/setup.go new file mode 100644 index 00000000..3f07ae7f --- /dev/null +++ b/modules/agent/api/setup.go @@ -0,0 +1,144 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package api + +import ( + "fmt" + log "github.com/cihub/seelog" + "github.com/valyala/fasttemplate" + "infini.sh/console/modules/agent/common" + "infini.sh/framework/core/api/rbac" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" + "infini.sh/framework/core/util" + "os" + + "net/http" + "path" + "strings" + "sync" + "time" +) + +var tokens = sync.Map{} +type Token struct { + CreatedAt time.Time + UserID string +} + +const ExpiredIn = time.Millisecond * 1000 * 60 * 20 +func (h *APIHandler) generateInstallCommand(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + claims, ok := req.Context().Value("user").(*rbac.UserClaims) + if !ok { + h.WriteError(w, "user not found", http.StatusInternalServerError) + return + } + var ( + t *Token + tokenStr string + ) + tokens.Range(func(key, value any) bool { + if v, ok := value.(*Token); ok && claims.UserId == v.UserID { + t = v + tokenStr = key.(string) + return false + } + return true + }) + + if t == nil { + tokenStr = util.GetUUID() + t = &Token{ + CreatedAt: time.Now(), + UserID: claims.UserId, + } + }else{ + if t.CreatedAt.Add(ExpiredIn).Before(time.Now()){ + tokens.Delete(tokenStr) + tokenStr = util.GetUUID() + t = &Token{ + CreatedAt: time.Now(), + UserID: claims.UserId, + } + }else{ + t.CreatedAt = time.Now() + } + } + tokens.Store(tokenStr, t) + agCfg := common.GetAgentConfig() + scriptEndpoint := agCfg.Setup.ScriptEndpoint + if scriptEndpoint == "" { + scheme := "http" + if req.TLS != nil { + scheme = "https" + } + scriptEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host) + } + h.WriteJSON(w, util.MapStr{ + "script": fmt.Sprintf(`sudo bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, scriptEndpoint, tokenStr), + "token": tokenStr, + "expired_at": t.CreatedAt.Add(ExpiredIn), + }, http.StatusOK) +} + +func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + tokenStr := h.GetParameter(req, "token") + if strings.TrimSpace(tokenStr) == "" { + h.WriteError(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return + } + if v, ok := tokens.Load(tokenStr); !ok { + h.WriteError(w, "token is invalid", http.StatusUnauthorized) + return + }else{ + if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) { + tokens.Delete(tokenStr) + h.WriteError(w, "token was expired", http.StatusUnauthorized) + return + } + } + agCfg := common.GetAgentConfig() + caCert, clientCertPEM, clientKeyPEM, err := common.GenerateServerCert(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + scriptTplPath := path.Join(global.Env().GetConfigDir(), "install_agent.tpl") + buf, err := os.ReadFile(scriptTplPath) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + tpl := fasttemplate.New(string(buf), "{{", "}}") + downloadURL := agCfg.Setup.DownloadURL + if downloadURL == "" { + downloadURL = "https://release.infinilabs.com/agent/stable/" + } + esCfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) + var ( + loggingESUser string + loggingESPassword string + ) + if esCfg.BasicAuth != nil { + loggingESUser = esCfg.BasicAuth.Username + loggingESPassword = esCfg.BasicAuth.Password + } + tpl.Execute(w, map[string]interface{}{ + "base_url": agCfg.Setup.DownloadURL, + "agent_version": agCfg.Setup.Version, + //"console_endpoint": util.MustToJSON(util.MustToJSON(gatewayEndpoints)), + "client_crt": clientCertPEM, + "client_key": clientKeyPEM, + "ca_crt": caCert, + "logging_es_endpoint": esCfg.Endpoint, + "logging_es_user": loggingESUser, + "logging_es_password": loggingESPassword, + }) +} + diff --git a/modules/agent/common/cert.go b/modules/agent/common/cert.go new file mode 100644 index 00000000..7b9e718c --- /dev/null +++ b/modules/agent/common/cert.go @@ -0,0 +1,63 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "crypto/rsa" + "crypto/x509" + "encoding/pem" + "infini.sh/framework/core/global" + "infini.sh/framework/core/util" + "os" + "path" +) + +func GenerateClientCert(caFile, caKey string) (caCert, clientCertPEM, clientKeyPEM []byte, err error){ + return generateCert(caFile, caKey, false) +} + +func GenerateServerCert(caFile, caKey string) (caCert, serverCertPEM, serverKeyPEM []byte, err error){ + return generateCert(caFile, caKey, true) +} + +func generateCert(caFile, caKey string, isServer bool)(caCert, instanceCertPEM, instanceKeyPEM []byte, err error){ + pool := x509.NewCertPool() + if caFile == "" { + caFile = path.Join(global.Env().GetConfigDir(), "certs", "ca.crt") + } + caCert, err = os.ReadFile(caFile) + if err != nil { + return + } + pool.AppendCertsFromPEM(caCert) + b, _ := pem.Decode(caCert) + var rootCert *x509.Certificate + caCertBytes := b.Bytes + rootCert, err = x509.ParseCertificate(b.Bytes) + if err != nil { + return + } + if caKey == "" { + caKey = path.Join(global.Env().GetConfigDir(), "certs", "ca.key") + } + var keyBytes []byte + keyBytes, err = os.ReadFile(caKey) + if err != nil { + return + } + b, _ = pem.Decode(keyBytes) + certKey, err := util.ParsePrivateKey(b.Bytes) + if err != nil { + return + } + if isServer{ + b = &pem.Block{Type: "CERTIFICATE", Bytes: caCertBytes} + certPEM := pem.EncodeToMemory(b) + instanceCertPEM, instanceKeyPEM, err = util.GenerateServerCert(rootCert, certKey.(*rsa.PrivateKey), certPEM, nil) + }else{ + _, instanceCertPEM, instanceKeyPEM = util.GetClientCert(rootCert, certKey) + } + return caCert, instanceCertPEM, instanceKeyPEM, nil +} diff --git a/modules/agent/common/client.go b/modules/agent/common/client.go index 704fdda2..cdbdf944 100644 --- a/modules/agent/common/client.go +++ b/modules/agent/common/client.go @@ -5,13 +5,19 @@ package common import ( + "bytes" "context" "fmt" "infini.sh/framework/core/agent" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" "infini.sh/framework/core/host" "infini.sh/framework/core/util" + "io" "net/http" + "os" + "path" + "sync" ) type Client struct { @@ -188,16 +194,100 @@ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipeline return client.doRequest(req, nil) } +//func (client *Client) doRequest(req *util.Request, respObj interface{}) error { +// result, err := util.ExecuteRequest(req) +// if err != nil { +// return err +// } +// if result.StatusCode != 200 { +// return fmt.Errorf(string(result.Body)) +// } +// if respObj == nil { +// return nil +// } +// return util.FromJSONBytes(result.Body, respObj) +//} + +var( + hClient *http.Client + hClientOnce = sync.Once{} +) func (client *Client) doRequest(req *util.Request, respObj interface{}) error { - result, err := util.ExecuteRequest(req) + agCfg := GetAgentConfig() + var err error + hClientOnce.Do(func() { + var ( + instanceCrt string + instanceKey string + ) + instanceCrt, instanceKey, err = getAgentInstanceCerts(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile) + hClient, err = util.NewMTLSClient(agCfg.Setup.CACertFile, instanceCrt, instanceKey) + }) if err != nil { return err } - if result.StatusCode != 200 { - return fmt.Errorf(string(result.Body)) + var reader io.Reader + if len(req.Body) > 0 { + reader = bytes.NewReader(req.Body) } - if respObj == nil { - return nil + + var hr *http.Request + if req.Context == nil { + hr, err = http.NewRequest(req.Method, req.Url, reader) + }else{ + hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader) } - return util.FromJSONBytes(result.Body, respObj) + if err != nil { + return err + } + res, err := hClient.Do(hr) + if err != nil { + return err + } + if respObj != nil { + defer res.Body.Close() + buf, err := io.ReadAll(res.Body) + if err != nil { + return err + } + err = util.FromJSONBytes(buf, respObj) + if err != nil { + return err + } + } + return nil } + +func getAgentInstanceCerts(caFile, caKey string) (string, string, error) { + dataDir := global.Env().GetDataDir() + instanceCrt := path.Join(dataDir, "certs/agent/instance.crt") + instanceKey := path.Join(dataDir, "certs/agent/instance.key") + var ( + err error + clientCertPEM []byte + clientKeyPEM []byte + ) + if util.FileExists(instanceCrt) && util.FileExists(instanceKey) { + return instanceCrt, instanceKey, nil + } + _, clientCertPEM, clientKeyPEM, err = GenerateClientCert(caFile, caKey) + if err != nil { + return "", "", err + } + baseDir := path.Join(dataDir, "certs/agent") + if !util.IsExist(baseDir){ + err = os.MkdirAll(baseDir, 0775) + if err != nil { + return "", "", err + } + } + _, err = util.FilePutContentWithByte(instanceCrt, clientCertPEM) + if err != nil { + return "", "", err + } + _, err = util.FilePutContentWithByte(instanceKey, clientKeyPEM) + if err != nil { + return "", "", err + } + return instanceCrt, instanceKey, nil +} \ No newline at end of file diff --git a/modules/agent/common/config.go b/modules/agent/common/config.go index 36b5568c..71fc0a04 100644 --- a/modules/agent/common/config.go +++ b/modules/agent/common/config.go @@ -4,6 +4,11 @@ package common +import ( + "infini.sh/framework/core/env" + "sync" +) + type AgentConfig struct { Enabled bool `config:"enabled"` StateManager struct{ @@ -18,4 +23,18 @@ type SetupConfig struct { CACertFile string `config:"ca_cert"` CAKeyFile string `config:"ca_key"` ScriptEndpoint string `config:"script_endpoint"` +} + +var agentCfg *AgentConfig +var onceCfg = sync.Once{} + +func GetAgentConfig() *AgentConfig { + onceCfg.Do(func() { + agentCfg = &AgentConfig{} + _, err := env.ParseConfig("agent", agentCfg ) + if err != nil { + panic(err) + } + }) + return agentCfg } \ No newline at end of file From 4af66c386ff524a84973c54b2b6d2a037f7f617f Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 30 May 2023 11:32:32 +0800 Subject: [PATCH 2/6] init update --- config/install_agent.tpl | 337 +++++++++++++++++++++++++++++++++++++++ console.yml | 18 ++- 2 files changed, 354 insertions(+), 1 deletion(-) create mode 100644 config/install_agent.tpl diff --git a/config/install_agent.tpl b/config/install_agent.tpl new file mode 100644 index 00000000..971fe2b9 --- /dev/null +++ b/config/install_agent.tpl @@ -0,0 +1,337 @@ +#!/bin/bash +# Agent install script for UNIX-like OS +# Author: INFINI +# BASE_URL : need, download server address,eg: https://release.infinilabs.com/agent/stable +# AGENT_VER : need, Agent version, eg: 0.4.0-126 +# INSTALL_PATH : option, download path. eg: /home/user/infini default: /opt +# ES_NAME +# ES_PWD + +printf "\n* _ ___ __ __ _____ " +printf "\n* /_\\ / _ \\ /__\\/\\ \\ \\/__ \\" +printf "\n* //_\\\\ / /_\\//_\\ / \\/ / / /\\/" +printf "\n* / _ \\/ /_\\\\//__/ /\\ / / / " +printf "\n* \\_/ \\_/\\____/\\__/\\_\\ \\/ \\/ \n\n" +# detect root user +if [ "$(echo "$UID")" = "0" ]; then + sudo_cmd='' +else + sudo_cmd='sudo' +fi + +################## +# colors +################## +RED="\033[31m" +CLR="\033[0m" +GREEN="\033[32m" + +################## +# validate os & arch +################## + +arch= +case $(uname -m) in + + "x86_64") + arch="amd64" + ;; + + "i386" | "i686") + arch="386" + ;; + + "aarch64") + arch="arm64" + ;; + + "arm" | "armv7l") + arch="arm" + ;; + + "arm64") + arch="arm64" + ;; + + *) + # shellcheck disable=SC2059 + printf "${RED}[E] Unsupport arch $(uname -m) ${CLR}\n" + exit 1 + ;; +esac + +os="linux" + +if [[ "$OSTYPE" == "darwin"* ]]; then + if [[ $arch != "amd64" ]] && [[ $arch != "arm64" ]]; then # Darwin only support amd64 and arm64 + # shellcheck disable=SC2059 + printf "${RED}[E] Darwin only support amd64/arm64.${CLR}\n" + exit 1; + fi + + os="mac" + + # # NOTE: under darwin, for arm64 and amd64, both use amd64 + # arch="arm" +fi + +################## +# validate params +################## + +base_url="{{base_url}}" +if [ -n "$BASE_URL" ]; then + base_url=$BASE_URL +fi + +agent_ver="{{agent_version}}" +if [ -n "$AGENT_VER" ]; then + agent_ver=$AGENT_VER +fi + +ca_crt="{{ca_crt}}" +client_crt="{{client_crt}}" +client_key="{{client_key}}" + +################## +# download agent +################## + +suffix="tar.gz" +if [[ "$os" == "mac" ]]; then + suffix="zip" +fi + +download_url="${base_url}/agent-${agent_ver}-${os}-${arch}.${suffix}" + +install_path="/opt" +if [ -n "$INSTALL_PATH" ]; then + install_path=$INSTALL_PATH +fi + +file_name="agent-${agent_ver}-${os}-${arch}.${suffix}" #agent在服务器上的文件名 +agent="${install_path}/agent/${file_name}" #agent下载后保存的文件 +agent_exc="${install_path}/agent/agent-${os}-${arch}" #agent可执行文件 + +agent_exsit="true" +if [ ! -d "${install_path}/agent" ]; then + printf "\n* mkdir ${install_path}/agent" + $sudo_cmd mkdir "${install_path}/agent" + agent_exsit="false" +fi + +if [ $? -ne 0 ]; then + exit 1 +fi + +printf "\n* downloading ${download_url}\n" + +printf "\n* save to : ${agent}\n" + +cd "$install_path/agent" + +sudo curl -O --progress-bar $download_url + +if [ $? -ne 0 ]; then + exit 1 +fi + +printf "\n* downloaded: ${agent}" + +################## +# install agent +################## + +printf "\n* start install" + +if [[ "${suffix}" == "zip" ]]; then + printf "\n* uzip ${agent}\n" + $sudo_cmd unzip $agent +else + printf "\n* tar -xzvf ${agent}\n" + $sudo_cmd tar -xzvf $agent +fi + +if [ $? -ne 0 ]; then + exit 1 +fi + +################## +# save cert +################## +$sudo_cmd mkdir config +$sudo_cmd sh -c "echo '${ca_crt}' > ./config/ca.crt" +$sudo_cmd sh -c "echo '${client_crt}' > ./config/client.crt" +$sudo_cmd sh -c "echo '${client_key}' > ./config/client.key" +if [ $? -ne 0 ]; then + exit 1 +fi + +## generate agent.yml +agent_config="path.configs: "config" +configs.auto_reload: true +env: + LOGGING_ES_ENDPOINT: {{logging_es_endpoint}} + LOGGING_ES_USER: {{logging_es_user}} + LOGGING_ES_PASS: {{logging_es_password}} + API_BINDING: "0.0.0.0:8080" + +path.data: data +path.logs: log + +api: + enabled: true + tls: + enabled: true + cert_file: "${install_path}/agent/config/client.crt" + key_file: "${install_path}/agent/config/client.key" + ca_file: "${install_path}/agent/config/ca.crt" + skip_insecure_verify: false + network: + binding: \$[[env.API_BINDING]] + +badger: + value_log_max_entries: 1000000 + value_log_file_size: 104857600 + value_threshold: 1024 + +metrics: + enabled: true + queue: metrics + network: + enabled: true + summary: true + details: true + memory: + metrics: + - swap + - memory + disk: + metrics: + - iops + - usage + cpu: + metrics: + - idle + - system + - user + - iowait + - load + instance: + enabled: true + +elasticsearch: + - name: default + enabled: true + endpoint: \$[[env.LOGGING_ES_ENDPOINT]] + discovery: + enabled: true + basic_auth: + username: \$[[env.LOGGING_ES_USER]] + password: \$[[env.LOGGING_ES_PASS]] + +pipeline: + - name: logs_indexing_merge + auto_start: true + keep_running: true + processor: + - indexing_merge: + index_name: ".infini_logs" + elasticsearch: "default" + input_queue: "logs" + idle_timeout_in_seconds: 10 + output_queue: + name: "logs_requests" + label: + tag: "logs" + worker_size: 1 + bulk_size_in_mb: 10 + - name: ingest_logs + auto_start: true + keep_running: true + processor: + - bulk_indexing: + bulk: + compress: true + batch_size_in_mb: 5 + batch_size_in_docs: 5000 + consumer: + fetch_max_messages: 100 + queues: + type: indexing_merge + tag: "logs" + when: + cluster_available: ["default"] + - name: metrics_indexing_merge + auto_start: true + keep_running: true + processor: + - indexing_merge: + elasticsearch: "default" + index_name: ".infini_metrics" + input_queue: "metrics" + output_queue: + name: "metrics_requests" + label: + tag: "metrics" + worker_size: 1 + bulk_size_in_mb: 5 + - name: ingest_metrics + auto_start: true + keep_running: true + processor: + - bulk_indexing: + bulk: + compress: true + batch_size_in_mb: 5 + batch_size_in_docs: 5000 + consumer: + fetch_max_messages: 100 + queues: + type: indexing_merge + tag: "metrics" + when: + cluster_available: ["default"] +agent: + major_ip_pattern: "192.*" +" + +agent_yml_path="${install_path}/agent/agent.yml" + +$sudo_cmd rm $agent_yml_path +$sudo_cmd touch $agent_yml_path +$sudo_cmd sh -c "echo '${agent_config}' > $agent_yml_path" + +if [ $? -ne 0 ]; then + exit 1 +fi + +$sudo_cmd chmod +x $agent_exc + +#try to stop and uninstall service +if [[ "$agent_exsit" == "true" ]]; then + printf "\n* stop && uninstall service\n" + $sudo_cmd $agent_exc -service stop + $sudo_cmd $agent_exc -service uninstall +fi + +printf "\n* start install service\n" +$sudo_cmd $agent_exc -service install + +if [ $? -ne 0 ]; then + exit 1 +fi + +printf "\n* service installed\n" +printf "\n* service starting >>>>>>\n" +$sudo_cmd $agent_exc -service start + +if [ $? -ne 0 ]; then + exit 1 +fi + +printf "\n* agent service started" + +printf "\n* ${GREEN}Congratulations, install success!${CLR}\n\n" + + diff --git a/console.yml b/console.yml index 590a4206..587d64b7 100644 --- a/console.yml +++ b/console.yml @@ -1,11 +1,19 @@ path.configs: "config" configs.auto_reload: true +env: +# INFINI_CONSOLE_ENDPOINT: "https://play.infinilabs.com:64443" + WECHAT_WEBHOOK_ENDPOINT: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=9f5989ae-e82f-4e39-b4f9-cab0638d0229" + SLACK_WEBHOOK_ENDPOINT: "https://hooks.slack.com/services/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" + DINGTALK_WEBHOOK_ENDPOINT: "https://oapi.dingtalk.com/robot/send?access_token=6a5c7c9454ff74537a6de493153b1da68860942d4b0aeb33797cb68b5111b077" web: enabled: true embedding_api: true auth: enabled: true + websocket: + skip_host_verify: true + permitted_hosts: ["localhost:8000"] ui: enabled: true path: .public @@ -64,4 +72,12 @@ badger: # authorize_url: "https://github.com/login/oauth/authorize" # token_url: "https://github.com/login/oauth/access_token" # redirect_url: "" -# scopes: [] \ No newline at end of file +# scopes: [] + +agent: + setup: + download_url: "https://release.infinilabs.com/agent/snapshot" + version: 0.5.0_NIGHTLY-151 + ca_cert: "/Users/liugq/go/src/infini.sh/console/config/certs/ca.crt" + ca_key: "/Users/liugq/go/src/infini.sh/console/config/certs/ca.key" + script_endpoint: "http://localhost:9000" \ No newline at end of file From e9f9d925f2386cbbf9dada565d38c4b1803ae750 Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 30 May 2023 14:10:22 +0800 Subject: [PATCH 3/6] refactor agent module --- modules/agent/agent.go | 13 +- modules/agent/api/host.go | 10 +- modules/agent/api/instance.go | 27 +-- modules/agent/api/log.go | 7 +- modules/agent/{common => client}/client.go | 32 +++- modules/agent/common/config.go | 23 +-- modules/agent/common/helper.go | 117 +++++++++++- modules/agent/common/interface.go | 65 ------- modules/agent/model/config.go | 21 +++ modules/agent/model/const.go | 10 + .../agent/{common/model.go => model/task.go} | 2 +- modules/agent/{common => state}/state.go | 173 +++++------------- 12 files changed, 255 insertions(+), 245 deletions(-) rename modules/agent/{common => client}/client.go (84%) delete mode 100644 modules/agent/common/interface.go create mode 100644 modules/agent/model/config.go create mode 100644 modules/agent/model/const.go rename modules/agent/{common/model.go => model/task.go} (98%) rename modules/agent/{common => state}/state.go (65%) diff --git a/modules/agent/agent.go b/modules/agent/agent.go index 1461856e..8352021a 100644 --- a/modules/agent/agent.go +++ b/modules/agent/agent.go @@ -7,7 +7,10 @@ package agent import ( log "github.com/cihub/seelog" "infini.sh/console/modules/agent/api" + "infini.sh/console/modules/agent/client" "infini.sh/console/modules/agent/common" + "infini.sh/console/modules/agent/model" + "infini.sh/console/modules/agent/state" "infini.sh/framework/core/agent" "infini.sh/framework/core/env" "infini.sh/framework/core/host" @@ -38,7 +41,7 @@ func (module *AgentModule) Start() error { orm.RegisterSchemaWithIndexName(agent.ESNodeInfo{}, "agent-node") orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host") orm.RegisterSchemaWithIndexName(agent.Setting{}, "agent-setting") - common.RegisterClient(&common.Client{}) + client.RegisterClient(&client.Client{}) if module.AgentConfig.StateManager.Enabled { onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, 60) @@ -56,8 +59,8 @@ func (module *AgentModule) Start() error { } } - sm := common.NewStateManager(time.Second*30, "agent_state", agentIds) - common.RegisterStateManager(sm) + sm := state.NewStateManager(time.Second*30, "agent_state", agentIds) + state.RegisterStateManager(sm) go sm.LoopState() } return nil @@ -69,12 +72,12 @@ func (module *AgentModule) Stop() error { } log.Info("start to stop agent module") if module.AgentConfig.StateManager.Enabled { - common.GetStateManager().Stop() + state.GetStateManager().Stop() } log.Info("agent module was stopped") return nil } type AgentModule struct { - common.AgentConfig + model.AgentConfig } diff --git a/modules/agent/api/host.go b/modules/agent/api/host.go index c873501c..3d1b15a1 100644 --- a/modules/agent/api/host.go +++ b/modules/agent/api/host.go @@ -7,13 +7,13 @@ package api import ( "context" "fmt" - common2 "infini.sh/console/modules/agent/common" + log "github.com/cihub/seelog" + "infini.sh/console/modules/agent/state" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/host" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "net/http" - log "github.com/cihub/seelog" "time" ) @@ -107,7 +107,7 @@ func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request, return } - sm := common2.GetStateManager() + sm := state.GetStateManager() ag, err := sm.GetAgent(hostInfo.AgentID) if err != nil { log.Error(err) @@ -159,7 +159,7 @@ func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Requ h.WriteJSON(w, util.MapStr{}, http.StatusOK) return } - sm := common2.GetStateManager() + sm := state.GetStateManager() ag, err := sm.GetAgent(hostInfo.AgentID) if err != nil { log.Error(err) @@ -193,7 +193,7 @@ func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Requ } func enrollHostFromAgent(agentID string) (*host.HostInfo, error){ - sm := common2.GetStateManager() + sm := state.GetStateManager() ag, err := sm.GetAgent(agentID) if err != nil { return nil, err diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index b85480ca..2b681a9e 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -8,7 +8,10 @@ import ( "context" "fmt" log "github.com/cihub/seelog" + "infini.sh/console/modules/agent/client" common2 "infini.sh/console/modules/agent/common" + "infini.sh/console/modules/agent/model" + "infini.sh/console/modules/agent/state" "infini.sh/framework/core/agent" "infini.sh/framework/core/api" httprouter "infini.sh/framework/core/api/router" @@ -51,7 +54,7 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps return } //fetch more information of agent instance - res, err := common2.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint()) + res, err := client.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint()) if err != nil { errStr := fmt.Sprintf("get agent instance basic info error: %s", err.Error()) h.WriteError(w,errStr , http.StatusInternalServerError) @@ -71,7 +74,7 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps obj.IPS = res.IPS } - obj.Status = common2.StatusOnline + obj.Status = model.StatusOnline err = orm.Create(nil, obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -135,7 +138,7 @@ func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps log.Error(err) return } - if sm := common2.GetStateManager(); sm != nil { + if sm := state.GetStateManager(); sm != nil { sm.DeleteAgent(obj.ID) } queryDsl := util.MapStr{ @@ -388,7 +391,7 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt return } cfg.BasicAuth = &basicAuth - nodeInfo, err := common2.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg) + nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -511,7 +514,7 @@ func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps htt h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - connectRes, err := common2.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint) + connectRes, err := client.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) return @@ -520,7 +523,7 @@ func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps htt } func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { - nodesInfo, err := common2.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) + nodesInfo, err := client.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) if err != nil { return nil, fmt.Errorf("get elasticsearch nodes error: %w", err) } @@ -656,7 +659,7 @@ func getAgentTaskSetting(agentID string, node agent.ESNodeInfo) (*agent.Setting, } // getSettingsByClusterID query agent task settings with cluster id -func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) { +func getSettingsByClusterID(clusterID string) (*model.TaskSetting, error) { queryDsl := util.MapStr{ "size": 200, "query": util.MapStr{ @@ -704,8 +707,8 @@ func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) { return nil, err } - setting := &common2.TaskSetting{ - NodeStats: &common2.NodeStatsTask{ + setting := &model.TaskSetting{ + NodeStats: &model.NodeStatsTask{ Enabled: true, }, } @@ -734,17 +737,17 @@ func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) { } } if clusterStats { - setting.ClusterStats = &common2.ClusterStatsTask{ + setting.ClusterStats = &model.ClusterStatsTask{ Enabled: true, } } if indexStats { - setting.IndexStats = &common2.IndexStatsTask{ + setting.IndexStats = &model.IndexStatsTask{ Enabled: true, } } if clusterHealth { - setting.ClusterHealth = &common2.ClusterHealthTask{ + setting.ClusterHealth = &model.ClusterHealthTask{ Enabled: true, } } diff --git a/modules/agent/api/log.go b/modules/agent/api/log.go index f36e2be0..0164fc5d 100644 --- a/modules/agent/api/log.go +++ b/modules/agent/api/log.go @@ -7,7 +7,8 @@ package api import ( "fmt" log "github.com/cihub/seelog" - "infini.sh/console/modules/agent/common" + "infini.sh/console/modules/agent/client" + "infini.sh/console/modules/agent/state" "infini.sh/framework/core/agent" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/orm" @@ -31,7 +32,7 @@ func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, }, http.StatusOK) return } - logFiles, err := common.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs) + logFiles, err := client.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -68,7 +69,7 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, return } reqBody.LogsPath = node.Path.Logs - sm := common.GetStateManager() + sm := state.GetStateManager() res, err := sm.GetAgentClient().GetElasticLogFileContent(nil, inst.GetEndpoint(), reqBody) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) diff --git a/modules/agent/common/client.go b/modules/agent/client/client.go similarity index 84% rename from modules/agent/common/client.go rename to modules/agent/client/client.go index cdbdf944..5db68a14 100644 --- a/modules/agent/common/client.go +++ b/modules/agent/client/client.go @@ -2,12 +2,13 @@ * Web: https://infinilabs.com * Email: hello#infini.ltd */ -package common +package client import ( "bytes" "context" "fmt" + "infini.sh/console/modules/agent/common" "infini.sh/framework/core/agent" "infini.sh/framework/core/elastic" "infini.sh/framework/core/global" @@ -20,6 +21,31 @@ import ( "sync" ) +var defaultClient ClientAPI + +func GetClient() ClientAPI { + if defaultClient == nil { + panic("agent client not init") + } + return defaultClient +} + +func RegisterClient(client ClientAPI) { + defaultClient = client +} +type ClientAPI interface { + GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) + GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) + GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) + GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) + GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error) + RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error + GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error) + AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) + CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error + DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error +} + type Client struct { } @@ -213,7 +239,7 @@ var( hClientOnce = sync.Once{} ) func (client *Client) doRequest(req *util.Request, respObj interface{}) error { - agCfg := GetAgentConfig() + agCfg := common.GetAgentConfig() var err error hClientOnce.Do(func() { var ( @@ -270,7 +296,7 @@ func getAgentInstanceCerts(caFile, caKey string) (string, string, error) { if util.FileExists(instanceCrt) && util.FileExists(instanceKey) { return instanceCrt, instanceKey, nil } - _, clientCertPEM, clientKeyPEM, err = GenerateClientCert(caFile, caKey) + _, clientCertPEM, clientKeyPEM, err = common.GenerateClientCert(caFile, caKey) if err != nil { return "", "", err } diff --git a/modules/agent/common/config.go b/modules/agent/common/config.go index 71fc0a04..643b67fe 100644 --- a/modules/agent/common/config.go +++ b/modules/agent/common/config.go @@ -5,32 +5,17 @@ package common import ( + "infini.sh/console/modules/agent/model" "infini.sh/framework/core/env" "sync" ) -type AgentConfig struct { - Enabled bool `config:"enabled"` - StateManager struct{ - Enabled bool `config:"enabled"` - } `config:"state_manager"` - Setup SetupConfig `config:"setup"` -} - -type SetupConfig struct { - DownloadURL string `config:"download_url"` - Version string `config:"version"` - CACertFile string `config:"ca_cert"` - CAKeyFile string `config:"ca_key"` - ScriptEndpoint string `config:"script_endpoint"` -} - -var agentCfg *AgentConfig +var agentCfg *model.AgentConfig var onceCfg = sync.Once{} -func GetAgentConfig() *AgentConfig { +func GetAgentConfig() *model.AgentConfig { onceCfg.Do(func() { - agentCfg = &AgentConfig{} + agentCfg = &model.AgentConfig{} _, err := env.ParseConfig("agent", agentCfg ) if err != nil { panic(err) diff --git a/modules/agent/common/helper.go b/modules/agent/common/helper.go index 5763fa7c..8e760104 100644 --- a/modules/agent/common/helper.go +++ b/modules/agent/common/helper.go @@ -6,13 +6,16 @@ package common import ( "fmt" + "infini.sh/console/modules/agent/model" "infini.sh/framework/core/agent" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/event" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" + log "src/github.com/cihub/seelog" ) -func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, error){ +func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResult, error){ var clusterCfgs []elastic.ElasticsearchConfig var ( pipelines []util.MapStr @@ -49,7 +52,7 @@ func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, err if err != nil { return nil, err } - taskSetting := TaskSetting{} + taskSetting := model.TaskSetting{} err = util.FromJSONBytes(vBytes, &taskSetting) if err != nil { return nil, err @@ -61,7 +64,7 @@ func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, err pipelines = append(pipelines, partPipelines...) toDeletePipelineNames = append(toDeletePipelineNames, partDeletePipelineNames...) } - return &ParseAgentSettingsResult{ + return &model.ParseAgentSettingsResult{ ClusterConfigs: clusterCfgs, Pipelines: pipelines, ToDeletePipelineNames: toDeletePipelineNames, @@ -132,7 +135,7 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) return settings, nil } -func TransformSettingsToConfig(setting *TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { +func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { if setting == nil { return nil, nil, fmt.Errorf("empty setting") } @@ -227,3 +230,109 @@ func getMetricPipelineName(clusterID, processorName string) string{ return fmt.Sprintf("collect_%s_%s", clusterID, processorName) } + +func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) { + q := orm.Query{ + Size: 1000, + } + if clusterID != "" { + q.Conds = orm.And(orm.Eq("id", clusterID)) + } + err, result := orm.Search(agent.Instance{}, &q) + if err != nil { + return nil, fmt.Errorf("query agent error: %w", err) + } + + if len(result.Result) > 0 { + var agents = make([]agent.Instance, 0, len(result.Result)) + for _, row := range result.Result { + ag := agent.Instance{} + bytes := util.MustToJSONBytes(row) + err = util.FromJSONBytes(bytes, &ag) + if err != nil { + log.Errorf("got unexpected agent: %s, error: %v", string(bytes), err) + continue + } + agents = append(agents, ag) + } + return agents, nil + } + return nil, nil +} + +func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) { + q := orm.Query{ + WildcardIndex: true, + } + mustQ := []util.MapStr{ + { + "term": util.MapStr{ + "metadata.name": util.MapStr{ + "value": "agent", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.category": util.MapStr{ + "value": "instance", + }, + }, + }, + } + if len(agentIds) > 0 { + mustQ = append(mustQ, util.MapStr{ + "terms": util.MapStr{ + "agent.id": agentIds, + }, + }) + } + queryDSL := util.MapStr{ + "_source": "agent.id", + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "desc", + }, + }, + }, + "collapse": util.MapStr{ + "field": "agent.id", + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "filter": []util.MapStr{ + { + "range": util.MapStr{ + "timestamp": util.MapStr{ + "gte": fmt.Sprintf("now-%ds", lastSeconds), + }, + }, + }, + }, + "must": mustQ, + }, + }, + } + q.RawQuery = util.MustToJSONBytes(queryDSL) + err, result := orm.Search(event.Event{}, &q) + if err != nil { + return nil, fmt.Errorf("query agent instance metric error: %w", err) + } + agentIDs := map[string]struct{}{} + if len(result.Result) > 0 { + searchRes := elastic.SearchResponse{} + err = util.FromJSONBytes(result.Raw, &searchRes) + if err != nil { + return nil, err + } + agentIDKeyPath := []string{"agent", "id"} + for _, hit := range searchRes.Hits.Hits { + agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source) + if v, ok := agentID.(string); ok { + agentIDs[v] = struct{}{} + } + } + } + return agentIDs, nil +} \ No newline at end of file diff --git a/modules/agent/common/interface.go b/modules/agent/common/interface.go deleted file mode 100644 index 6407358f..00000000 --- a/modules/agent/common/interface.go +++ /dev/null @@ -1,65 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package common - -import ( - "context" - "infini.sh/framework/core/agent" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/host" -) - -var defaultClient ClientAPI - -func GetClient() ClientAPI { - if defaultClient == nil { - panic("agent client not init") - } - return defaultClient -} - -func RegisterClient(client ClientAPI) { - defaultClient = client -} -type ClientAPI interface { - GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) - GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) - GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) - GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) - GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error) - RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error - GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error) - AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) - CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error - DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error -} - - -var stateManager IStateManager - -func GetStateManager() IStateManager { - if stateManager == nil { - panic("agent state manager not init") - } - return stateManager -} - -func RegisterStateManager(sm IStateManager) { - stateManager = sm -} - -func IsEnabled() bool { - return stateManager != nil -} - -type IStateManager interface { - GetAgent(ID string) (*agent.Instance, error) - UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error) - GetTaskAgent(clusterID string) (*agent.Instance, error) - DeleteAgent(agentID string) error - LoopState() - Stop() - GetAgentClient() ClientAPI -} \ No newline at end of file diff --git a/modules/agent/model/config.go b/modules/agent/model/config.go new file mode 100644 index 00000000..465e8f52 --- /dev/null +++ b/modules/agent/model/config.go @@ -0,0 +1,21 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package model + +type AgentConfig struct { + Enabled bool `config:"enabled"` + StateManager struct{ + Enabled bool `config:"enabled"` + } `config:"state_manager"` + Setup SetupConfig `config:"setup"` +} + +type SetupConfig struct { + DownloadURL string `config:"download_url"` + Version string `config:"version"` + CACertFile string `config:"ca_cert"` + CAKeyFile string `config:"ca_key"` + ScriptEndpoint string `config:"script_endpoint"` +} diff --git a/modules/agent/model/const.go b/modules/agent/model/const.go new file mode 100644 index 00000000..6115f841 --- /dev/null +++ b/modules/agent/model/const.go @@ -0,0 +1,10 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package model + +const ( + StatusOnline string = "online" + StatusOffline = "offline" +) diff --git a/modules/agent/common/model.go b/modules/agent/model/task.go similarity index 98% rename from modules/agent/common/model.go rename to modules/agent/model/task.go index e46452be..d8b42006 100644 --- a/modules/agent/common/model.go +++ b/modules/agent/model/task.go @@ -2,7 +2,7 @@ * Web: https://infinilabs.com * Email: hello#infini.ltd */ -package common +package model import ( "infini.sh/framework/core/elastic" diff --git a/modules/agent/common/state.go b/modules/agent/state/state.go similarity index 65% rename from modules/agent/common/state.go rename to modules/agent/state/state.go index de26a0ee..27507948 100644 --- a/modules/agent/common/state.go +++ b/modules/agent/state/state.go @@ -2,16 +2,17 @@ * Web: https://infinilabs.com * Email: hello#infini.ltd */ -package common +package state import ( "context" "fmt" "github.com/buger/jsonparser" log "github.com/cihub/seelog" + "infini.sh/console/modules/agent/client" + "infini.sh/console/modules/agent/common" + "infini.sh/console/modules/agent/model" "infini.sh/framework/core/agent" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/event" "infini.sh/framework/core/host" "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" @@ -23,17 +24,39 @@ import ( "time" ) -const ( - StatusOnline string = "online" - StatusOffline = "offline" -) +var stateManager IStateManager + +func GetStateManager() IStateManager { + if stateManager == nil { + panic("agent state manager not init") + } + return stateManager +} + +func RegisterStateManager(sm IStateManager) { + stateManager = sm +} + +func IsEnabled() bool { + return stateManager != nil +} + +type IStateManager interface { + GetAgent(ID string) (*agent.Instance, error) + UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error) + GetTaskAgent(clusterID string) (*agent.Instance, error) + DeleteAgent(agentID string) error + LoopState() + Stop() + GetAgentClient() client.ClientAPI +} type StateManager struct { TTL time.Duration // kv ttl KVKey string stopC chan struct{} stopCompleteC chan struct{} - agentClient *Client + agentClient *client.Client agentIds map[string]string agentMutex sync.Mutex workerChan chan struct{} @@ -46,7 +69,7 @@ func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string KVKey: kvKey, stopC: make(chan struct{}), stopCompleteC: make(chan struct{}), - agentClient: &Client{}, + agentClient: &client.Client{}, agentIds: agentIds, workerChan: make(chan struct{}, runtime.NumCPU()), timestamps: map[string]int64{}, @@ -54,7 +77,7 @@ func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string } func (sm *StateManager) checkAgentStatus() { - onlineAgentIDs, err := GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds())) + onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds())) if err != nil { log.Error(err) return @@ -64,32 +87,32 @@ func (sm *StateManager) checkAgentStatus() { for agentID := range onlineAgentIDs { if _, ok := sm.agentIds[agentID]; !ok { log.Infof("status of agent [%s] changed to online", agentID) - sm.agentIds[agentID] = StatusOnline + sm.agentIds[agentID] = model.StatusOnline } } sm.agentMutex.Unlock() for agentID, status := range sm.agentIds { if _, ok := onlineAgentIDs[agentID]; ok { sm.syncSettings(agentID) - host.UpdateHostAgentStatus(agentID, StatusOnline) - if status == StatusOnline { + host.UpdateHostAgentStatus(agentID, model.StatusOnline) + if status == model.StatusOnline { continue } // status change to online - sm.agentIds[agentID] = StatusOnline + sm.agentIds[agentID] = model.StatusOnline log.Infof("status of agent [%s] changed to online", agentID) //set timestamp equals 0 to create pipeline sm.timestamps[agentID] = 0 continue }else{ // already offline - if status == StatusOffline { + if status == model.StatusOffline { continue } } // status change to offline // todo validate whether agent is offline - sm.agentIds[agentID] = StatusOffline + sm.agentIds[agentID] = model.StatusOffline sm.workerChan <- struct{}{} go func(agentID string) { defer func() { @@ -104,7 +127,7 @@ func (sm *StateManager) checkAgentStatus() { log.Error(err) return } - ag.Status = StatusOffline + ag.Status = model.StatusOffline log.Infof("agent [%s] is offline", ag.Endpoint) _, err = sm.UpdateAgent(ag, true) if err != nil { @@ -112,7 +135,7 @@ func (sm *StateManager) checkAgentStatus() { return } //update host agent status - host.UpdateHostAgentStatus(ag.ID, StatusOffline) + host.UpdateHostAgentStatus(ag.ID, model.StatusOffline) }(agentID) } @@ -120,7 +143,7 @@ func (sm *StateManager) checkAgentStatus() { func (sm *StateManager) syncSettings(agentID string) { newTimestamp := time.Now().UnixMilli() - settings, err := GetAgentSettings(agentID, sm.timestamps[agentID]) + settings, err := common.GetAgentSettings(agentID, sm.timestamps[agentID]) if err != nil { log.Errorf("query agent settings error: %v", err) return @@ -129,7 +152,7 @@ func (sm *StateManager) syncSettings(agentID string) { log.Debugf("got no settings of agent [%s]", agentID) return } - parseResult, err := ParseAgentSettings(settings) + parseResult, err := common.ParseAgentSettings(settings) if err != nil { log.Errorf("parse agent settings error: %v", err) return @@ -168,7 +191,7 @@ func (sm *StateManager) syncSettings(agentID string) { } func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) { - agents, err := LoadAgentsFromES(clusterID) + agents, err := common.LoadAgentsFromES(clusterID) if err != nil { return nil, err } @@ -261,112 +284,6 @@ func (sm *StateManager) DeleteAgent(agentID string) error { return kv.DeleteKey(sm.KVKey, []byte(agentID)) } -func (sm *StateManager) GetAgentClient() ClientAPI { +func (sm *StateManager) GetAgentClient() client.ClientAPI { return sm.agentClient } - -func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) { - q := orm.Query{ - Size: 1000, - } - if clusterID != "" { - q.Conds = orm.And(orm.Eq("id", clusterID)) - } - err, result := orm.Search(agent.Instance{}, &q) - if err != nil { - return nil, fmt.Errorf("query agent error: %w", err) - } - - if len(result.Result) > 0 { - var agents = make([]agent.Instance, 0, len(result.Result)) - for _, row := range result.Result { - ag := agent.Instance{} - bytes := util.MustToJSONBytes(row) - err = util.FromJSONBytes(bytes, &ag) - if err != nil { - log.Errorf("got unexpected agent: %s, error: %v", string(bytes), err) - continue - } - agents = append(agents, ag) - } - return agents, nil - } - return nil, nil -} - -func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) { - q := orm.Query{ - WildcardIndex: true, - } - mustQ := []util.MapStr{ - { - "term": util.MapStr{ - "metadata.name": util.MapStr{ - "value": "agent", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.category": util.MapStr{ - "value": "instance", - }, - }, - }, - } - if len(agentIds) > 0 { - mustQ = append(mustQ, util.MapStr{ - "terms": util.MapStr{ - "agent.id": agentIds, - }, - }) - } - queryDSL := util.MapStr{ - "_source": "agent.id", - "sort": []util.MapStr{ - { - "timestamp": util.MapStr{ - "order": "desc", - }, - }, - }, - "collapse": util.MapStr{ - "field": "agent.id", - }, - "query": util.MapStr{ - "bool": util.MapStr{ - "filter": []util.MapStr{ - { - "range": util.MapStr{ - "timestamp": util.MapStr{ - "gte": fmt.Sprintf("now-%ds", lastSeconds), - }, - }, - }, - }, - "must": mustQ, - }, - }, - } - q.RawQuery = util.MustToJSONBytes(queryDSL) - err, result := orm.Search(event.Event{}, &q) - if err != nil { - return nil, fmt.Errorf("query agent instance metric error: %w", err) - } - agentIDs := map[string]struct{}{} - if len(result.Result) > 0 { - searchRes := elastic.SearchResponse{} - err = util.FromJSONBytes(result.Raw, &searchRes) - if err != nil { - return nil, err - } - agentIDKeyPath := []string{"agent", "id"} - for _, hit := range searchRes.Hits.Hits { - agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source) - if v, ok := agentID.(string); ok { - agentIDs[v] = struct{}{} - } - } - } - return agentIDs, nil -} \ No newline at end of file From 0cd8aac95629eca796560ad96a3c24ca5abee8ee Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 1 Jun 2023 10:20:56 +0800 Subject: [PATCH 4/6] push config to agent through saving file --- config/install_agent.tpl | 117 +++-------------- modules/agent/agent.go | 19 ++- modules/agent/api/instance.go | 115 ++++++++++++++--- modules/agent/api/setup.go | 39 +++--- modules/agent/client/client.go | 130 +++++-------------- modules/agent/client/executor.go | 100 +++++++++++++++ modules/agent/common/cert.go | 34 +++++ modules/agent/common/config.go | 15 +-- modules/agent/common/helper.go | 207 ++++++++++++++++++++++++++----- modules/agent/model/config.go | 7 +- modules/agent/state/state.go | 96 +++++++++----- 11 files changed, 569 insertions(+), 310 deletions(-) create mode 100644 modules/agent/client/executor.go diff --git a/config/install_agent.tpl b/config/install_agent.tpl index 971fe2b9..e9fa4676 100644 --- a/config/install_agent.tpl +++ b/config/install_agent.tpl @@ -156,6 +156,8 @@ if [ $? -ne 0 ]; then exit 1 fi +rm -f ${agent} + ################## # save cert ################## @@ -167,14 +169,13 @@ if [ $? -ne 0 ]; then exit 1 fi +port="{{port}}" + ## generate agent.yml agent_config="path.configs: "config" configs.auto_reload: true env: - LOGGING_ES_ENDPOINT: {{logging_es_endpoint}} - LOGGING_ES_USER: {{logging_es_user}} - LOGGING_ES_PASS: {{logging_es_password}} - API_BINDING: "0.0.0.0:8080" + API_BINDING: "0.0.0.0:${port}" path.data: data path.logs: log @@ -194,104 +195,6 @@ badger: value_log_max_entries: 1000000 value_log_file_size: 104857600 value_threshold: 1024 - -metrics: - enabled: true - queue: metrics - network: - enabled: true - summary: true - details: true - memory: - metrics: - - swap - - memory - disk: - metrics: - - iops - - usage - cpu: - metrics: - - idle - - system - - user - - iowait - - load - instance: - enabled: true - -elasticsearch: - - name: default - enabled: true - endpoint: \$[[env.LOGGING_ES_ENDPOINT]] - discovery: - enabled: true - basic_auth: - username: \$[[env.LOGGING_ES_USER]] - password: \$[[env.LOGGING_ES_PASS]] - -pipeline: - - name: logs_indexing_merge - auto_start: true - keep_running: true - processor: - - indexing_merge: - index_name: ".infini_logs" - elasticsearch: "default" - input_queue: "logs" - idle_timeout_in_seconds: 10 - output_queue: - name: "logs_requests" - label: - tag: "logs" - worker_size: 1 - bulk_size_in_mb: 10 - - name: ingest_logs - auto_start: true - keep_running: true - processor: - - bulk_indexing: - bulk: - compress: true - batch_size_in_mb: 5 - batch_size_in_docs: 5000 - consumer: - fetch_max_messages: 100 - queues: - type: indexing_merge - tag: "logs" - when: - cluster_available: ["default"] - - name: metrics_indexing_merge - auto_start: true - keep_running: true - processor: - - indexing_merge: - elasticsearch: "default" - index_name: ".infini_metrics" - input_queue: "metrics" - output_queue: - name: "metrics_requests" - label: - tag: "metrics" - worker_size: 1 - bulk_size_in_mb: 5 - - name: ingest_metrics - auto_start: true - keep_running: true - processor: - - bulk_indexing: - bulk: - compress: true - batch_size_in_mb: 5 - batch_size_in_docs: 5000 - consumer: - fetch_max_messages: 100 - queues: - type: indexing_merge - tag: "metrics" - when: - cluster_available: ["default"] agent: major_ip_pattern: "192.*" " @@ -331,6 +234,16 @@ if [ $? -ne 0 ]; then fi printf "\n* agent service started" +console_endpoint="{{console_endpoint}}" +sleep 3 +printf "\n* start register\n" +token={{token}} +curl -X POST ${console_endpoint}/agent/instance?token=${token} + +if [ $? -ne 0 ]; then + exit 1 +fi +printf "\n* agent registered\n" printf "\n* ${GREEN}Congratulations, install success!${CLR}\n\n" diff --git a/modules/agent/agent.go b/modules/agent/agent.go index 8352021a..9e8c3ee5 100644 --- a/modules/agent/agent.go +++ b/modules/agent/agent.go @@ -41,7 +41,22 @@ func (module *AgentModule) Start() error { orm.RegisterSchemaWithIndexName(agent.ESNodeInfo{}, "agent-node") orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host") orm.RegisterSchemaWithIndexName(agent.Setting{}, "agent-setting") - client.RegisterClient(&client.Client{}) + var ( + executor client.Executor + err error + ) + if module.AgentConfig.Setup == nil { + executor = &client.HttpExecutor{} + }else{ + executor, err = client.NewMTLSExecutor(module.AgentConfig.Setup.CACertFile, module.AgentConfig.Setup.CAKeyFile) + if err != nil { + panic(err) + } + } + agClient := &client.Client{ + Executor: executor, + } + client.RegisterClient(agClient) if module.AgentConfig.StateManager.Enabled { onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, 60) @@ -59,7 +74,7 @@ func (module *AgentModule) Start() error { } } - sm := state.NewStateManager(time.Second*30, "agent_state", agentIds) + sm := state.NewStateManager(time.Second*30, "agent_state", agentIds, agClient) state.RegisterStateManager(sm) go sm.LoopState() } diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index 2b681a9e..eab6e74f 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -23,6 +23,7 @@ import ( "infini.sh/framework/modules/elastic/common" "net/http" "strconv" + "time" ) type APIHandler struct { @@ -37,22 +38,30 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps log.Error(err) return } - - oldInst := &agent.Instance{} - oldInst.ID = obj.ID - exists, err := orm.Get(oldInst) - - if err != nil && err != elastic2.ErrNotFound { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - if exists { - errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID) - h.WriteError(w, errMsg, http.StatusInternalServerError) - log.Error(errMsg) - return + //validate token for auto register + token := h.GetParameter(req, "token") + if token != "" { + if v, ok := tokens.Load(token); !ok { + h.WriteError(w, "token is invalid", http.StatusUnauthorized) + return + }else{ + if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) { + tokens.Delete(token) + h.WriteError(w, "token was expired", http.StatusUnauthorized) + return + } + } + remoteIP := util.ClientIP(req) + agCfg := common2.GetAgentConfig() + port := agCfg.Setup.Port + if port == "" { + port = "8080" + } + obj.Endpoint = fmt.Sprintf("https://%s:%s", remoteIP, port) + obj.Tags = append(obj.Tags, "mtls") + obj.Tags = append(obj.Tags, "auto") } + //fetch more information of agent instance res, err := client.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint()) if err != nil { @@ -72,6 +81,24 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps obj.MajorIP = res.MajorIP obj.Host = res.Host obj.IPS = res.IPS + if obj.Name == "" { + obj.Name = res.Name + } + } + oldInst := &agent.Instance{} + oldInst.ID = obj.ID + exists, err := orm.Get(oldInst) + + if err != nil && err != elastic2.ErrNotFound { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if exists { + errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID) + h.WriteError(w, errMsg, http.StatusInternalServerError) + log.Error(errMsg) + return } obj.Status = model.StatusOnline @@ -85,11 +112,33 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps if err != nil { log.Error(err) } + err = pushIngestConfigToAgent(obj) + if err != nil { + log.Error(err) + } h.WriteCreatedOKJSON(w, obj.ID) } +func pushIngestConfigToAgent(inst *agent.Instance) error{ + ingestCfg, basicAuth, err := common2.GetAgentIngestConfig() + if err != nil { + return err + } + if basicAuth != nil && basicAuth.Password != "" { + err = client.GetClient().SetKeystoreValue(context.Background(), inst.GetEndpoint(), "ingest_cluster_password", basicAuth.Password) + if err != nil { + return fmt.Errorf("set keystore value to agent error: %w", err) + } + } + err = client.GetClient().SaveDynamicConfig(context.Background(), inst.GetEndpoint(), "ingest", ingestCfg ) + if err != nil { + fmt.Errorf("save dynamic config to agent error: %w", err) + } + return nil +} + func (h *APIHandler) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("instance_id") @@ -157,6 +206,22 @@ func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps return } + queryDsl = util.MapStr{ + "query": util.MapStr{ + "term": util.MapStr{ + "metadata.labels.agent_id": util.MapStr{ + "value": id, + }, + }, + }, + } + err = orm.DeleteBy(agent.Setting{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error("delete agent settings error: ", err) + return + } + h.WriteDeletedOKJSON(w, id) } @@ -500,6 +565,26 @@ func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps h h.WriteError(w, err.Error(), http.StatusInternalServerError) return } + q = util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "metadata.labels.node_uuid": nodeIDs, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.agent_id": util.MapStr{ + "value": id, + }, + }, + }, + }, + }, + }, + } } h.WriteAckOKJSON(w) } diff --git a/modules/agent/api/setup.go b/modules/agent/api/setup.go index 3f07ae7f..433377d6 100644 --- a/modules/agent/api/setup.go +++ b/modules/agent/api/setup.go @@ -11,7 +11,6 @@ import ( "infini.sh/console/modules/agent/common" "infini.sh/framework/core/api/rbac" httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/elastic" "infini.sh/framework/core/global" "infini.sh/framework/core/util" "os" @@ -69,16 +68,16 @@ func (h *APIHandler) generateInstallCommand(w http.ResponseWriter, req *http.Req } tokens.Store(tokenStr, t) agCfg := common.GetAgentConfig() - scriptEndpoint := agCfg.Setup.ScriptEndpoint - if scriptEndpoint == "" { + consoleEndpoint := agCfg.Setup.ConsoleEndpoint + if consoleEndpoint == "" { scheme := "http" if req.TLS != nil { scheme = "https" } - scriptEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host) + consoleEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host) } h.WriteJSON(w, util.MapStr{ - "script": fmt.Sprintf(`sudo bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, scriptEndpoint, tokenStr), + "script": fmt.Sprintf(`sudo bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, consoleEndpoint, tokenStr), "token": tokenStr, "expired_at": t.CreatedAt.Add(ExpiredIn), }, http.StatusOK) @@ -120,25 +119,31 @@ func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request, if downloadURL == "" { downloadURL = "https://release.infinilabs.com/agent/stable/" } - esCfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) - var ( - loggingESUser string - loggingESPassword string - ) - if esCfg.BasicAuth != nil { - loggingESUser = esCfg.BasicAuth.Username - loggingESPassword = esCfg.BasicAuth.Password + //esCfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) + //var ( + // loggingESUser string + // loggingESPassword string + //) + //if esCfg.BasicAuth != nil { + // loggingESUser = esCfg.BasicAuth.Username + // loggingESPassword = esCfg.BasicAuth.Password + //} + port := agCfg.Setup.Port + if port == "" { + port = "8080" } tpl.Execute(w, map[string]interface{}{ "base_url": agCfg.Setup.DownloadURL, "agent_version": agCfg.Setup.Version, - //"console_endpoint": util.MustToJSON(util.MustToJSON(gatewayEndpoints)), + "console_endpoint": agCfg.Setup.ConsoleEndpoint, "client_crt": clientCertPEM, "client_key": clientKeyPEM, "ca_crt": caCert, - "logging_es_endpoint": esCfg.Endpoint, - "logging_es_user": loggingESUser, - "logging_es_password": loggingESPassword, + "port": port, + "token": tokenStr, + //"logging_es_endpoint": esCfg.Endpoint, + //"logging_es_user": loggingESUser, + //"logging_es_password": loggingESPassword, }) } diff --git a/modules/agent/client/client.go b/modules/agent/client/client.go index 5db68a14..9f35870b 100644 --- a/modules/agent/client/client.go +++ b/modules/agent/client/client.go @@ -5,20 +5,13 @@ package client import ( - "bytes" "context" "fmt" - "infini.sh/console/modules/agent/common" "infini.sh/framework/core/agent" "infini.sh/framework/core/elastic" - "infini.sh/framework/core/global" "infini.sh/framework/core/host" "infini.sh/framework/core/util" - "io" "net/http" - "os" - "path" - "sync" ) var defaultClient ClientAPI @@ -44,11 +37,15 @@ type ClientAPI interface { AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error + SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error + SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error } type Client struct { + Executor Executor } + func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) { req := &util.Request{ Method: http.MethodGet, @@ -220,100 +217,37 @@ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipeline return client.doRequest(req, nil) } -//func (client *Client) doRequest(req *util.Request, respObj interface{}) error { -// result, err := util.ExecuteRequest(req) -// if err != nil { -// return err -// } -// if result.StatusCode != 200 { -// return fmt.Errorf(string(result.Body)) -// } -// if respObj == nil { -// return nil -// } -// return util.FromJSONBytes(result.Body, respObj) -//} - -var( - hClient *http.Client - hClientOnce = sync.Once{} -) -func (client *Client) doRequest(req *util.Request, respObj interface{}) error { - agCfg := common.GetAgentConfig() - var err error - hClientOnce.Do(func() { - var ( - instanceCrt string - instanceKey string - ) - instanceCrt, instanceKey, err = getAgentInstanceCerts(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile) - hClient, err = util.NewMTLSClient(agCfg.Setup.CACertFile, instanceCrt, instanceKey) - }) - if err != nil { - return err +func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error{ + body := util.MapStr{ + "key": key, + "value": value, } - var reader io.Reader - if len(req.Body) > 0 { - reader = bytes.NewReader(req.Body) + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/_framework/keystore", agentBaseURL), + Context: ctx, + Body: util.MustToJSONBytes(body), } - - var hr *http.Request - if req.Context == nil { - hr, err = http.NewRequest(req.Method, req.Url, reader) - }else{ - hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader) - } - if err != nil { - return err - } - res, err := hClient.Do(hr) - if err != nil { - return err - } - if respObj != nil { - defer res.Body.Close() - buf, err := io.ReadAll(res.Body) - if err != nil { - return err - } - err = util.FromJSONBytes(buf, respObj) - if err != nil { - return err - } - } - return nil + return client.doRequest(req, nil) } -func getAgentInstanceCerts(caFile, caKey string) (string, string, error) { - dataDir := global.Env().GetDataDir() - instanceCrt := path.Join(dataDir, "certs/agent/instance.crt") - instanceKey := path.Join(dataDir, "certs/agent/instance.key") - var ( - err error - clientCertPEM []byte - clientKeyPEM []byte - ) - if util.FileExists(instanceCrt) && util.FileExists(instanceKey) { - return instanceCrt, instanceKey, nil +func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error{ + body := util.MapStr{ + "configs": util.MapStr{ + name: content, + }, } - _, clientCertPEM, clientKeyPEM, err = common.GenerateClientCert(caFile, caKey) - if err != nil { - return "", "", err + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/agent/config", agentBaseURL), + Context: ctx, + Body: util.MustToJSONBytes(body), } - baseDir := path.Join(dataDir, "certs/agent") - if !util.IsExist(baseDir){ - err = os.MkdirAll(baseDir, 0775) - if err != nil { - return "", "", err - } - } - _, err = util.FilePutContentWithByte(instanceCrt, clientCertPEM) - if err != nil { - return "", "", err - } - _, err = util.FilePutContentWithByte(instanceKey, clientKeyPEM) - if err != nil { - return "", "", err - } - return instanceCrt, instanceKey, nil -} \ No newline at end of file + return client.doRequest(req, nil) +} + + +func (client *Client) doRequest(req *util.Request, respObj interface{}) error { + return client.Executor.DoRequest(req, respObj) +} + diff --git a/modules/agent/client/executor.go b/modules/agent/client/executor.go new file mode 100644 index 00000000..f756f3a1 --- /dev/null +++ b/modules/agent/client/executor.go @@ -0,0 +1,100 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package client + +import ( + "bytes" + "fmt" + "infini.sh/console/modules/agent/common" + "infini.sh/framework/core/util" + "io" + "net/http" +) + +type Executor interface { + DoRequest(req *util.Request, respObj interface{}) error +} + +type HttpExecutor struct { +} + +func (executor *HttpExecutor) DoRequest(req *util.Request, respObj interface{}) error { + result, err := util.ExecuteRequest(req) + if err != nil { + return err + } + if result.StatusCode != 200 { + return fmt.Errorf(string(result.Body)) + } + if respObj == nil { + return nil + } + return util.FromJSONBytes(result.Body, respObj) +} + +func NewMTLSExecutor(caCertFile, caKeyFile string) (*MTLSExecutor, error){ + var ( + instanceCrt string + instanceKey string + ) + instanceCrt, instanceKey, err := common.GetAgentInstanceCerts(caCertFile, caKeyFile) + if err != nil { + return nil, fmt.Errorf("generate tls cert error: %w", err) + } + hClient, err := util.NewMTLSClient(caCertFile, instanceCrt, instanceKey) + if err != nil { + return nil, err + } + return &MTLSExecutor{ + CaCertFile: caCertFile, + CAKeyFile: caKeyFile, + client: hClient, + }, nil +} + +type MTLSExecutor struct { + CaCertFile string + CAKeyFile string + client *http.Client +} + + +func (executor *MTLSExecutor) DoRequest(req *util.Request, respObj interface{}) error { + var reader io.Reader + if len(req.Body) > 0 { + reader = bytes.NewReader(req.Body) + } + var ( + hr *http.Request + err error + ) + if req.Context == nil { + hr, err = http.NewRequest(req.Method, req.Url, reader) + }else{ + hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader) + } + if err != nil { + return err + } + res, err := executor.client.Do(hr) + if err != nil { + return err + } + defer res.Body.Close() + buf, err := io.ReadAll(res.Body) + if err != nil { + return err + } + if res.StatusCode != 200 { + return fmt.Errorf(string(buf)) + } + if respObj != nil { + err = util.FromJSONBytes(buf, respObj) + if err != nil { + return err + } + } + return nil +} \ No newline at end of file diff --git a/modules/agent/common/cert.go b/modules/agent/common/cert.go index 7b9e718c..61521509 100644 --- a/modules/agent/common/cert.go +++ b/modules/agent/common/cert.go @@ -61,3 +61,37 @@ func generateCert(caFile, caKey string, isServer bool)(caCert, instanceCertPEM, } return caCert, instanceCertPEM, instanceKeyPEM, nil } + +func GetAgentInstanceCerts(caFile, caKey string) (string, string, error) { + dataDir := global.Env().GetDataDir() + instanceCrt := path.Join(dataDir, "certs/agent/instance.crt") + instanceKey := path.Join(dataDir, "certs/agent/instance.key") + var ( + err error + clientCertPEM []byte + clientKeyPEM []byte + ) + if util.FileExists(instanceCrt) && util.FileExists(instanceKey) { + return instanceCrt, instanceKey, nil + } + _, clientCertPEM, clientKeyPEM, err = GenerateClientCert(caFile, caKey) + if err != nil { + return "", "", err + } + baseDir := path.Join(dataDir, "certs/agent") + if !util.IsExist(baseDir){ + err = os.MkdirAll(baseDir, 0775) + if err != nil { + return "", "", err + } + } + _, err = util.FilePutContentWithByte(instanceCrt, clientCertPEM) + if err != nil { + return "", "", err + } + _, err = util.FilePutContentWithByte(instanceKey, clientKeyPEM) + if err != nil { + return "", "", err + } + return instanceCrt, instanceKey, nil +} \ No newline at end of file diff --git a/modules/agent/common/config.go b/modules/agent/common/config.go index 643b67fe..ac7e255e 100644 --- a/modules/agent/common/config.go +++ b/modules/agent/common/config.go @@ -7,19 +7,14 @@ package common import ( "infini.sh/console/modules/agent/model" "infini.sh/framework/core/env" - "sync" ) -var agentCfg *model.AgentConfig -var onceCfg = sync.Once{} func GetAgentConfig() *model.AgentConfig { - onceCfg.Do(func() { - agentCfg = &model.AgentConfig{} - _, err := env.ParseConfig("agent", agentCfg ) - if err != nil { - panic(err) - } - }) + agentCfg := &model.AgentConfig{} + _, err := env.ParseConfig("agent", agentCfg ) + if err != nil { + panic(err) + } return agentCfg } \ No newline at end of file diff --git a/modules/agent/common/helper.go b/modules/agent/common/helper.go index 8e760104..cc7ea55d 100644 --- a/modules/agent/common/helper.go +++ b/modules/agent/common/helper.go @@ -8,11 +8,13 @@ import ( "fmt" "infini.sh/console/modules/agent/model" "infini.sh/framework/core/agent" + "infini.sh/framework/core/credential" "infini.sh/framework/core/elastic" "infini.sh/framework/core/event" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" log "src/github.com/cihub/seelog" + "strings" ) func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResult, error){ @@ -71,44 +73,48 @@ func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResul }, nil } +// GetAgentSettings query agent setting by agent id and updated timestamp, +// if there has any setting was updated, then return setting list includes settings not changed, +// otherwise return empty setting list func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) { - queryDsl := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.category": util.MapStr{ - "value": "agent", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.name": util.MapStr{ - "value": "task", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.agent_id": util.MapStr{ - "value": agentID, - }, - }, - }, - { - "range": util.MapStr{ - "updated": util.MapStr{ - "gt": timestamp, - }, + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.category": util.MapStr{ + "value": "agent", }, }, }, + { + "term": util.MapStr{ + "metadata.name": util.MapStr{ + "value": "task", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.agent_id": util.MapStr{ + "value": agentID, + }, + }, + }, + //{ + // "range": util.MapStr{ + // "updated": util.MapStr{ + // "gt": timestamp, + // }, + // }, + //}, }, }, } + queryDsl := util.MapStr{ + "size": 500, + "query": query, + } q := orm.Query{ RawQuery: util.MustToJSONBytes(queryDsl), } @@ -119,7 +125,10 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) if len(result.Result) == 0 { return nil, nil } - var settings []agent.Setting + var ( + settings []agent.Setting + hasUpdated bool + ) for _, row := range result.Result { setting := agent.Setting{} buf, err := util.ToJSONBytes(row) @@ -130,8 +139,14 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) if err != nil { return nil, err } + if setting.Updated != nil && setting.Updated.UnixMilli() > timestamp { + hasUpdated = true + } settings = append(settings, setting) } + if !hasUpdated { + return nil, nil + } return settings, nil } @@ -335,4 +350,132 @@ func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]str } } return agentIDs, nil +} + +func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) { + agCfg := GetAgentConfig() + var ( + endpoint string + ok bool + ) + if endpoint, ok = agCfg.Setup.IngestClusterEndpoint.(string);ok { + if endpoint = strings.TrimSpace(endpoint); endpoint == "" { + return "", nil, fmt.Errorf("config ingest_cluster_endpoint must not be empty") + } + } + var ( + basicAuth elastic.BasicAuth + ) + if agCfg.Setup.IngestClusterCredentialID != "" { + cred := credential.Credential{} + cred.ID = agCfg.Setup.IngestClusterCredentialID + _, err := orm.Get(&cred) + if err != nil { + return "", nil, fmt.Errorf("query credential [%s] error: %w", cred.ID, err) + } + info, err := cred.Decode() + if err != nil { + return "", nil, fmt.Errorf("decode credential [%s] error: %w", cred.ID, err) + } + if basicAuth, ok = info.(elastic.BasicAuth); !ok { + log.Debug("invalid credential: ", cred) + } + } + tpl := `elasticsearch: + - name: default + enabled: true + endpoint: %s + discovery: + enabled: true + basic_auth: + username: %s + password: $[[keystore.ingest_cluster_password]] +metrics: + enabled: true + queue: metrics + network: + enabled: true + summary: true + details: true + memory: + metrics: + - swap + - memory + disk: + metrics: + - iops + - usage + cpu: + metrics: + - idle + - system + - user + - iowait + - load + instance: + enabled: true +pipeline: + - name: logs_indexing_merge + auto_start: true + keep_running: true + processor: + - indexing_merge: + index_name: ".infini_logs" + elasticsearch: "default" + input_queue: "logs" + idle_timeout_in_seconds: 10 + output_queue: + name: "logs_requests" + label: + tag: "logs" + worker_size: 1 + bulk_size_in_mb: 10 + - name: ingest_logs + auto_start: true + keep_running: true + processor: + - bulk_indexing: + bulk: + compress: true + batch_size_in_mb: 5 + batch_size_in_docs: 5000 + consumer: + fetch_max_messages: 100 + queues: + type: indexing_merge + tag: "logs" + when: + cluster_available: ["default"] + - name: metrics_indexing_merge + auto_start: true + keep_running: true + processor: + - indexing_merge: + elasticsearch: "default" + index_name: ".infini_metrics" + input_queue: "metrics" + output_queue: + name: "metrics_requests" + label: + tag: "metrics" + worker_size: 1 + bulk_size_in_mb: 5 + - name: ingest_metrics + auto_start: true + keep_running: true + processor: + - bulk_indexing: + bulk: + compress: true + batch_size_in_mb: 5 + batch_size_in_docs: 5000 + consumer: + fetch_max_messages: 100 + queues: + type: indexing_merge + tag: "metrics" + when: + cluster_available: ["default"]` + tpl = fmt.Sprintf(tpl, endpoint, basicAuth.Username) + return tpl, &basicAuth, nil } \ No newline at end of file diff --git a/modules/agent/model/config.go b/modules/agent/model/config.go index 465e8f52..f4bb4a2d 100644 --- a/modules/agent/model/config.go +++ b/modules/agent/model/config.go @@ -9,7 +9,7 @@ type AgentConfig struct { StateManager struct{ Enabled bool `config:"enabled"` } `config:"state_manager"` - Setup SetupConfig `config:"setup"` + Setup *SetupConfig `config:"setup"` } type SetupConfig struct { @@ -17,5 +17,8 @@ type SetupConfig struct { Version string `config:"version"` CACertFile string `config:"ca_cert"` CAKeyFile string `config:"ca_key"` - ScriptEndpoint string `config:"script_endpoint"` + ConsoleEndpoint string `config:"console_endpoint"` + IngestClusterEndpoint interface{} `config:"ingest_cluster_endpoint"` + IngestClusterCredentialID string `config:"ingest_cluster_credential_id"` + Port string `config:"port"` } diff --git a/modules/agent/state/state.go b/modules/agent/state/state.go index 27507948..9a2d4a78 100644 --- a/modules/agent/state/state.go +++ b/modules/agent/state/state.go @@ -17,9 +17,10 @@ import ( "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" + "infini.sh/framework/modules/elastic" "runtime" "runtime/debug" - "strings" + "src/gopkg.in/yaml.v2" "sync" "time" ) @@ -63,13 +64,13 @@ type StateManager struct { timestamps map[string]int64 } -func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string) *StateManager { +func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string, agentClient *client.Client) *StateManager { return &StateManager{ TTL: TTL, KVKey: kvKey, stopC: make(chan struct{}), stopCompleteC: make(chan struct{}), - agentClient: &client.Client{}, + agentClient: agentClient, agentIds: agentIds, workerChan: make(chan struct{}, runtime.NumCPU()), timestamps: map[string]int64{}, @@ -124,7 +125,9 @@ func (sm *StateManager) checkAgentStatus() { }() ag, err := sm.GetAgent(agentID) if err != nil { - log.Error(err) + if err != elastic.ErrNotFound { + log.Error(err) + } return } ag.Status = model.StatusOffline @@ -142,6 +145,13 @@ func (sm *StateManager) checkAgentStatus() { } func (sm *StateManager) syncSettings(agentID string) { + ag, err := sm.GetAgent(agentID) + if err != nil { + if err != elastic.ErrNotFound { + log.Errorf("get agent error: %v", err) + } + return + } newTimestamp := time.Now().UnixMilli() settings, err := common.GetAgentSettings(agentID, sm.timestamps[agentID]) if err != nil { @@ -157,36 +167,58 @@ func (sm *StateManager) syncSettings(agentID string) { log.Errorf("parse agent settings error: %v", err) return } - ag, err := sm.GetAgent(agentID) + agClient := sm.GetAgentClient() + var clusterCfgs []util.MapStr + if len(parseResult.ClusterConfigs) > 0 { + for _, cfg := range parseResult.ClusterConfigs { + clusterCfg := util.MapStr{ + "name": cfg.ID, + "enabled": true, + "endpoint": cfg.Endpoint, + } + if cfg.BasicAuth != nil && cfg.BasicAuth.Password != ""{ + err = agClient.SetKeystoreValue(context.Background(), ag.GetEndpoint(), fmt.Sprintf("%s_password", cfg.ID), cfg.BasicAuth.Password) + if err != nil { + log.Errorf("set keystore value error: %v", err) + continue + } + clusterCfg["basic_auth"] = util.MapStr{ + "username": cfg.BasicAuth.Username, + "password": fmt.Sprintf("$[[keystore.%s_password]]", cfg.ID), + } + } + clusterCfgs = append(clusterCfgs, clusterCfg) + } + } + var dynamicCfg = util.MapStr{} + if len(clusterCfgs) > 0 { + dynamicCfg["elasticsearch"] = clusterCfgs + } + if len(parseResult.Pipelines) > 0 { + dynamicCfg["pipeline"] = parseResult.Pipelines + } + cfgBytes, err := yaml.Marshal(dynamicCfg) if err != nil { - log.Errorf("get agent error: %v", err) + log.Error("serialize config to yaml error: ", err) return } - agClient := sm.GetAgentClient() - if len(parseResult.ClusterConfigs) > 0 { - err = agClient.RegisterElasticsearch(nil, ag.GetEndpoint(), parseResult.ClusterConfigs) - if err != nil { - log.Errorf("register elasticsearch config error: %v", err) - return - } - } - for _, pipelineID := range parseResult.ToDeletePipelineNames { - err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID) - if err != nil { - if !strings.Contains(err.Error(), "not found") { - log.Errorf("delete pipeline error: %v", err) - continue - } - } - //todo update delete pipeline state - } - for _, pipeline := range parseResult.Pipelines { - err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline)) - if err != nil { - log.Errorf("create pipeline error: %v", err) - return - } - } + err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes)) + //for _, pipelineID := range parseResult.ToDeletePipelineNames { + // err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID) + // if err != nil { + // if !strings.Contains(err.Error(), "not found") { + // log.Errorf("delete pipeline error: %v", err) + // continue + // } + // } + //} + //for _, pipeline := range parseResult.Pipelines { + // err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline)) + // if err != nil { + // log.Errorf("create pipeline error: %v", err) + // return + // } + //} sm.timestamps[agentID] = newTimestamp } @@ -239,7 +271,7 @@ func (sm *StateManager) GetAgent(ID string) (*agent.Instance, error) { if time.Since(timestamp) > sm.TTL { exists, err := orm.Get(inst) if err != nil { - return nil, fmt.Errorf("get agent [%s] error: %w", ID, err) + return nil, err } if !exists { return nil, fmt.Errorf("can not found agent [%s]", ID) From c2f1869334f0162e4a546323a5a73b87d87dd176 Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 1 Jun 2023 10:47:03 +0800 Subject: [PATCH 5/6] revert console.yml --- console.yml | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/console.yml b/console.yml index 587d64b7..590a4206 100644 --- a/console.yml +++ b/console.yml @@ -1,19 +1,11 @@ path.configs: "config" configs.auto_reload: true -env: -# INFINI_CONSOLE_ENDPOINT: "https://play.infinilabs.com:64443" - WECHAT_WEBHOOK_ENDPOINT: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=9f5989ae-e82f-4e39-b4f9-cab0638d0229" - SLACK_WEBHOOK_ENDPOINT: "https://hooks.slack.com/services/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" - DINGTALK_WEBHOOK_ENDPOINT: "https://oapi.dingtalk.com/robot/send?access_token=6a5c7c9454ff74537a6de493153b1da68860942d4b0aeb33797cb68b5111b077" web: enabled: true embedding_api: true auth: enabled: true - websocket: - skip_host_verify: true - permitted_hosts: ["localhost:8000"] ui: enabled: true path: .public @@ -72,12 +64,4 @@ badger: # authorize_url: "https://github.com/login/oauth/authorize" # token_url: "https://github.com/login/oauth/access_token" # redirect_url: "" -# scopes: [] - -agent: - setup: - download_url: "https://release.infinilabs.com/agent/snapshot" - version: 0.5.0_NIGHTLY-151 - ca_cert: "/Users/liugq/go/src/infini.sh/console/config/certs/ca.crt" - ca_key: "/Users/liugq/go/src/infini.sh/console/config/certs/ca.key" - script_endpoint: "http://localhost:9000" \ No newline at end of file +# scopes: [] \ No newline at end of file From b1961bc9268fd825e58d194871b5be3013540acd Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 1 Jun 2023 10:47:43 +0800 Subject: [PATCH 6/6] remove unused code --- modules/agent/api/setup.go | 17 ++++------------- modules/agent/state/state.go | 16 ---------------- 2 files changed, 4 insertions(+), 29 deletions(-) diff --git a/modules/agent/api/setup.go b/modules/agent/api/setup.go index 433377d6..bcda07b5 100644 --- a/modules/agent/api/setup.go +++ b/modules/agent/api/setup.go @@ -119,20 +119,11 @@ func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request, if downloadURL == "" { downloadURL = "https://release.infinilabs.com/agent/stable/" } - //esCfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) - //var ( - // loggingESUser string - // loggingESPassword string - //) - //if esCfg.BasicAuth != nil { - // loggingESUser = esCfg.BasicAuth.Username - // loggingESPassword = esCfg.BasicAuth.Password - //} port := agCfg.Setup.Port if port == "" { port = "8080" } - tpl.Execute(w, map[string]interface{}{ + _, err = tpl.Execute(w, map[string]interface{}{ "base_url": agCfg.Setup.DownloadURL, "agent_version": agCfg.Setup.Version, "console_endpoint": agCfg.Setup.ConsoleEndpoint, @@ -141,9 +132,9 @@ func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request, "ca_crt": caCert, "port": port, "token": tokenStr, - //"logging_es_endpoint": esCfg.Endpoint, - //"logging_es_user": loggingESUser, - //"logging_es_password": loggingESPassword, }) + if err != nil { + log.Error(err) + } } diff --git a/modules/agent/state/state.go b/modules/agent/state/state.go index 9a2d4a78..5a6b986c 100644 --- a/modules/agent/state/state.go +++ b/modules/agent/state/state.go @@ -203,22 +203,6 @@ func (sm *StateManager) syncSettings(agentID string) { return } err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes)) - //for _, pipelineID := range parseResult.ToDeletePipelineNames { - // err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID) - // if err != nil { - // if !strings.Contains(err.Error(), "not found") { - // log.Errorf("delete pipeline error: %v", err) - // continue - // } - // } - //} - //for _, pipeline := range parseResult.Pipelines { - // err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline)) - // if err != nil { - // log.Errorf("create pipeline error: %v", err) - // return - // } - //} sm.timestamps[agentID] = newTimestamp }