This commit is contained in:
liugq 2023-05-30 11:30:11 +08:00
parent dd98ba7dff
commit e15073e6f5
7 changed files with 370 additions and 84 deletions

View File

@ -23,8 +23,13 @@ func Init() {
api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.POST, "/agent/instance/try_connect", handler.RequireLogin(handler.tryConnect))
api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost)
api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo)
api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess)
api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand))
api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript)
}

View File

@ -8,17 +8,15 @@ import (
"context"
"fmt"
log "github.com/cihub/seelog"
common2 "infini.sh/console/modules/agent/common"
"infini.sh/framework/core/agent"
"infini.sh/framework/core/api"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/event"
"infini.sh/framework/core/host"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
common2 "infini.sh/console/modules/agent/common"
elastic2 "infini.sh/framework/modules/elastic"
"infini.sh/framework/modules/elastic/adapter"
"infini.sh/framework/modules/elastic/common"
"net/http"
"strconv"
@ -89,38 +87,6 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
}
func bindAgentToHostByIP(ag *agent.Instance) error{
err, result := orm.GetBy("ip", ag.MajorIP, host.HostInfo{})
if err != nil {
return err
}
if len(result.Result) > 0 {
buf := util.MustToJSONBytes(result.Result[0])
hostInfo := &host.HostInfo{}
err = util.FromJSONBytes(buf, hostInfo)
if err != nil {
return err
}
sm := common2.GetStateManager()
if ag.Status == "" {
_, err1 := sm.GetAgentClient().GetHostInfo(nil, ag.GetEndpoint())
if err1 == nil {
ag.Status = "online"
}else{
ag.Status = "offline"
}
}
hostInfo.AgentStatus = ag.Status
hostInfo.AgentID = ag.ID
err = orm.Update(nil, hostInfo)
if err != nil {
return err
}
}
return nil
}
func (h *APIHandler) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("instance_id")
@ -172,11 +138,23 @@ func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps
if sm := common2.GetStateManager(); sm != nil {
sm.DeleteAgent(obj.ID)
}
queryDsl := util.MapStr{
"query": util.MapStr{
"term": util.MapStr{
"agent_id": util.MapStr{
"value": id,
},
},
},
}
err = orm.DeleteBy(agent.ESNodeInfo{}, util.MustToJSONBytes(queryDsl))
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error("delete node info error: ", err)
return
}
h.WriteJSON(w, util.MapStr{
"_id": obj.ID,
"result": "deleted",
}, 200)
h.WriteDeletedOKJSON(w, id)
}
func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@ -523,6 +501,24 @@ func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps h
h.WriteAckOKJSON(w)
}
func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var reqBody = struct {
Endpoint string `json:"endpoint"`
BasicAuth agent.BasicAuth
}{}
err := h.DecodeJSON(req, &reqBody)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
connectRes, err := common2.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
h.WriteJSON(w, connectRes, http.StatusOK)
}
func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
nodesInfo, err := common2.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint())
if err != nil {
@ -534,7 +530,6 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
}
oldPids := map[int]struct{}{}
var resultNodes []agent.ESNodeInfo
//settings, err := common2.GetAgentSettings(inst.ID, 0)
if err != nil {
return nil, err
}
@ -555,24 +550,6 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
if oldNode != nil && oldNode.ClusterID != "" {
node.ClusterID = oldNode.ClusterID
}
//else{
// if cfg := clusterCfgs[node.ClusterUuid]; cfg != nil {
// node.ClusterID = cfg.ID
// setting := pickAgentSettings(settings, node)
// if setting == nil {
// setting, err = getAgentTaskSetting(inst.ID, node)
// if err != nil {
// log.Error()
// }
// err = orm.Create(nil, setting)
// if err != nil {
// log.Error("save agent task setting error: ", err)
// }
// }
// }else{
// //cluster not registered in console
// }
//}
}
node.Status = "online"
@ -626,6 +603,7 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){
},
},
},
}
q := orm.Query{
RawQuery: util.MustToJSONBytes(query),
@ -645,26 +623,6 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){
return nodesInfo, nil
}
func getClusterConfigs() map[string]*elastic.ElasticsearchConfig {
cfgs := map[string]*elastic.ElasticsearchConfig{}
elastic.WalkConfigs(func(key, value interface{}) bool {
if cfg, ok := value.(*elastic.ElasticsearchConfig); ok {
clusterUUID := cfg.ClusterUUID
if cfg.ClusterUUID == "" {
verInfo, err := adapter.ClusterVersion(elastic.GetMetadata(cfg.ID))
if err != nil {
log.Error(err)
return true
}
clusterUUID = verInfo.ClusterUUID
}
cfgs[clusterUUID] = cfg
}
return true
})
return cfgs
}
func pickAgentSettings(settings []agent.Setting, nodeInfo agent.ESNodeInfo) *agent.Setting {
for _, setting := range settings {
if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID {

View File

@ -88,6 +88,13 @@ func getAgentByNodeID(nodeID string) (*agent.Instance, *agent.ESNodeInfo, error)
},
},
},
"sort": []util.MapStr{
{
"timestamp": util.MapStr{
"order": "desc",
},
},
},
}
q := &orm.Query{
RawQuery: util.MustToJSONBytes(queryDsl),

144
modules/agent/api/setup.go Normal file
View File

@ -0,0 +1,144 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package api
import (
"fmt"
log "github.com/cihub/seelog"
"github.com/valyala/fasttemplate"
"infini.sh/console/modules/agent/common"
"infini.sh/framework/core/api/rbac"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/global"
"infini.sh/framework/core/util"
"os"
"net/http"
"path"
"strings"
"sync"
"time"
)
var tokens = sync.Map{}
type Token struct {
CreatedAt time.Time
UserID string
}
const ExpiredIn = time.Millisecond * 1000 * 60 * 20
func (h *APIHandler) generateInstallCommand(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
claims, ok := req.Context().Value("user").(*rbac.UserClaims)
if !ok {
h.WriteError(w, "user not found", http.StatusInternalServerError)
return
}
var (
t *Token
tokenStr string
)
tokens.Range(func(key, value any) bool {
if v, ok := value.(*Token); ok && claims.UserId == v.UserID {
t = v
tokenStr = key.(string)
return false
}
return true
})
if t == nil {
tokenStr = util.GetUUID()
t = &Token{
CreatedAt: time.Now(),
UserID: claims.UserId,
}
}else{
if t.CreatedAt.Add(ExpiredIn).Before(time.Now()){
tokens.Delete(tokenStr)
tokenStr = util.GetUUID()
t = &Token{
CreatedAt: time.Now(),
UserID: claims.UserId,
}
}else{
t.CreatedAt = time.Now()
}
}
tokens.Store(tokenStr, t)
agCfg := common.GetAgentConfig()
scriptEndpoint := agCfg.Setup.ScriptEndpoint
if scriptEndpoint == "" {
scheme := "http"
if req.TLS != nil {
scheme = "https"
}
scriptEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host)
}
h.WriteJSON(w, util.MapStr{
"script": fmt.Sprintf(`sudo bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, scriptEndpoint, tokenStr),
"token": tokenStr,
"expired_at": t.CreatedAt.Add(ExpiredIn),
}, http.StatusOK)
}
func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
tokenStr := h.GetParameter(req, "token")
if strings.TrimSpace(tokenStr) == "" {
h.WriteError(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
return
}
if v, ok := tokens.Load(tokenStr); !ok {
h.WriteError(w, "token is invalid", http.StatusUnauthorized)
return
}else{
if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) {
tokens.Delete(tokenStr)
h.WriteError(w, "token was expired", http.StatusUnauthorized)
return
}
}
agCfg := common.GetAgentConfig()
caCert, clientCertPEM, clientKeyPEM, err := common.GenerateServerCert(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
scriptTplPath := path.Join(global.Env().GetConfigDir(), "install_agent.tpl")
buf, err := os.ReadFile(scriptTplPath)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
tpl := fasttemplate.New(string(buf), "{{", "}}")
downloadURL := agCfg.Setup.DownloadURL
if downloadURL == "" {
downloadURL = "https://release.infinilabs.com/agent/stable/"
}
esCfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID))
var (
loggingESUser string
loggingESPassword string
)
if esCfg.BasicAuth != nil {
loggingESUser = esCfg.BasicAuth.Username
loggingESPassword = esCfg.BasicAuth.Password
}
tpl.Execute(w, map[string]interface{}{
"base_url": agCfg.Setup.DownloadURL,
"agent_version": agCfg.Setup.Version,
//"console_endpoint": util.MustToJSON(util.MustToJSON(gatewayEndpoints)),
"client_crt": clientCertPEM,
"client_key": clientKeyPEM,
"ca_crt": caCert,
"logging_es_endpoint": esCfg.Endpoint,
"logging_es_user": loggingESUser,
"logging_es_password": loggingESPassword,
})
}

View File

@ -0,0 +1,63 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package common
import (
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"infini.sh/framework/core/global"
"infini.sh/framework/core/util"
"os"
"path"
)
func GenerateClientCert(caFile, caKey string) (caCert, clientCertPEM, clientKeyPEM []byte, err error){
return generateCert(caFile, caKey, false)
}
func GenerateServerCert(caFile, caKey string) (caCert, serverCertPEM, serverKeyPEM []byte, err error){
return generateCert(caFile, caKey, true)
}
func generateCert(caFile, caKey string, isServer bool)(caCert, instanceCertPEM, instanceKeyPEM []byte, err error){
pool := x509.NewCertPool()
if caFile == "" {
caFile = path.Join(global.Env().GetConfigDir(), "certs", "ca.crt")
}
caCert, err = os.ReadFile(caFile)
if err != nil {
return
}
pool.AppendCertsFromPEM(caCert)
b, _ := pem.Decode(caCert)
var rootCert *x509.Certificate
caCertBytes := b.Bytes
rootCert, err = x509.ParseCertificate(b.Bytes)
if err != nil {
return
}
if caKey == "" {
caKey = path.Join(global.Env().GetConfigDir(), "certs", "ca.key")
}
var keyBytes []byte
keyBytes, err = os.ReadFile(caKey)
if err != nil {
return
}
b, _ = pem.Decode(keyBytes)
certKey, err := util.ParsePrivateKey(b.Bytes)
if err != nil {
return
}
if isServer{
b = &pem.Block{Type: "CERTIFICATE", Bytes: caCertBytes}
certPEM := pem.EncodeToMemory(b)
instanceCertPEM, instanceKeyPEM, err = util.GenerateServerCert(rootCert, certKey.(*rsa.PrivateKey), certPEM, nil)
}else{
_, instanceCertPEM, instanceKeyPEM = util.GetClientCert(rootCert, certKey)
}
return caCert, instanceCertPEM, instanceKeyPEM, nil
}

View File

@ -5,13 +5,19 @@
package common
import (
"bytes"
"context"
"fmt"
"infini.sh/framework/core/agent"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/global"
"infini.sh/framework/core/host"
"infini.sh/framework/core/util"
"io"
"net/http"
"os"
"path"
"sync"
)
type Client struct {
@ -188,16 +194,100 @@ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipeline
return client.doRequest(req, nil)
}
//func (client *Client) doRequest(req *util.Request, respObj interface{}) error {
// result, err := util.ExecuteRequest(req)
// if err != nil {
// return err
// }
// if result.StatusCode != 200 {
// return fmt.Errorf(string(result.Body))
// }
// if respObj == nil {
// return nil
// }
// return util.FromJSONBytes(result.Body, respObj)
//}
var(
hClient *http.Client
hClientOnce = sync.Once{}
)
func (client *Client) doRequest(req *util.Request, respObj interface{}) error {
result, err := util.ExecuteRequest(req)
agCfg := GetAgentConfig()
var err error
hClientOnce.Do(func() {
var (
instanceCrt string
instanceKey string
)
instanceCrt, instanceKey, err = getAgentInstanceCerts(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile)
hClient, err = util.NewMTLSClient(agCfg.Setup.CACertFile, instanceCrt, instanceKey)
})
if err != nil {
return err
}
var reader io.Reader
if len(req.Body) > 0 {
reader = bytes.NewReader(req.Body)
}
var hr *http.Request
if req.Context == nil {
hr, err = http.NewRequest(req.Method, req.Url, reader)
}else{
hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader)
}
if err != nil {
return err
}
res, err := hClient.Do(hr)
if err != nil {
return err
}
if respObj != nil {
defer res.Body.Close()
buf, err := io.ReadAll(res.Body)
if err != nil {
return err
}
err = util.FromJSONBytes(buf, respObj)
if err != nil {
return err
}
if result.StatusCode != 200 {
return fmt.Errorf(string(result.Body))
}
if respObj == nil {
return nil
}
return util.FromJSONBytes(result.Body, respObj)
func getAgentInstanceCerts(caFile, caKey string) (string, string, error) {
dataDir := global.Env().GetDataDir()
instanceCrt := path.Join(dataDir, "certs/agent/instance.crt")
instanceKey := path.Join(dataDir, "certs/agent/instance.key")
var (
err error
clientCertPEM []byte
clientKeyPEM []byte
)
if util.FileExists(instanceCrt) && util.FileExists(instanceKey) {
return instanceCrt, instanceKey, nil
}
_, clientCertPEM, clientKeyPEM, err = GenerateClientCert(caFile, caKey)
if err != nil {
return "", "", err
}
baseDir := path.Join(dataDir, "certs/agent")
if !util.IsExist(baseDir){
err = os.MkdirAll(baseDir, 0775)
if err != nil {
return "", "", err
}
}
_, err = util.FilePutContentWithByte(instanceCrt, clientCertPEM)
if err != nil {
return "", "", err
}
_, err = util.FilePutContentWithByte(instanceKey, clientKeyPEM)
if err != nil {
return "", "", err
}
return instanceCrt, instanceKey, nil
}

View File

@ -4,6 +4,11 @@
package common
import (
"infini.sh/framework/core/env"
"sync"
)
type AgentConfig struct {
Enabled bool `config:"enabled"`
StateManager struct{
@ -19,3 +24,17 @@ type SetupConfig struct {
CAKeyFile string `config:"ca_key"`
ScriptEndpoint string `config:"script_endpoint"`
}
var agentCfg *AgentConfig
var onceCfg = sync.Once{}
func GetAgentConfig() *AgentConfig {
onceCfg.Do(func() {
agentCfg = &AgentConfig{}
_, err := env.ParseConfig("agent", agentCfg )
if err != nil {
panic(err)
}
})
return agentCfg
}