diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go index 24b02e94..7a5048fd 100644 --- a/modules/agent/api/init.go +++ b/modules/agent/api/init.go @@ -23,8 +23,13 @@ func Init() { api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) + api.HandleAPIMethod(api.POST, "/agent/instance/try_connect", handler.RequireLogin(handler.tryConnect)) api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo) api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess) + + + api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand)) + api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript) } diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index 63e935f9..b85480ca 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -8,17 +8,15 @@ import ( "context" "fmt" log "github.com/cihub/seelog" + common2 "infini.sh/console/modules/agent/common" "infini.sh/framework/core/agent" "infini.sh/framework/core/api" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/event" - "infini.sh/framework/core/host" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" - common2 "infini.sh/console/modules/agent/common" elastic2 "infini.sh/framework/modules/elastic" - "infini.sh/framework/modules/elastic/adapter" "infini.sh/framework/modules/elastic/common" "net/http" "strconv" @@ -89,38 +87,6 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps } -func bindAgentToHostByIP(ag *agent.Instance) error{ - err, result := orm.GetBy("ip", ag.MajorIP, host.HostInfo{}) - if err != nil { - return err - } - if len(result.Result) > 0 { - buf := util.MustToJSONBytes(result.Result[0]) - hostInfo := &host.HostInfo{} - err = util.FromJSONBytes(buf, hostInfo) - if err != nil { - return err - } - sm := common2.GetStateManager() - if ag.Status == "" { - _, err1 := sm.GetAgentClient().GetHostInfo(nil, ag.GetEndpoint()) - if err1 == nil { - ag.Status = "online" - }else{ - ag.Status = "offline" - } - } - - hostInfo.AgentStatus = ag.Status - hostInfo.AgentID = ag.ID - err = orm.Update(nil, hostInfo) - if err != nil { - return err - } - } - return nil -} - func (h *APIHandler) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("instance_id") @@ -172,11 +138,23 @@ func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps if sm := common2.GetStateManager(); sm != nil { sm.DeleteAgent(obj.ID) } + queryDsl := util.MapStr{ + "query": util.MapStr{ + "term": util.MapStr{ + "agent_id": util.MapStr{ + "value": id, + }, + }, + }, + } + err = orm.DeleteBy(agent.ESNodeInfo{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error("delete node info error: ", err) + return + } - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "deleted", - }, 200) + h.WriteDeletedOKJSON(w, id) } func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -523,6 +501,24 @@ func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps h h.WriteAckOKJSON(w) } +func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var reqBody = struct { + Endpoint string `json:"endpoint"` + BasicAuth agent.BasicAuth + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + connectRes, err := common2.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteJSON(w, connectRes, http.StatusOK) +} + func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { nodesInfo, err := common2.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) if err != nil { @@ -534,7 +530,6 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { } oldPids := map[int]struct{}{} var resultNodes []agent.ESNodeInfo - //settings, err := common2.GetAgentSettings(inst.ID, 0) if err != nil { return nil, err } @@ -555,24 +550,6 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { if oldNode != nil && oldNode.ClusterID != "" { node.ClusterID = oldNode.ClusterID } - //else{ - // if cfg := clusterCfgs[node.ClusterUuid]; cfg != nil { - // node.ClusterID = cfg.ID - // setting := pickAgentSettings(settings, node) - // if setting == nil { - // setting, err = getAgentTaskSetting(inst.ID, node) - // if err != nil { - // log.Error() - // } - // err = orm.Create(nil, setting) - // if err != nil { - // log.Error("save agent task setting error: ", err) - // } - // } - // }else{ - // //cluster not registered in console - // } - //} } node.Status = "online" @@ -626,6 +603,7 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){ }, }, }, + } q := orm.Query{ RawQuery: util.MustToJSONBytes(query), @@ -645,26 +623,6 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){ return nodesInfo, nil } -func getClusterConfigs() map[string]*elastic.ElasticsearchConfig { - cfgs := map[string]*elastic.ElasticsearchConfig{} - elastic.WalkConfigs(func(key, value interface{}) bool { - if cfg, ok := value.(*elastic.ElasticsearchConfig); ok { - clusterUUID := cfg.ClusterUUID - if cfg.ClusterUUID == "" { - verInfo, err := adapter.ClusterVersion(elastic.GetMetadata(cfg.ID)) - if err != nil { - log.Error(err) - return true - } - clusterUUID = verInfo.ClusterUUID - } - cfgs[clusterUUID] = cfg - } - return true - }) - return cfgs -} - func pickAgentSettings(settings []agent.Setting, nodeInfo agent.ESNodeInfo) *agent.Setting { for _, setting := range settings { if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID { diff --git a/modules/agent/api/log.go b/modules/agent/api/log.go index d564f521..f36e2be0 100644 --- a/modules/agent/api/log.go +++ b/modules/agent/api/log.go @@ -88,6 +88,13 @@ func getAgentByNodeID(nodeID string) (*agent.Instance, *agent.ESNodeInfo, error) }, }, }, + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "desc", + }, + }, + }, } q := &orm.Query{ RawQuery: util.MustToJSONBytes(queryDsl), diff --git a/modules/agent/api/setup.go b/modules/agent/api/setup.go new file mode 100644 index 00000000..3f07ae7f --- /dev/null +++ b/modules/agent/api/setup.go @@ -0,0 +1,144 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package api + +import ( + "fmt" + log "github.com/cihub/seelog" + "github.com/valyala/fasttemplate" + "infini.sh/console/modules/agent/common" + "infini.sh/framework/core/api/rbac" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" + "infini.sh/framework/core/util" + "os" + + "net/http" + "path" + "strings" + "sync" + "time" +) + +var tokens = sync.Map{} +type Token struct { + CreatedAt time.Time + UserID string +} + +const ExpiredIn = time.Millisecond * 1000 * 60 * 20 +func (h *APIHandler) generateInstallCommand(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + claims, ok := req.Context().Value("user").(*rbac.UserClaims) + if !ok { + h.WriteError(w, "user not found", http.StatusInternalServerError) + return + } + var ( + t *Token + tokenStr string + ) + tokens.Range(func(key, value any) bool { + if v, ok := value.(*Token); ok && claims.UserId == v.UserID { + t = v + tokenStr = key.(string) + return false + } + return true + }) + + if t == nil { + tokenStr = util.GetUUID() + t = &Token{ + CreatedAt: time.Now(), + UserID: claims.UserId, + } + }else{ + if t.CreatedAt.Add(ExpiredIn).Before(time.Now()){ + tokens.Delete(tokenStr) + tokenStr = util.GetUUID() + t = &Token{ + CreatedAt: time.Now(), + UserID: claims.UserId, + } + }else{ + t.CreatedAt = time.Now() + } + } + tokens.Store(tokenStr, t) + agCfg := common.GetAgentConfig() + scriptEndpoint := agCfg.Setup.ScriptEndpoint + if scriptEndpoint == "" { + scheme := "http" + if req.TLS != nil { + scheme = "https" + } + scriptEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host) + } + h.WriteJSON(w, util.MapStr{ + "script": fmt.Sprintf(`sudo bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, scriptEndpoint, tokenStr), + "token": tokenStr, + "expired_at": t.CreatedAt.Add(ExpiredIn), + }, http.StatusOK) +} + +func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + tokenStr := h.GetParameter(req, "token") + if strings.TrimSpace(tokenStr) == "" { + h.WriteError(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return + } + if v, ok := tokens.Load(tokenStr); !ok { + h.WriteError(w, "token is invalid", http.StatusUnauthorized) + return + }else{ + if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) { + tokens.Delete(tokenStr) + h.WriteError(w, "token was expired", http.StatusUnauthorized) + return + } + } + agCfg := common.GetAgentConfig() + caCert, clientCertPEM, clientKeyPEM, err := common.GenerateServerCert(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + scriptTplPath := path.Join(global.Env().GetConfigDir(), "install_agent.tpl") + buf, err := os.ReadFile(scriptTplPath) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + tpl := fasttemplate.New(string(buf), "{{", "}}") + downloadURL := agCfg.Setup.DownloadURL + if downloadURL == "" { + downloadURL = "https://release.infinilabs.com/agent/stable/" + } + esCfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) + var ( + loggingESUser string + loggingESPassword string + ) + if esCfg.BasicAuth != nil { + loggingESUser = esCfg.BasicAuth.Username + loggingESPassword = esCfg.BasicAuth.Password + } + tpl.Execute(w, map[string]interface{}{ + "base_url": agCfg.Setup.DownloadURL, + "agent_version": agCfg.Setup.Version, + //"console_endpoint": util.MustToJSON(util.MustToJSON(gatewayEndpoints)), + "client_crt": clientCertPEM, + "client_key": clientKeyPEM, + "ca_crt": caCert, + "logging_es_endpoint": esCfg.Endpoint, + "logging_es_user": loggingESUser, + "logging_es_password": loggingESPassword, + }) +} + diff --git a/modules/agent/common/cert.go b/modules/agent/common/cert.go new file mode 100644 index 00000000..7b9e718c --- /dev/null +++ b/modules/agent/common/cert.go @@ -0,0 +1,63 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "crypto/rsa" + "crypto/x509" + "encoding/pem" + "infini.sh/framework/core/global" + "infini.sh/framework/core/util" + "os" + "path" +) + +func GenerateClientCert(caFile, caKey string) (caCert, clientCertPEM, clientKeyPEM []byte, err error){ + return generateCert(caFile, caKey, false) +} + +func GenerateServerCert(caFile, caKey string) (caCert, serverCertPEM, serverKeyPEM []byte, err error){ + return generateCert(caFile, caKey, true) +} + +func generateCert(caFile, caKey string, isServer bool)(caCert, instanceCertPEM, instanceKeyPEM []byte, err error){ + pool := x509.NewCertPool() + if caFile == "" { + caFile = path.Join(global.Env().GetConfigDir(), "certs", "ca.crt") + } + caCert, err = os.ReadFile(caFile) + if err != nil { + return + } + pool.AppendCertsFromPEM(caCert) + b, _ := pem.Decode(caCert) + var rootCert *x509.Certificate + caCertBytes := b.Bytes + rootCert, err = x509.ParseCertificate(b.Bytes) + if err != nil { + return + } + if caKey == "" { + caKey = path.Join(global.Env().GetConfigDir(), "certs", "ca.key") + } + var keyBytes []byte + keyBytes, err = os.ReadFile(caKey) + if err != nil { + return + } + b, _ = pem.Decode(keyBytes) + certKey, err := util.ParsePrivateKey(b.Bytes) + if err != nil { + return + } + if isServer{ + b = &pem.Block{Type: "CERTIFICATE", Bytes: caCertBytes} + certPEM := pem.EncodeToMemory(b) + instanceCertPEM, instanceKeyPEM, err = util.GenerateServerCert(rootCert, certKey.(*rsa.PrivateKey), certPEM, nil) + }else{ + _, instanceCertPEM, instanceKeyPEM = util.GetClientCert(rootCert, certKey) + } + return caCert, instanceCertPEM, instanceKeyPEM, nil +} diff --git a/modules/agent/common/client.go b/modules/agent/common/client.go index 704fdda2..cdbdf944 100644 --- a/modules/agent/common/client.go +++ b/modules/agent/common/client.go @@ -5,13 +5,19 @@ package common import ( + "bytes" "context" "fmt" "infini.sh/framework/core/agent" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" "infini.sh/framework/core/host" "infini.sh/framework/core/util" + "io" "net/http" + "os" + "path" + "sync" ) type Client struct { @@ -188,16 +194,100 @@ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipeline return client.doRequest(req, nil) } +//func (client *Client) doRequest(req *util.Request, respObj interface{}) error { +// result, err := util.ExecuteRequest(req) +// if err != nil { +// return err +// } +// if result.StatusCode != 200 { +// return fmt.Errorf(string(result.Body)) +// } +// if respObj == nil { +// return nil +// } +// return util.FromJSONBytes(result.Body, respObj) +//} + +var( + hClient *http.Client + hClientOnce = sync.Once{} +) func (client *Client) doRequest(req *util.Request, respObj interface{}) error { - result, err := util.ExecuteRequest(req) + agCfg := GetAgentConfig() + var err error + hClientOnce.Do(func() { + var ( + instanceCrt string + instanceKey string + ) + instanceCrt, instanceKey, err = getAgentInstanceCerts(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile) + hClient, err = util.NewMTLSClient(agCfg.Setup.CACertFile, instanceCrt, instanceKey) + }) if err != nil { return err } - if result.StatusCode != 200 { - return fmt.Errorf(string(result.Body)) + var reader io.Reader + if len(req.Body) > 0 { + reader = bytes.NewReader(req.Body) } - if respObj == nil { - return nil + + var hr *http.Request + if req.Context == nil { + hr, err = http.NewRequest(req.Method, req.Url, reader) + }else{ + hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader) } - return util.FromJSONBytes(result.Body, respObj) + if err != nil { + return err + } + res, err := hClient.Do(hr) + if err != nil { + return err + } + if respObj != nil { + defer res.Body.Close() + buf, err := io.ReadAll(res.Body) + if err != nil { + return err + } + err = util.FromJSONBytes(buf, respObj) + if err != nil { + return err + } + } + return nil } + +func getAgentInstanceCerts(caFile, caKey string) (string, string, error) { + dataDir := global.Env().GetDataDir() + instanceCrt := path.Join(dataDir, "certs/agent/instance.crt") + instanceKey := path.Join(dataDir, "certs/agent/instance.key") + var ( + err error + clientCertPEM []byte + clientKeyPEM []byte + ) + if util.FileExists(instanceCrt) && util.FileExists(instanceKey) { + return instanceCrt, instanceKey, nil + } + _, clientCertPEM, clientKeyPEM, err = GenerateClientCert(caFile, caKey) + if err != nil { + return "", "", err + } + baseDir := path.Join(dataDir, "certs/agent") + if !util.IsExist(baseDir){ + err = os.MkdirAll(baseDir, 0775) + if err != nil { + return "", "", err + } + } + _, err = util.FilePutContentWithByte(instanceCrt, clientCertPEM) + if err != nil { + return "", "", err + } + _, err = util.FilePutContentWithByte(instanceKey, clientKeyPEM) + if err != nil { + return "", "", err + } + return instanceCrt, instanceKey, nil +} \ No newline at end of file diff --git a/modules/agent/common/config.go b/modules/agent/common/config.go index 36b5568c..71fc0a04 100644 --- a/modules/agent/common/config.go +++ b/modules/agent/common/config.go @@ -4,6 +4,11 @@ package common +import ( + "infini.sh/framework/core/env" + "sync" +) + type AgentConfig struct { Enabled bool `config:"enabled"` StateManager struct{ @@ -18,4 +23,18 @@ type SetupConfig struct { CACertFile string `config:"ca_cert"` CAKeyFile string `config:"ca_key"` ScriptEndpoint string `config:"script_endpoint"` +} + +var agentCfg *AgentConfig +var onceCfg = sync.Once{} + +func GetAgentConfig() *AgentConfig { + onceCfg.Do(func() { + agentCfg = &AgentConfig{} + _, err := env.ParseConfig("agent", agentCfg ) + if err != nil { + panic(err) + } + }) + return agentCfg } \ No newline at end of file