push config to agent through saving file

This commit is contained in:
liugq 2023-06-01 10:20:56 +08:00
parent e9f9d925f2
commit 0cd8aac956
11 changed files with 569 additions and 310 deletions

View File

@ -156,6 +156,8 @@ if [ $? -ne 0 ]; then
exit 1
fi
rm -f ${agent}
##################
# save cert
##################
@ -167,14 +169,13 @@ if [ $? -ne 0 ]; then
exit 1
fi
port="{{port}}"
## generate agent.yml
agent_config="path.configs: "config"
configs.auto_reload: true
env:
LOGGING_ES_ENDPOINT: {{logging_es_endpoint}}
LOGGING_ES_USER: {{logging_es_user}}
LOGGING_ES_PASS: {{logging_es_password}}
API_BINDING: "0.0.0.0:8080"
API_BINDING: "0.0.0.0:${port}"
path.data: data
path.logs: log
@ -194,104 +195,6 @@ badger:
value_log_max_entries: 1000000
value_log_file_size: 104857600
value_threshold: 1024
metrics:
enabled: true
queue: metrics
network:
enabled: true
summary: true
details: true
memory:
metrics:
- swap
- memory
disk:
metrics:
- iops
- usage
cpu:
metrics:
- idle
- system
- user
- iowait
- load
instance:
enabled: true
elasticsearch:
- name: default
enabled: true
endpoint: \$[[env.LOGGING_ES_ENDPOINT]]
discovery:
enabled: true
basic_auth:
username: \$[[env.LOGGING_ES_USER]]
password: \$[[env.LOGGING_ES_PASS]]
pipeline:
- name: logs_indexing_merge
auto_start: true
keep_running: true
processor:
- indexing_merge:
index_name: ".infini_logs"
elasticsearch: "default"
input_queue: "logs"
idle_timeout_in_seconds: 10
output_queue:
name: "logs_requests"
label:
tag: "logs"
worker_size: 1
bulk_size_in_mb: 10
- name: ingest_logs
auto_start: true
keep_running: true
processor:
- bulk_indexing:
bulk:
compress: true
batch_size_in_mb: 5
batch_size_in_docs: 5000
consumer:
fetch_max_messages: 100
queues:
type: indexing_merge
tag: "logs"
when:
cluster_available: ["default"]
- name: metrics_indexing_merge
auto_start: true
keep_running: true
processor:
- indexing_merge:
elasticsearch: "default"
index_name: ".infini_metrics"
input_queue: "metrics"
output_queue:
name: "metrics_requests"
label:
tag: "metrics"
worker_size: 1
bulk_size_in_mb: 5
- name: ingest_metrics
auto_start: true
keep_running: true
processor:
- bulk_indexing:
bulk:
compress: true
batch_size_in_mb: 5
batch_size_in_docs: 5000
consumer:
fetch_max_messages: 100
queues:
type: indexing_merge
tag: "metrics"
when:
cluster_available: ["default"]
agent:
major_ip_pattern: "192.*"
"
@ -331,6 +234,16 @@ if [ $? -ne 0 ]; then
fi
printf "\n* agent service started"
console_endpoint="{{console_endpoint}}"
sleep 3
printf "\n* start register\n"
token={{token}}
curl -X POST ${console_endpoint}/agent/instance?token=${token}
if [ $? -ne 0 ]; then
exit 1
fi
printf "\n* agent registered\n"
printf "\n* ${GREEN}Congratulations, install success!${CLR}\n\n"

View File

@ -41,7 +41,22 @@ func (module *AgentModule) Start() error {
orm.RegisterSchemaWithIndexName(agent.ESNodeInfo{}, "agent-node")
orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host")
orm.RegisterSchemaWithIndexName(agent.Setting{}, "agent-setting")
client.RegisterClient(&client.Client{})
var (
executor client.Executor
err error
)
if module.AgentConfig.Setup == nil {
executor = &client.HttpExecutor{}
}else{
executor, err = client.NewMTLSExecutor(module.AgentConfig.Setup.CACertFile, module.AgentConfig.Setup.CAKeyFile)
if err != nil {
panic(err)
}
}
agClient := &client.Client{
Executor: executor,
}
client.RegisterClient(agClient)
if module.AgentConfig.StateManager.Enabled {
onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, 60)
@ -59,7 +74,7 @@ func (module *AgentModule) Start() error {
}
}
sm := state.NewStateManager(time.Second*30, "agent_state", agentIds)
sm := state.NewStateManager(time.Second*30, "agent_state", agentIds, agClient)
state.RegisterStateManager(sm)
go sm.LoopState()
}

View File

@ -23,6 +23,7 @@ import (
"infini.sh/framework/modules/elastic/common"
"net/http"
"strconv"
"time"
)
type APIHandler struct {
@ -37,22 +38,30 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
log.Error(err)
return
}
oldInst := &agent.Instance{}
oldInst.ID = obj.ID
exists, err := orm.Get(oldInst)
if err != nil && err != elastic2.ErrNotFound {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
if exists {
errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID)
h.WriteError(w, errMsg, http.StatusInternalServerError)
log.Error(errMsg)
return
//validate token for auto register
token := h.GetParameter(req, "token")
if token != "" {
if v, ok := tokens.Load(token); !ok {
h.WriteError(w, "token is invalid", http.StatusUnauthorized)
return
}else{
if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) {
tokens.Delete(token)
h.WriteError(w, "token was expired", http.StatusUnauthorized)
return
}
}
remoteIP := util.ClientIP(req)
agCfg := common2.GetAgentConfig()
port := agCfg.Setup.Port
if port == "" {
port = "8080"
}
obj.Endpoint = fmt.Sprintf("https://%s:%s", remoteIP, port)
obj.Tags = append(obj.Tags, "mtls")
obj.Tags = append(obj.Tags, "auto")
}
//fetch more information of agent instance
res, err := client.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint())
if err != nil {
@ -72,6 +81,24 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
obj.MajorIP = res.MajorIP
obj.Host = res.Host
obj.IPS = res.IPS
if obj.Name == "" {
obj.Name = res.Name
}
}
oldInst := &agent.Instance{}
oldInst.ID = obj.ID
exists, err := orm.Get(oldInst)
if err != nil && err != elastic2.ErrNotFound {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
if exists {
errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID)
h.WriteError(w, errMsg, http.StatusInternalServerError)
log.Error(errMsg)
return
}
obj.Status = model.StatusOnline
@ -85,11 +112,33 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
if err != nil {
log.Error(err)
}
err = pushIngestConfigToAgent(obj)
if err != nil {
log.Error(err)
}
h.WriteCreatedOKJSON(w, obj.ID)
}
func pushIngestConfigToAgent(inst *agent.Instance) error{
ingestCfg, basicAuth, err := common2.GetAgentIngestConfig()
if err != nil {
return err
}
if basicAuth != nil && basicAuth.Password != "" {
err = client.GetClient().SetKeystoreValue(context.Background(), inst.GetEndpoint(), "ingest_cluster_password", basicAuth.Password)
if err != nil {
return fmt.Errorf("set keystore value to agent error: %w", err)
}
}
err = client.GetClient().SaveDynamicConfig(context.Background(), inst.GetEndpoint(), "ingest", ingestCfg )
if err != nil {
fmt.Errorf("save dynamic config to agent error: %w", err)
}
return nil
}
func (h *APIHandler) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("instance_id")
@ -157,6 +206,22 @@ func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps
return
}
queryDsl = util.MapStr{
"query": util.MapStr{
"term": util.MapStr{
"metadata.labels.agent_id": util.MapStr{
"value": id,
},
},
},
}
err = orm.DeleteBy(agent.Setting{}, util.MustToJSONBytes(queryDsl))
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error("delete agent settings error: ", err)
return
}
h.WriteDeletedOKJSON(w, id)
}
@ -500,6 +565,26 @@ func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps h
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
q = util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"terms": util.MapStr{
"metadata.labels.node_uuid": nodeIDs,
},
},
{
"term": util.MapStr{
"metadata.labels.agent_id": util.MapStr{
"value": id,
},
},
},
},
},
},
}
}
h.WriteAckOKJSON(w)
}

View File

@ -11,7 +11,6 @@ import (
"infini.sh/console/modules/agent/common"
"infini.sh/framework/core/api/rbac"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/global"
"infini.sh/framework/core/util"
"os"
@ -69,16 +68,16 @@ func (h *APIHandler) generateInstallCommand(w http.ResponseWriter, req *http.Req
}
tokens.Store(tokenStr, t)
agCfg := common.GetAgentConfig()
scriptEndpoint := agCfg.Setup.ScriptEndpoint
if scriptEndpoint == "" {
consoleEndpoint := agCfg.Setup.ConsoleEndpoint
if consoleEndpoint == "" {
scheme := "http"
if req.TLS != nil {
scheme = "https"
}
scriptEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host)
consoleEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host)
}
h.WriteJSON(w, util.MapStr{
"script": fmt.Sprintf(`sudo bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, scriptEndpoint, tokenStr),
"script": fmt.Sprintf(`sudo bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, consoleEndpoint, tokenStr),
"token": tokenStr,
"expired_at": t.CreatedAt.Add(ExpiredIn),
}, http.StatusOK)
@ -120,25 +119,31 @@ func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request,
if downloadURL == "" {
downloadURL = "https://release.infinilabs.com/agent/stable/"
}
esCfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID))
var (
loggingESUser string
loggingESPassword string
)
if esCfg.BasicAuth != nil {
loggingESUser = esCfg.BasicAuth.Username
loggingESPassword = esCfg.BasicAuth.Password
//esCfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID))
//var (
// loggingESUser string
// loggingESPassword string
//)
//if esCfg.BasicAuth != nil {
// loggingESUser = esCfg.BasicAuth.Username
// loggingESPassword = esCfg.BasicAuth.Password
//}
port := agCfg.Setup.Port
if port == "" {
port = "8080"
}
tpl.Execute(w, map[string]interface{}{
"base_url": agCfg.Setup.DownloadURL,
"agent_version": agCfg.Setup.Version,
//"console_endpoint": util.MustToJSON(util.MustToJSON(gatewayEndpoints)),
"console_endpoint": agCfg.Setup.ConsoleEndpoint,
"client_crt": clientCertPEM,
"client_key": clientKeyPEM,
"ca_crt": caCert,
"logging_es_endpoint": esCfg.Endpoint,
"logging_es_user": loggingESUser,
"logging_es_password": loggingESPassword,
"port": port,
"token": tokenStr,
//"logging_es_endpoint": esCfg.Endpoint,
//"logging_es_user": loggingESUser,
//"logging_es_password": loggingESPassword,
})
}

View File

@ -5,20 +5,13 @@
package client
import (
"bytes"
"context"
"fmt"
"infini.sh/console/modules/agent/common"
"infini.sh/framework/core/agent"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/global"
"infini.sh/framework/core/host"
"infini.sh/framework/core/util"
"io"
"net/http"
"os"
"path"
"sync"
)
var defaultClient ClientAPI
@ -44,11 +37,15 @@ type ClientAPI interface {
AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error)
CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error
DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error
SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error
SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error
}
type Client struct {
Executor Executor
}
func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) {
req := &util.Request{
Method: http.MethodGet,
@ -220,100 +217,37 @@ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipeline
return client.doRequest(req, nil)
}
//func (client *Client) doRequest(req *util.Request, respObj interface{}) error {
// result, err := util.ExecuteRequest(req)
// if err != nil {
// return err
// }
// if result.StatusCode != 200 {
// return fmt.Errorf(string(result.Body))
// }
// if respObj == nil {
// return nil
// }
// return util.FromJSONBytes(result.Body, respObj)
//}
var(
hClient *http.Client
hClientOnce = sync.Once{}
)
func (client *Client) doRequest(req *util.Request, respObj interface{}) error {
agCfg := common.GetAgentConfig()
var err error
hClientOnce.Do(func() {
var (
instanceCrt string
instanceKey string
)
instanceCrt, instanceKey, err = getAgentInstanceCerts(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile)
hClient, err = util.NewMTLSClient(agCfg.Setup.CACertFile, instanceCrt, instanceKey)
})
if err != nil {
return err
func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error{
body := util.MapStr{
"key": key,
"value": value,
}
var reader io.Reader
if len(req.Body) > 0 {
reader = bytes.NewReader(req.Body)
req := &util.Request{
Method: http.MethodPost,
Url: fmt.Sprintf("%s/_framework/keystore", agentBaseURL),
Context: ctx,
Body: util.MustToJSONBytes(body),
}
var hr *http.Request
if req.Context == nil {
hr, err = http.NewRequest(req.Method, req.Url, reader)
}else{
hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader)
}
if err != nil {
return err
}
res, err := hClient.Do(hr)
if err != nil {
return err
}
if respObj != nil {
defer res.Body.Close()
buf, err := io.ReadAll(res.Body)
if err != nil {
return err
}
err = util.FromJSONBytes(buf, respObj)
if err != nil {
return err
}
}
return nil
return client.doRequest(req, nil)
}
func getAgentInstanceCerts(caFile, caKey string) (string, string, error) {
dataDir := global.Env().GetDataDir()
instanceCrt := path.Join(dataDir, "certs/agent/instance.crt")
instanceKey := path.Join(dataDir, "certs/agent/instance.key")
var (
err error
clientCertPEM []byte
clientKeyPEM []byte
)
if util.FileExists(instanceCrt) && util.FileExists(instanceKey) {
return instanceCrt, instanceKey, nil
func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error{
body := util.MapStr{
"configs": util.MapStr{
name: content,
},
}
_, clientCertPEM, clientKeyPEM, err = common.GenerateClientCert(caFile, caKey)
if err != nil {
return "", "", err
req := &util.Request{
Method: http.MethodPost,
Url: fmt.Sprintf("%s/agent/config", agentBaseURL),
Context: ctx,
Body: util.MustToJSONBytes(body),
}
baseDir := path.Join(dataDir, "certs/agent")
if !util.IsExist(baseDir){
err = os.MkdirAll(baseDir, 0775)
if err != nil {
return "", "", err
}
}
_, err = util.FilePutContentWithByte(instanceCrt, clientCertPEM)
if err != nil {
return "", "", err
}
_, err = util.FilePutContentWithByte(instanceKey, clientKeyPEM)
if err != nil {
return "", "", err
}
return instanceCrt, instanceKey, nil
}
return client.doRequest(req, nil)
}
func (client *Client) doRequest(req *util.Request, respObj interface{}) error {
return client.Executor.DoRequest(req, respObj)
}

View File

@ -0,0 +1,100 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package client
import (
"bytes"
"fmt"
"infini.sh/console/modules/agent/common"
"infini.sh/framework/core/util"
"io"
"net/http"
)
type Executor interface {
DoRequest(req *util.Request, respObj interface{}) error
}
type HttpExecutor struct {
}
func (executor *HttpExecutor) DoRequest(req *util.Request, respObj interface{}) error {
result, err := util.ExecuteRequest(req)
if err != nil {
return err
}
if result.StatusCode != 200 {
return fmt.Errorf(string(result.Body))
}
if respObj == nil {
return nil
}
return util.FromJSONBytes(result.Body, respObj)
}
func NewMTLSExecutor(caCertFile, caKeyFile string) (*MTLSExecutor, error){
var (
instanceCrt string
instanceKey string
)
instanceCrt, instanceKey, err := common.GetAgentInstanceCerts(caCertFile, caKeyFile)
if err != nil {
return nil, fmt.Errorf("generate tls cert error: %w", err)
}
hClient, err := util.NewMTLSClient(caCertFile, instanceCrt, instanceKey)
if err != nil {
return nil, err
}
return &MTLSExecutor{
CaCertFile: caCertFile,
CAKeyFile: caKeyFile,
client: hClient,
}, nil
}
type MTLSExecutor struct {
CaCertFile string
CAKeyFile string
client *http.Client
}
func (executor *MTLSExecutor) DoRequest(req *util.Request, respObj interface{}) error {
var reader io.Reader
if len(req.Body) > 0 {
reader = bytes.NewReader(req.Body)
}
var (
hr *http.Request
err error
)
if req.Context == nil {
hr, err = http.NewRequest(req.Method, req.Url, reader)
}else{
hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader)
}
if err != nil {
return err
}
res, err := executor.client.Do(hr)
if err != nil {
return err
}
defer res.Body.Close()
buf, err := io.ReadAll(res.Body)
if err != nil {
return err
}
if res.StatusCode != 200 {
return fmt.Errorf(string(buf))
}
if respObj != nil {
err = util.FromJSONBytes(buf, respObj)
if err != nil {
return err
}
}
return nil
}

View File

@ -61,3 +61,37 @@ func generateCert(caFile, caKey string, isServer bool)(caCert, instanceCertPEM,
}
return caCert, instanceCertPEM, instanceKeyPEM, nil
}
func GetAgentInstanceCerts(caFile, caKey string) (string, string, error) {
dataDir := global.Env().GetDataDir()
instanceCrt := path.Join(dataDir, "certs/agent/instance.crt")
instanceKey := path.Join(dataDir, "certs/agent/instance.key")
var (
err error
clientCertPEM []byte
clientKeyPEM []byte
)
if util.FileExists(instanceCrt) && util.FileExists(instanceKey) {
return instanceCrt, instanceKey, nil
}
_, clientCertPEM, clientKeyPEM, err = GenerateClientCert(caFile, caKey)
if err != nil {
return "", "", err
}
baseDir := path.Join(dataDir, "certs/agent")
if !util.IsExist(baseDir){
err = os.MkdirAll(baseDir, 0775)
if err != nil {
return "", "", err
}
}
_, err = util.FilePutContentWithByte(instanceCrt, clientCertPEM)
if err != nil {
return "", "", err
}
_, err = util.FilePutContentWithByte(instanceKey, clientKeyPEM)
if err != nil {
return "", "", err
}
return instanceCrt, instanceKey, nil
}

View File

@ -7,19 +7,14 @@ package common
import (
"infini.sh/console/modules/agent/model"
"infini.sh/framework/core/env"
"sync"
)
var agentCfg *model.AgentConfig
var onceCfg = sync.Once{}
func GetAgentConfig() *model.AgentConfig {
onceCfg.Do(func() {
agentCfg = &model.AgentConfig{}
_, err := env.ParseConfig("agent", agentCfg )
if err != nil {
panic(err)
}
})
agentCfg := &model.AgentConfig{}
_, err := env.ParseConfig("agent", agentCfg )
if err != nil {
panic(err)
}
return agentCfg
}

View File

@ -8,11 +8,13 @@ import (
"fmt"
"infini.sh/console/modules/agent/model"
"infini.sh/framework/core/agent"
"infini.sh/framework/core/credential"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/event"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
log "src/github.com/cihub/seelog"
"strings"
)
func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResult, error){
@ -71,44 +73,48 @@ func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResul
}, nil
}
// GetAgentSettings query agent setting by agent id and updated timestamp,
// if there has any setting was updated, then return setting list includes settings not changed,
// otherwise return empty setting list
func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) {
queryDsl := util.MapStr{
"size": 500,
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"metadata.category": util.MapStr{
"value": "agent",
},
},
},
{
"term": util.MapStr{
"metadata.name": util.MapStr{
"value": "task",
},
},
},
{
"term": util.MapStr{
"metadata.labels.agent_id": util.MapStr{
"value": agentID,
},
},
},
{
"range": util.MapStr{
"updated": util.MapStr{
"gt": timestamp,
},
query := util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"metadata.category": util.MapStr{
"value": "agent",
},
},
},
{
"term": util.MapStr{
"metadata.name": util.MapStr{
"value": "task",
},
},
},
{
"term": util.MapStr{
"metadata.labels.agent_id": util.MapStr{
"value": agentID,
},
},
},
//{
// "range": util.MapStr{
// "updated": util.MapStr{
// "gt": timestamp,
// },
// },
//},
},
},
}
queryDsl := util.MapStr{
"size": 500,
"query": query,
}
q := orm.Query{
RawQuery: util.MustToJSONBytes(queryDsl),
}
@ -119,7 +125,10 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error)
if len(result.Result) == 0 {
return nil, nil
}
var settings []agent.Setting
var (
settings []agent.Setting
hasUpdated bool
)
for _, row := range result.Result {
setting := agent.Setting{}
buf, err := util.ToJSONBytes(row)
@ -130,8 +139,14 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error)
if err != nil {
return nil, err
}
if setting.Updated != nil && setting.Updated.UnixMilli() > timestamp {
hasUpdated = true
}
settings = append(settings, setting)
}
if !hasUpdated {
return nil, nil
}
return settings, nil
}
@ -335,4 +350,132 @@ func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]str
}
}
return agentIDs, nil
}
func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) {
agCfg := GetAgentConfig()
var (
endpoint string
ok bool
)
if endpoint, ok = agCfg.Setup.IngestClusterEndpoint.(string);ok {
if endpoint = strings.TrimSpace(endpoint); endpoint == "" {
return "", nil, fmt.Errorf("config ingest_cluster_endpoint must not be empty")
}
}
var (
basicAuth elastic.BasicAuth
)
if agCfg.Setup.IngestClusterCredentialID != "" {
cred := credential.Credential{}
cred.ID = agCfg.Setup.IngestClusterCredentialID
_, err := orm.Get(&cred)
if err != nil {
return "", nil, fmt.Errorf("query credential [%s] error: %w", cred.ID, err)
}
info, err := cred.Decode()
if err != nil {
return "", nil, fmt.Errorf("decode credential [%s] error: %w", cred.ID, err)
}
if basicAuth, ok = info.(elastic.BasicAuth); !ok {
log.Debug("invalid credential: ", cred)
}
}
tpl := `elasticsearch:
- name: default
enabled: true
endpoint: %s
discovery:
enabled: true
basic_auth:
username: %s
password: $[[keystore.ingest_cluster_password]]
metrics:
enabled: true
queue: metrics
network:
enabled: true
summary: true
details: true
memory:
metrics:
- swap
- memory
disk:
metrics:
- iops
- usage
cpu:
metrics:
- idle
- system
- user
- iowait
- load
instance:
enabled: true
pipeline:
- name: logs_indexing_merge
auto_start: true
keep_running: true
processor:
- indexing_merge:
index_name: ".infini_logs"
elasticsearch: "default"
input_queue: "logs"
idle_timeout_in_seconds: 10
output_queue:
name: "logs_requests"
label:
tag: "logs"
worker_size: 1
bulk_size_in_mb: 10
- name: ingest_logs
auto_start: true
keep_running: true
processor:
- bulk_indexing:
bulk:
compress: true
batch_size_in_mb: 5
batch_size_in_docs: 5000
consumer:
fetch_max_messages: 100
queues:
type: indexing_merge
tag: "logs"
when:
cluster_available: ["default"]
- name: metrics_indexing_merge
auto_start: true
keep_running: true
processor:
- indexing_merge:
elasticsearch: "default"
index_name: ".infini_metrics"
input_queue: "metrics"
output_queue:
name: "metrics_requests"
label:
tag: "metrics"
worker_size: 1
bulk_size_in_mb: 5
- name: ingest_metrics
auto_start: true
keep_running: true
processor:
- bulk_indexing:
bulk:
compress: true
batch_size_in_mb: 5
batch_size_in_docs: 5000
consumer:
fetch_max_messages: 100
queues:
type: indexing_merge
tag: "metrics"
when:
cluster_available: ["default"]`
tpl = fmt.Sprintf(tpl, endpoint, basicAuth.Username)
return tpl, &basicAuth, nil
}

View File

@ -9,7 +9,7 @@ type AgentConfig struct {
StateManager struct{
Enabled bool `config:"enabled"`
} `config:"state_manager"`
Setup SetupConfig `config:"setup"`
Setup *SetupConfig `config:"setup"`
}
type SetupConfig struct {
@ -17,5 +17,8 @@ type SetupConfig struct {
Version string `config:"version"`
CACertFile string `config:"ca_cert"`
CAKeyFile string `config:"ca_key"`
ScriptEndpoint string `config:"script_endpoint"`
ConsoleEndpoint string `config:"console_endpoint"`
IngestClusterEndpoint interface{} `config:"ingest_cluster_endpoint"`
IngestClusterCredentialID string `config:"ingest_cluster_credential_id"`
Port string `config:"port"`
}

View File

@ -17,9 +17,10 @@ import (
"infini.sh/framework/core/kv"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"infini.sh/framework/modules/elastic"
"runtime"
"runtime/debug"
"strings"
"src/gopkg.in/yaml.v2"
"sync"
"time"
)
@ -63,13 +64,13 @@ type StateManager struct {
timestamps map[string]int64
}
func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string) *StateManager {
func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string, agentClient *client.Client) *StateManager {
return &StateManager{
TTL: TTL,
KVKey: kvKey,
stopC: make(chan struct{}),
stopCompleteC: make(chan struct{}),
agentClient: &client.Client{},
agentClient: agentClient,
agentIds: agentIds,
workerChan: make(chan struct{}, runtime.NumCPU()),
timestamps: map[string]int64{},
@ -124,7 +125,9 @@ func (sm *StateManager) checkAgentStatus() {
}()
ag, err := sm.GetAgent(agentID)
if err != nil {
log.Error(err)
if err != elastic.ErrNotFound {
log.Error(err)
}
return
}
ag.Status = model.StatusOffline
@ -142,6 +145,13 @@ func (sm *StateManager) checkAgentStatus() {
}
func (sm *StateManager) syncSettings(agentID string) {
ag, err := sm.GetAgent(agentID)
if err != nil {
if err != elastic.ErrNotFound {
log.Errorf("get agent error: %v", err)
}
return
}
newTimestamp := time.Now().UnixMilli()
settings, err := common.GetAgentSettings(agentID, sm.timestamps[agentID])
if err != nil {
@ -157,36 +167,58 @@ func (sm *StateManager) syncSettings(agentID string) {
log.Errorf("parse agent settings error: %v", err)
return
}
ag, err := sm.GetAgent(agentID)
agClient := sm.GetAgentClient()
var clusterCfgs []util.MapStr
if len(parseResult.ClusterConfigs) > 0 {
for _, cfg := range parseResult.ClusterConfigs {
clusterCfg := util.MapStr{
"name": cfg.ID,
"enabled": true,
"endpoint": cfg.Endpoint,
}
if cfg.BasicAuth != nil && cfg.BasicAuth.Password != ""{
err = agClient.SetKeystoreValue(context.Background(), ag.GetEndpoint(), fmt.Sprintf("%s_password", cfg.ID), cfg.BasicAuth.Password)
if err != nil {
log.Errorf("set keystore value error: %v", err)
continue
}
clusterCfg["basic_auth"] = util.MapStr{
"username": cfg.BasicAuth.Username,
"password": fmt.Sprintf("$[[keystore.%s_password]]", cfg.ID),
}
}
clusterCfgs = append(clusterCfgs, clusterCfg)
}
}
var dynamicCfg = util.MapStr{}
if len(clusterCfgs) > 0 {
dynamicCfg["elasticsearch"] = clusterCfgs
}
if len(parseResult.Pipelines) > 0 {
dynamicCfg["pipeline"] = parseResult.Pipelines
}
cfgBytes, err := yaml.Marshal(dynamicCfg)
if err != nil {
log.Errorf("get agent error: %v", err)
log.Error("serialize config to yaml error: ", err)
return
}
agClient := sm.GetAgentClient()
if len(parseResult.ClusterConfigs) > 0 {
err = agClient.RegisterElasticsearch(nil, ag.GetEndpoint(), parseResult.ClusterConfigs)
if err != nil {
log.Errorf("register elasticsearch config error: %v", err)
return
}
}
for _, pipelineID := range parseResult.ToDeletePipelineNames {
err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID)
if err != nil {
if !strings.Contains(err.Error(), "not found") {
log.Errorf("delete pipeline error: %v", err)
continue
}
}
//todo update delete pipeline state
}
for _, pipeline := range parseResult.Pipelines {
err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline))
if err != nil {
log.Errorf("create pipeline error: %v", err)
return
}
}
err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes))
//for _, pipelineID := range parseResult.ToDeletePipelineNames {
// err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID)
// if err != nil {
// if !strings.Contains(err.Error(), "not found") {
// log.Errorf("delete pipeline error: %v", err)
// continue
// }
// }
//}
//for _, pipeline := range parseResult.Pipelines {
// err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline))
// if err != nil {
// log.Errorf("create pipeline error: %v", err)
// return
// }
//}
sm.timestamps[agentID] = newTimestamp
}
@ -239,7 +271,7 @@ func (sm *StateManager) GetAgent(ID string) (*agent.Instance, error) {
if time.Since(timestamp) > sm.TTL {
exists, err := orm.Get(inst)
if err != nil {
return nil, fmt.Errorf("get agent [%s] error: %w", ID, err)
return nil, err
}
if !exists {
return nil, fmt.Errorf("can not found agent [%s]", ID)