diff --git a/config/install_agent.tpl b/config/install_agent.tpl index 971fe2b9..e9fa4676 100644 --- a/config/install_agent.tpl +++ b/config/install_agent.tpl @@ -156,6 +156,8 @@ if [ $? -ne 0 ]; then exit 1 fi +rm -f ${agent} + ################## # save cert ################## @@ -167,14 +169,13 @@ if [ $? -ne 0 ]; then exit 1 fi +port="{{port}}" + ## generate agent.yml agent_config="path.configs: "config" configs.auto_reload: true env: - LOGGING_ES_ENDPOINT: {{logging_es_endpoint}} - LOGGING_ES_USER: {{logging_es_user}} - LOGGING_ES_PASS: {{logging_es_password}} - API_BINDING: "0.0.0.0:8080" + API_BINDING: "0.0.0.0:${port}" path.data: data path.logs: log @@ -194,104 +195,6 @@ badger: value_log_max_entries: 1000000 value_log_file_size: 104857600 value_threshold: 1024 - -metrics: - enabled: true - queue: metrics - network: - enabled: true - summary: true - details: true - memory: - metrics: - - swap - - memory - disk: - metrics: - - iops - - usage - cpu: - metrics: - - idle - - system - - user - - iowait - - load - instance: - enabled: true - -elasticsearch: - - name: default - enabled: true - endpoint: \$[[env.LOGGING_ES_ENDPOINT]] - discovery: - enabled: true - basic_auth: - username: \$[[env.LOGGING_ES_USER]] - password: \$[[env.LOGGING_ES_PASS]] - -pipeline: - - name: logs_indexing_merge - auto_start: true - keep_running: true - processor: - - indexing_merge: - index_name: ".infini_logs" - elasticsearch: "default" - input_queue: "logs" - idle_timeout_in_seconds: 10 - output_queue: - name: "logs_requests" - label: - tag: "logs" - worker_size: 1 - bulk_size_in_mb: 10 - - name: ingest_logs - auto_start: true - keep_running: true - processor: - - bulk_indexing: - bulk: - compress: true - batch_size_in_mb: 5 - batch_size_in_docs: 5000 - consumer: - fetch_max_messages: 100 - queues: - type: indexing_merge - tag: "logs" - when: - cluster_available: ["default"] - - name: metrics_indexing_merge - auto_start: true - keep_running: true - processor: - - indexing_merge: - elasticsearch: "default" - index_name: ".infini_metrics" - input_queue: "metrics" - output_queue: - name: "metrics_requests" - label: - tag: "metrics" - worker_size: 1 - bulk_size_in_mb: 5 - - name: ingest_metrics - auto_start: true - keep_running: true - processor: - - bulk_indexing: - bulk: - compress: true - batch_size_in_mb: 5 - batch_size_in_docs: 5000 - consumer: - fetch_max_messages: 100 - queues: - type: indexing_merge - tag: "metrics" - when: - cluster_available: ["default"] agent: major_ip_pattern: "192.*" " @@ -331,6 +234,16 @@ if [ $? -ne 0 ]; then fi printf "\n* agent service started" +console_endpoint="{{console_endpoint}}" +sleep 3 +printf "\n* start register\n" +token={{token}} +curl -X POST ${console_endpoint}/agent/instance?token=${token} + +if [ $? -ne 0 ]; then + exit 1 +fi +printf "\n* agent registered\n" printf "\n* ${GREEN}Congratulations, install success!${CLR}\n\n" diff --git a/modules/agent/agent.go b/modules/agent/agent.go index 8352021a..9e8c3ee5 100644 --- a/modules/agent/agent.go +++ b/modules/agent/agent.go @@ -41,7 +41,22 @@ func (module *AgentModule) Start() error { orm.RegisterSchemaWithIndexName(agent.ESNodeInfo{}, "agent-node") orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host") orm.RegisterSchemaWithIndexName(agent.Setting{}, "agent-setting") - client.RegisterClient(&client.Client{}) + var ( + executor client.Executor + err error + ) + if module.AgentConfig.Setup == nil { + executor = &client.HttpExecutor{} + }else{ + executor, err = client.NewMTLSExecutor(module.AgentConfig.Setup.CACertFile, module.AgentConfig.Setup.CAKeyFile) + if err != nil { + panic(err) + } + } + agClient := &client.Client{ + Executor: executor, + } + client.RegisterClient(agClient) if module.AgentConfig.StateManager.Enabled { onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, 60) @@ -59,7 +74,7 @@ func (module *AgentModule) Start() error { } } - sm := state.NewStateManager(time.Second*30, "agent_state", agentIds) + sm := state.NewStateManager(time.Second*30, "agent_state", agentIds, agClient) state.RegisterStateManager(sm) go sm.LoopState() } diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index 2b681a9e..eab6e74f 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -23,6 +23,7 @@ import ( "infini.sh/framework/modules/elastic/common" "net/http" "strconv" + "time" ) type APIHandler struct { @@ -37,22 +38,30 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps log.Error(err) return } - - oldInst := &agent.Instance{} - oldInst.ID = obj.ID - exists, err := orm.Get(oldInst) - - if err != nil && err != elastic2.ErrNotFound { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - if exists { - errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID) - h.WriteError(w, errMsg, http.StatusInternalServerError) - log.Error(errMsg) - return + //validate token for auto register + token := h.GetParameter(req, "token") + if token != "" { + if v, ok := tokens.Load(token); !ok { + h.WriteError(w, "token is invalid", http.StatusUnauthorized) + return + }else{ + if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) { + tokens.Delete(token) + h.WriteError(w, "token was expired", http.StatusUnauthorized) + return + } + } + remoteIP := util.ClientIP(req) + agCfg := common2.GetAgentConfig() + port := agCfg.Setup.Port + if port == "" { + port = "8080" + } + obj.Endpoint = fmt.Sprintf("https://%s:%s", remoteIP, port) + obj.Tags = append(obj.Tags, "mtls") + obj.Tags = append(obj.Tags, "auto") } + //fetch more information of agent instance res, err := client.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint()) if err != nil { @@ -72,6 +81,24 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps obj.MajorIP = res.MajorIP obj.Host = res.Host obj.IPS = res.IPS + if obj.Name == "" { + obj.Name = res.Name + } + } + oldInst := &agent.Instance{} + oldInst.ID = obj.ID + exists, err := orm.Get(oldInst) + + if err != nil && err != elastic2.ErrNotFound { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if exists { + errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID) + h.WriteError(w, errMsg, http.StatusInternalServerError) + log.Error(errMsg) + return } obj.Status = model.StatusOnline @@ -85,11 +112,33 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps if err != nil { log.Error(err) } + err = pushIngestConfigToAgent(obj) + if err != nil { + log.Error(err) + } h.WriteCreatedOKJSON(w, obj.ID) } +func pushIngestConfigToAgent(inst *agent.Instance) error{ + ingestCfg, basicAuth, err := common2.GetAgentIngestConfig() + if err != nil { + return err + } + if basicAuth != nil && basicAuth.Password != "" { + err = client.GetClient().SetKeystoreValue(context.Background(), inst.GetEndpoint(), "ingest_cluster_password", basicAuth.Password) + if err != nil { + return fmt.Errorf("set keystore value to agent error: %w", err) + } + } + err = client.GetClient().SaveDynamicConfig(context.Background(), inst.GetEndpoint(), "ingest", ingestCfg ) + if err != nil { + fmt.Errorf("save dynamic config to agent error: %w", err) + } + return nil +} + func (h *APIHandler) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("instance_id") @@ -157,6 +206,22 @@ func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps return } + queryDsl = util.MapStr{ + "query": util.MapStr{ + "term": util.MapStr{ + "metadata.labels.agent_id": util.MapStr{ + "value": id, + }, + }, + }, + } + err = orm.DeleteBy(agent.Setting{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error("delete agent settings error: ", err) + return + } + h.WriteDeletedOKJSON(w, id) } @@ -500,6 +565,26 @@ func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps h h.WriteError(w, err.Error(), http.StatusInternalServerError) return } + q = util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "metadata.labels.node_uuid": nodeIDs, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.agent_id": util.MapStr{ + "value": id, + }, + }, + }, + }, + }, + }, + } } h.WriteAckOKJSON(w) } diff --git a/modules/agent/api/setup.go b/modules/agent/api/setup.go index 3f07ae7f..433377d6 100644 --- a/modules/agent/api/setup.go +++ b/modules/agent/api/setup.go @@ -11,7 +11,6 @@ import ( "infini.sh/console/modules/agent/common" "infini.sh/framework/core/api/rbac" httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/elastic" "infini.sh/framework/core/global" "infini.sh/framework/core/util" "os" @@ -69,16 +68,16 @@ func (h *APIHandler) generateInstallCommand(w http.ResponseWriter, req *http.Req } tokens.Store(tokenStr, t) agCfg := common.GetAgentConfig() - scriptEndpoint := agCfg.Setup.ScriptEndpoint - if scriptEndpoint == "" { + consoleEndpoint := agCfg.Setup.ConsoleEndpoint + if consoleEndpoint == "" { scheme := "http" if req.TLS != nil { scheme = "https" } - scriptEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host) + consoleEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host) } h.WriteJSON(w, util.MapStr{ - "script": fmt.Sprintf(`sudo bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, scriptEndpoint, tokenStr), + "script": fmt.Sprintf(`sudo bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, consoleEndpoint, tokenStr), "token": tokenStr, "expired_at": t.CreatedAt.Add(ExpiredIn), }, http.StatusOK) @@ -120,25 +119,31 @@ func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request, if downloadURL == "" { downloadURL = "https://release.infinilabs.com/agent/stable/" } - esCfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) - var ( - loggingESUser string - loggingESPassword string - ) - if esCfg.BasicAuth != nil { - loggingESUser = esCfg.BasicAuth.Username - loggingESPassword = esCfg.BasicAuth.Password + //esCfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) + //var ( + // loggingESUser string + // loggingESPassword string + //) + //if esCfg.BasicAuth != nil { + // loggingESUser = esCfg.BasicAuth.Username + // loggingESPassword = esCfg.BasicAuth.Password + //} + port := agCfg.Setup.Port + if port == "" { + port = "8080" } tpl.Execute(w, map[string]interface{}{ "base_url": agCfg.Setup.DownloadURL, "agent_version": agCfg.Setup.Version, - //"console_endpoint": util.MustToJSON(util.MustToJSON(gatewayEndpoints)), + "console_endpoint": agCfg.Setup.ConsoleEndpoint, "client_crt": clientCertPEM, "client_key": clientKeyPEM, "ca_crt": caCert, - "logging_es_endpoint": esCfg.Endpoint, - "logging_es_user": loggingESUser, - "logging_es_password": loggingESPassword, + "port": port, + "token": tokenStr, + //"logging_es_endpoint": esCfg.Endpoint, + //"logging_es_user": loggingESUser, + //"logging_es_password": loggingESPassword, }) } diff --git a/modules/agent/client/client.go b/modules/agent/client/client.go index 5db68a14..9f35870b 100644 --- a/modules/agent/client/client.go +++ b/modules/agent/client/client.go @@ -5,20 +5,13 @@ package client import ( - "bytes" "context" "fmt" - "infini.sh/console/modules/agent/common" "infini.sh/framework/core/agent" "infini.sh/framework/core/elastic" - "infini.sh/framework/core/global" "infini.sh/framework/core/host" "infini.sh/framework/core/util" - "io" "net/http" - "os" - "path" - "sync" ) var defaultClient ClientAPI @@ -44,11 +37,15 @@ type ClientAPI interface { AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error + SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error + SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error } type Client struct { + Executor Executor } + func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) { req := &util.Request{ Method: http.MethodGet, @@ -220,100 +217,37 @@ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipeline return client.doRequest(req, nil) } -//func (client *Client) doRequest(req *util.Request, respObj interface{}) error { -// result, err := util.ExecuteRequest(req) -// if err != nil { -// return err -// } -// if result.StatusCode != 200 { -// return fmt.Errorf(string(result.Body)) -// } -// if respObj == nil { -// return nil -// } -// return util.FromJSONBytes(result.Body, respObj) -//} - -var( - hClient *http.Client - hClientOnce = sync.Once{} -) -func (client *Client) doRequest(req *util.Request, respObj interface{}) error { - agCfg := common.GetAgentConfig() - var err error - hClientOnce.Do(func() { - var ( - instanceCrt string - instanceKey string - ) - instanceCrt, instanceKey, err = getAgentInstanceCerts(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile) - hClient, err = util.NewMTLSClient(agCfg.Setup.CACertFile, instanceCrt, instanceKey) - }) - if err != nil { - return err +func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error{ + body := util.MapStr{ + "key": key, + "value": value, } - var reader io.Reader - if len(req.Body) > 0 { - reader = bytes.NewReader(req.Body) + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/_framework/keystore", agentBaseURL), + Context: ctx, + Body: util.MustToJSONBytes(body), } - - var hr *http.Request - if req.Context == nil { - hr, err = http.NewRequest(req.Method, req.Url, reader) - }else{ - hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader) - } - if err != nil { - return err - } - res, err := hClient.Do(hr) - if err != nil { - return err - } - if respObj != nil { - defer res.Body.Close() - buf, err := io.ReadAll(res.Body) - if err != nil { - return err - } - err = util.FromJSONBytes(buf, respObj) - if err != nil { - return err - } - } - return nil + return client.doRequest(req, nil) } -func getAgentInstanceCerts(caFile, caKey string) (string, string, error) { - dataDir := global.Env().GetDataDir() - instanceCrt := path.Join(dataDir, "certs/agent/instance.crt") - instanceKey := path.Join(dataDir, "certs/agent/instance.key") - var ( - err error - clientCertPEM []byte - clientKeyPEM []byte - ) - if util.FileExists(instanceCrt) && util.FileExists(instanceKey) { - return instanceCrt, instanceKey, nil +func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error{ + body := util.MapStr{ + "configs": util.MapStr{ + name: content, + }, } - _, clientCertPEM, clientKeyPEM, err = common.GenerateClientCert(caFile, caKey) - if err != nil { - return "", "", err + req := &util.Request{ + Method: http.MethodPost, + Url: fmt.Sprintf("%s/agent/config", agentBaseURL), + Context: ctx, + Body: util.MustToJSONBytes(body), } - baseDir := path.Join(dataDir, "certs/agent") - if !util.IsExist(baseDir){ - err = os.MkdirAll(baseDir, 0775) - if err != nil { - return "", "", err - } - } - _, err = util.FilePutContentWithByte(instanceCrt, clientCertPEM) - if err != nil { - return "", "", err - } - _, err = util.FilePutContentWithByte(instanceKey, clientKeyPEM) - if err != nil { - return "", "", err - } - return instanceCrt, instanceKey, nil -} \ No newline at end of file + return client.doRequest(req, nil) +} + + +func (client *Client) doRequest(req *util.Request, respObj interface{}) error { + return client.Executor.DoRequest(req, respObj) +} + diff --git a/modules/agent/client/executor.go b/modules/agent/client/executor.go new file mode 100644 index 00000000..f756f3a1 --- /dev/null +++ b/modules/agent/client/executor.go @@ -0,0 +1,100 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package client + +import ( + "bytes" + "fmt" + "infini.sh/console/modules/agent/common" + "infini.sh/framework/core/util" + "io" + "net/http" +) + +type Executor interface { + DoRequest(req *util.Request, respObj interface{}) error +} + +type HttpExecutor struct { +} + +func (executor *HttpExecutor) DoRequest(req *util.Request, respObj interface{}) error { + result, err := util.ExecuteRequest(req) + if err != nil { + return err + } + if result.StatusCode != 200 { + return fmt.Errorf(string(result.Body)) + } + if respObj == nil { + return nil + } + return util.FromJSONBytes(result.Body, respObj) +} + +func NewMTLSExecutor(caCertFile, caKeyFile string) (*MTLSExecutor, error){ + var ( + instanceCrt string + instanceKey string + ) + instanceCrt, instanceKey, err := common.GetAgentInstanceCerts(caCertFile, caKeyFile) + if err != nil { + return nil, fmt.Errorf("generate tls cert error: %w", err) + } + hClient, err := util.NewMTLSClient(caCertFile, instanceCrt, instanceKey) + if err != nil { + return nil, err + } + return &MTLSExecutor{ + CaCertFile: caCertFile, + CAKeyFile: caKeyFile, + client: hClient, + }, nil +} + +type MTLSExecutor struct { + CaCertFile string + CAKeyFile string + client *http.Client +} + + +func (executor *MTLSExecutor) DoRequest(req *util.Request, respObj interface{}) error { + var reader io.Reader + if len(req.Body) > 0 { + reader = bytes.NewReader(req.Body) + } + var ( + hr *http.Request + err error + ) + if req.Context == nil { + hr, err = http.NewRequest(req.Method, req.Url, reader) + }else{ + hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader) + } + if err != nil { + return err + } + res, err := executor.client.Do(hr) + if err != nil { + return err + } + defer res.Body.Close() + buf, err := io.ReadAll(res.Body) + if err != nil { + return err + } + if res.StatusCode != 200 { + return fmt.Errorf(string(buf)) + } + if respObj != nil { + err = util.FromJSONBytes(buf, respObj) + if err != nil { + return err + } + } + return nil +} \ No newline at end of file diff --git a/modules/agent/common/cert.go b/modules/agent/common/cert.go index 7b9e718c..61521509 100644 --- a/modules/agent/common/cert.go +++ b/modules/agent/common/cert.go @@ -61,3 +61,37 @@ func generateCert(caFile, caKey string, isServer bool)(caCert, instanceCertPEM, } return caCert, instanceCertPEM, instanceKeyPEM, nil } + +func GetAgentInstanceCerts(caFile, caKey string) (string, string, error) { + dataDir := global.Env().GetDataDir() + instanceCrt := path.Join(dataDir, "certs/agent/instance.crt") + instanceKey := path.Join(dataDir, "certs/agent/instance.key") + var ( + err error + clientCertPEM []byte + clientKeyPEM []byte + ) + if util.FileExists(instanceCrt) && util.FileExists(instanceKey) { + return instanceCrt, instanceKey, nil + } + _, clientCertPEM, clientKeyPEM, err = GenerateClientCert(caFile, caKey) + if err != nil { + return "", "", err + } + baseDir := path.Join(dataDir, "certs/agent") + if !util.IsExist(baseDir){ + err = os.MkdirAll(baseDir, 0775) + if err != nil { + return "", "", err + } + } + _, err = util.FilePutContentWithByte(instanceCrt, clientCertPEM) + if err != nil { + return "", "", err + } + _, err = util.FilePutContentWithByte(instanceKey, clientKeyPEM) + if err != nil { + return "", "", err + } + return instanceCrt, instanceKey, nil +} \ No newline at end of file diff --git a/modules/agent/common/config.go b/modules/agent/common/config.go index 643b67fe..ac7e255e 100644 --- a/modules/agent/common/config.go +++ b/modules/agent/common/config.go @@ -7,19 +7,14 @@ package common import ( "infini.sh/console/modules/agent/model" "infini.sh/framework/core/env" - "sync" ) -var agentCfg *model.AgentConfig -var onceCfg = sync.Once{} func GetAgentConfig() *model.AgentConfig { - onceCfg.Do(func() { - agentCfg = &model.AgentConfig{} - _, err := env.ParseConfig("agent", agentCfg ) - if err != nil { - panic(err) - } - }) + agentCfg := &model.AgentConfig{} + _, err := env.ParseConfig("agent", agentCfg ) + if err != nil { + panic(err) + } return agentCfg } \ No newline at end of file diff --git a/modules/agent/common/helper.go b/modules/agent/common/helper.go index 8e760104..cc7ea55d 100644 --- a/modules/agent/common/helper.go +++ b/modules/agent/common/helper.go @@ -8,11 +8,13 @@ import ( "fmt" "infini.sh/console/modules/agent/model" "infini.sh/framework/core/agent" + "infini.sh/framework/core/credential" "infini.sh/framework/core/elastic" "infini.sh/framework/core/event" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" log "src/github.com/cihub/seelog" + "strings" ) func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResult, error){ @@ -71,44 +73,48 @@ func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResul }, nil } +// GetAgentSettings query agent setting by agent id and updated timestamp, +// if there has any setting was updated, then return setting list includes settings not changed, +// otherwise return empty setting list func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) { - queryDsl := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.category": util.MapStr{ - "value": "agent", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.name": util.MapStr{ - "value": "task", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.agent_id": util.MapStr{ - "value": agentID, - }, - }, - }, - { - "range": util.MapStr{ - "updated": util.MapStr{ - "gt": timestamp, - }, + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.category": util.MapStr{ + "value": "agent", }, }, }, + { + "term": util.MapStr{ + "metadata.name": util.MapStr{ + "value": "task", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.agent_id": util.MapStr{ + "value": agentID, + }, + }, + }, + //{ + // "range": util.MapStr{ + // "updated": util.MapStr{ + // "gt": timestamp, + // }, + // }, + //}, }, }, } + queryDsl := util.MapStr{ + "size": 500, + "query": query, + } q := orm.Query{ RawQuery: util.MustToJSONBytes(queryDsl), } @@ -119,7 +125,10 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) if len(result.Result) == 0 { return nil, nil } - var settings []agent.Setting + var ( + settings []agent.Setting + hasUpdated bool + ) for _, row := range result.Result { setting := agent.Setting{} buf, err := util.ToJSONBytes(row) @@ -130,8 +139,14 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) if err != nil { return nil, err } + if setting.Updated != nil && setting.Updated.UnixMilli() > timestamp { + hasUpdated = true + } settings = append(settings, setting) } + if !hasUpdated { + return nil, nil + } return settings, nil } @@ -335,4 +350,132 @@ func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]str } } return agentIDs, nil +} + +func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) { + agCfg := GetAgentConfig() + var ( + endpoint string + ok bool + ) + if endpoint, ok = agCfg.Setup.IngestClusterEndpoint.(string);ok { + if endpoint = strings.TrimSpace(endpoint); endpoint == "" { + return "", nil, fmt.Errorf("config ingest_cluster_endpoint must not be empty") + } + } + var ( + basicAuth elastic.BasicAuth + ) + if agCfg.Setup.IngestClusterCredentialID != "" { + cred := credential.Credential{} + cred.ID = agCfg.Setup.IngestClusterCredentialID + _, err := orm.Get(&cred) + if err != nil { + return "", nil, fmt.Errorf("query credential [%s] error: %w", cred.ID, err) + } + info, err := cred.Decode() + if err != nil { + return "", nil, fmt.Errorf("decode credential [%s] error: %w", cred.ID, err) + } + if basicAuth, ok = info.(elastic.BasicAuth); !ok { + log.Debug("invalid credential: ", cred) + } + } + tpl := `elasticsearch: + - name: default + enabled: true + endpoint: %s + discovery: + enabled: true + basic_auth: + username: %s + password: $[[keystore.ingest_cluster_password]] +metrics: + enabled: true + queue: metrics + network: + enabled: true + summary: true + details: true + memory: + metrics: + - swap + - memory + disk: + metrics: + - iops + - usage + cpu: + metrics: + - idle + - system + - user + - iowait + - load + instance: + enabled: true +pipeline: + - name: logs_indexing_merge + auto_start: true + keep_running: true + processor: + - indexing_merge: + index_name: ".infini_logs" + elasticsearch: "default" + input_queue: "logs" + idle_timeout_in_seconds: 10 + output_queue: + name: "logs_requests" + label: + tag: "logs" + worker_size: 1 + bulk_size_in_mb: 10 + - name: ingest_logs + auto_start: true + keep_running: true + processor: + - bulk_indexing: + bulk: + compress: true + batch_size_in_mb: 5 + batch_size_in_docs: 5000 + consumer: + fetch_max_messages: 100 + queues: + type: indexing_merge + tag: "logs" + when: + cluster_available: ["default"] + - name: metrics_indexing_merge + auto_start: true + keep_running: true + processor: + - indexing_merge: + elasticsearch: "default" + index_name: ".infini_metrics" + input_queue: "metrics" + output_queue: + name: "metrics_requests" + label: + tag: "metrics" + worker_size: 1 + bulk_size_in_mb: 5 + - name: ingest_metrics + auto_start: true + keep_running: true + processor: + - bulk_indexing: + bulk: + compress: true + batch_size_in_mb: 5 + batch_size_in_docs: 5000 + consumer: + fetch_max_messages: 100 + queues: + type: indexing_merge + tag: "metrics" + when: + cluster_available: ["default"]` + tpl = fmt.Sprintf(tpl, endpoint, basicAuth.Username) + return tpl, &basicAuth, nil } \ No newline at end of file diff --git a/modules/agent/model/config.go b/modules/agent/model/config.go index 465e8f52..f4bb4a2d 100644 --- a/modules/agent/model/config.go +++ b/modules/agent/model/config.go @@ -9,7 +9,7 @@ type AgentConfig struct { StateManager struct{ Enabled bool `config:"enabled"` } `config:"state_manager"` - Setup SetupConfig `config:"setup"` + Setup *SetupConfig `config:"setup"` } type SetupConfig struct { @@ -17,5 +17,8 @@ type SetupConfig struct { Version string `config:"version"` CACertFile string `config:"ca_cert"` CAKeyFile string `config:"ca_key"` - ScriptEndpoint string `config:"script_endpoint"` + ConsoleEndpoint string `config:"console_endpoint"` + IngestClusterEndpoint interface{} `config:"ingest_cluster_endpoint"` + IngestClusterCredentialID string `config:"ingest_cluster_credential_id"` + Port string `config:"port"` } diff --git a/modules/agent/state/state.go b/modules/agent/state/state.go index 27507948..9a2d4a78 100644 --- a/modules/agent/state/state.go +++ b/modules/agent/state/state.go @@ -17,9 +17,10 @@ import ( "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" + "infini.sh/framework/modules/elastic" "runtime" "runtime/debug" - "strings" + "src/gopkg.in/yaml.v2" "sync" "time" ) @@ -63,13 +64,13 @@ type StateManager struct { timestamps map[string]int64 } -func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string) *StateManager { +func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string, agentClient *client.Client) *StateManager { return &StateManager{ TTL: TTL, KVKey: kvKey, stopC: make(chan struct{}), stopCompleteC: make(chan struct{}), - agentClient: &client.Client{}, + agentClient: agentClient, agentIds: agentIds, workerChan: make(chan struct{}, runtime.NumCPU()), timestamps: map[string]int64{}, @@ -124,7 +125,9 @@ func (sm *StateManager) checkAgentStatus() { }() ag, err := sm.GetAgent(agentID) if err != nil { - log.Error(err) + if err != elastic.ErrNotFound { + log.Error(err) + } return } ag.Status = model.StatusOffline @@ -142,6 +145,13 @@ func (sm *StateManager) checkAgentStatus() { } func (sm *StateManager) syncSettings(agentID string) { + ag, err := sm.GetAgent(agentID) + if err != nil { + if err != elastic.ErrNotFound { + log.Errorf("get agent error: %v", err) + } + return + } newTimestamp := time.Now().UnixMilli() settings, err := common.GetAgentSettings(agentID, sm.timestamps[agentID]) if err != nil { @@ -157,36 +167,58 @@ func (sm *StateManager) syncSettings(agentID string) { log.Errorf("parse agent settings error: %v", err) return } - ag, err := sm.GetAgent(agentID) + agClient := sm.GetAgentClient() + var clusterCfgs []util.MapStr + if len(parseResult.ClusterConfigs) > 0 { + for _, cfg := range parseResult.ClusterConfigs { + clusterCfg := util.MapStr{ + "name": cfg.ID, + "enabled": true, + "endpoint": cfg.Endpoint, + } + if cfg.BasicAuth != nil && cfg.BasicAuth.Password != ""{ + err = agClient.SetKeystoreValue(context.Background(), ag.GetEndpoint(), fmt.Sprintf("%s_password", cfg.ID), cfg.BasicAuth.Password) + if err != nil { + log.Errorf("set keystore value error: %v", err) + continue + } + clusterCfg["basic_auth"] = util.MapStr{ + "username": cfg.BasicAuth.Username, + "password": fmt.Sprintf("$[[keystore.%s_password]]", cfg.ID), + } + } + clusterCfgs = append(clusterCfgs, clusterCfg) + } + } + var dynamicCfg = util.MapStr{} + if len(clusterCfgs) > 0 { + dynamicCfg["elasticsearch"] = clusterCfgs + } + if len(parseResult.Pipelines) > 0 { + dynamicCfg["pipeline"] = parseResult.Pipelines + } + cfgBytes, err := yaml.Marshal(dynamicCfg) if err != nil { - log.Errorf("get agent error: %v", err) + log.Error("serialize config to yaml error: ", err) return } - agClient := sm.GetAgentClient() - if len(parseResult.ClusterConfigs) > 0 { - err = agClient.RegisterElasticsearch(nil, ag.GetEndpoint(), parseResult.ClusterConfigs) - if err != nil { - log.Errorf("register elasticsearch config error: %v", err) - return - } - } - for _, pipelineID := range parseResult.ToDeletePipelineNames { - err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID) - if err != nil { - if !strings.Contains(err.Error(), "not found") { - log.Errorf("delete pipeline error: %v", err) - continue - } - } - //todo update delete pipeline state - } - for _, pipeline := range parseResult.Pipelines { - err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline)) - if err != nil { - log.Errorf("create pipeline error: %v", err) - return - } - } + err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes)) + //for _, pipelineID := range parseResult.ToDeletePipelineNames { + // err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID) + // if err != nil { + // if !strings.Contains(err.Error(), "not found") { + // log.Errorf("delete pipeline error: %v", err) + // continue + // } + // } + //} + //for _, pipeline := range parseResult.Pipelines { + // err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline)) + // if err != nil { + // log.Errorf("create pipeline error: %v", err) + // return + // } + //} sm.timestamps[agentID] = newTimestamp } @@ -239,7 +271,7 @@ func (sm *StateManager) GetAgent(ID string) (*agent.Instance, error) { if time.Since(timestamp) > sm.TTL { exists, err := orm.Get(inst) if err != nil { - return nil, fmt.Errorf("get agent [%s] error: %w", ID, err) + return nil, err } if !exists { return nil, fmt.Errorf("can not found agent [%s]", ID)