Merge pull request 'agent setup initial implementation' (#110) from agent_setup into master

This commit is contained in:
silenceqi 2023-06-01 10:54:18 +08:00
commit baee9d0c0e
17 changed files with 1261 additions and 394 deletions

250
config/install_agent.tpl Normal file
View File

@ -0,0 +1,250 @@
#!/bin/bash
# Agent install script for UNIX-like operating systems.
# Author: INFINI
#
# Environment variables honoured by this script:
#   BASE_URL     : download server address, overrides the rendered default,
#                  e.g. https://release.infinilabs.com/agent/stable
#   AGENT_VER    : agent version, overrides the rendered default, e.g. 0.4.0-126
#   INSTALL_PATH : optional install directory, e.g. /home/user/infini (default: /opt)
# ES_NAME / ES_PWD are listed by the original header but are not referenced
# in the visible part of the script.

# Print the banner.
printf "\n* _ ___ __ __ _____ "
printf "\n* /_\\ / _ \\ /__\\/\\ \\ \\/__ \\"
printf "\n* //_\\\\ / /_\\//_\\ / \\/ / / /\\/"
printf "\n* / _ \\/ /_\\\\//__/ /\\ / / / "
printf "\n* \\_/ \\_/\\____/\\__/\\_\\ \\/ \\/ \n\n"

# Privileged commands are prefixed with $sudo_cmd; it stays empty when the
# script already runs as root (UID 0).
sudo_cmd='sudo'
if [ "$UID" = "0" ]; then
  sudo_cmd=''
fi
##################
# colors
##################
# ANSI escape sequences; they are expanded by printf when used in a format string.
RED="\033[31m"
GREEN="\033[32m"
CLR="\033[0m"

##################
# validate os & arch
##################
# Translate the machine name reported by uname into the architecture
# suffix used in the release file names.
arch=
case $(uname -m) in
  x86_64)
    arch="amd64"
    ;;
  i386 | i686)
    arch="386"
    ;;
  aarch64 | arm64)
    arch="arm64"
    ;;
  arm | armv7l)
    arch="arm"
    ;;
  *)
    # shellcheck disable=SC2059
    printf "${RED}[E] Unsupport arch $(uname -m) ${CLR}\n"
    exit 1
    ;;
esac

# Everything that is not Darwin is treated as linux.
os="linux"
if [[ "$OSTYPE" == "darwin"* ]]; then
  # Darwin builds are only accepted for amd64 and arm64.
  case $arch in
    amd64 | arm64) ;;
    *)
      # shellcheck disable=SC2059
      printf "${RED}[E] Darwin only support amd64/arm64.${CLR}\n"
      exit 1
      ;;
  esac
  os="mac"
fi
##################
# validate params
##################
# The double-brace placeholders are substituted when the script is rendered.
# A non-empty BASE_URL / AGENT_VER from the environment wins over the
# rendered value.
base_url="{{base_url}}"
base_url="${BASE_URL:-$base_url}"
agent_ver="{{agent_version}}"
agent_ver="${AGENT_VER:-$agent_ver}"

# Certificate material written to ./config later in the script.
ca_crt="{{ca_crt}}"
client_crt="{{client_crt}}"
client_key="{{client_key}}"
##################
# download agent
##################
# mac releases are packaged as zip, everything else as tar.gz.
suffix="tar.gz"
if [[ "$os" == "mac" ]]; then
  suffix="zip"
fi
download_url="${base_url}/agent-${agent_ver}-${os}-${arch}.${suffix}"
install_path="/opt"
if [ -n "$INSTALL_PATH" ]; then
  install_path=$INSTALL_PATH
fi
file_name="agent-${agent_ver}-${os}-${arch}.${suffix}" # archive file name on the download server
agent="${install_path}/agent/${file_name}"             # where the downloaded archive is saved
agent_exc="${install_path}/agent/agent-${os}-${arch}"  # agent executable

# agent_exsit records whether the install directory was already present;
# the service section uses it to decide whether to stop/uninstall first.
agent_exsit="true"
if [ ! -d "${install_path}/agent" ]; then
  printf "\n* mkdir %s" "${install_path}/agent"
  # Test mkdir directly: checking $? after the enclosing "if" only reports
  # the status of the last command in the branch (an assignment), so a
  # failed mkdir used to go unnoticed. -p also creates a missing parent.
  if ! $sudo_cmd mkdir -p "${install_path}/agent"; then
    exit 1
  fi
  agent_exsit="false"
fi

# Variables are passed as printf arguments so a "%" in a URL or path is
# printed literally instead of being parsed as a conversion.
printf "\n* downloading %s\n" "${download_url}"
printf "\n* save to : %s\n" "${agent}"
cd "$install_path/agent" || exit 1
# Use $sudo_cmd like the rest of the script (root may have no sudo binary).
# -f makes curl exit non-zero on an HTTP error instead of saving the error
# page under the archive name.
if ! $sudo_cmd curl -f -O --progress-bar "$download_url"; then
  exit 1
fi
printf "\n* downloaded: %s" "${agent}"
##################
# install agent
##################
printf "\n* start install"
# The archive is unpacked into the current working directory. Paths are
# quoted so an install path containing spaces is handled as one argument.
if [[ "${suffix}" == "zip" ]]; then
  printf "\n* uzip %s\n" "${agent}"
  if ! $sudo_cmd unzip "$agent"; then
    exit 1
  fi
else
  printf "\n* tar -xzvf %s\n" "${agent}"
  if ! $sudo_cmd tar -xzvf "$agent"; then
    exit 1
  fi
fi
# The archive was written with elevated privileges, so it has to be removed
# with the same privilege prefix.
$sudo_cmd rm -f "${agent}"
##################
# save cert
##################
# -p keeps a re-install from failing when ./config already exists.
if ! $sudo_cmd mkdir -p config; then
  exit 1
fi

# write_cert_file PATH CONTENT
# Writes CONTENT followed by a newline to PATH using the privilege prefix.
# The content goes through a pipe into tee rather than being spliced into an
# "sh -c" command line, so its characters are never re-parsed by a shell.
# The return status is that of tee.
write_cert_file() {
  printf '%s\n' "$2" | $sudo_cmd tee "$1" > /dev/null
}

# Every write is checked; previously only the status of the last one was.
write_cert_file ./config/ca.crt "$ca_crt" || exit 1
write_cert_file ./config/client.crt "$client_crt" || exit 1
# NOTE(review): client.key is created with default permissions - consider
# restricting it once the user the agent service runs as is confirmed.
write_cert_file ./config/client.key "$client_key" || exit 1
port="{{port}}"
## generate agent.yml
# NOTE: every double quote inside the block below closes and re-opens the
# shell string, so those quote characters do not end up in the generated
# file (e.g. path.configs is written as: config). The expansions in between
# are still safe because word splitting does not apply to assignments.
# NOTE(review): the api/tls/network keys look like they are meant to be
# nested - confirm the leading whitespace of these lines in the template.
agent_config="path.configs: "config"
configs.auto_reload: true
env:
API_BINDING: "0.0.0.0:${port}"
path.data: data
path.logs: log
api:
enabled: true
tls:
enabled: true
cert_file: "${install_path}/agent/config/client.crt"
key_file: "${install_path}/agent/config/client.key"
ca_file: "${install_path}/agent/config/ca.crt"
skip_insecure_verify: false
network:
binding: \$[[env.API_BINDING]]
badger:
value_log_max_entries: 1000000
value_log_file_size: 104857600
value_threshold: 1024
agent:
major_ip_pattern: "192.*"
"
agent_yml_path="${install_path}/agent/agent.yml"
# -f: a fresh install has no agent.yml yet, which is not an error.
$sudo_cmd rm -f "$agent_yml_path"
# tee creates the file and writes the same bytes the former
# "echo '...' > file" did, without re-parsing the content in a second shell.
if ! printf '%s\n' "$agent_config" | $sudo_cmd tee "$agent_yml_path" > /dev/null; then
  exit 1
fi
$sudo_cmd chmod +x "$agent_exc"
# When the install directory already existed, try to stop and uninstall the
# service first. The results are not checked, so a missing service does not
# abort the install.
if [[ "$agent_exsit" == "true" ]]; then
  printf "\n* stop && uninstall service\n"
  $sudo_cmd "$agent_exc" -service stop
  $sudo_cmd "$agent_exc" -service uninstall
fi
printf "\n* start install service\n"
if ! $sudo_cmd "$agent_exc" -service install; then
  exit 1
fi
printf "\n* service installed\n"
printf "\n* service starting >>>>>>\n"
if ! $sudo_cmd "$agent_exc" -service start; then
  exit 1
fi
printf "\n* agent service started"
console_endpoint="{{console_endpoint}}"
# Presumably gives the freshly started service time to come up before the
# console is asked to register it.
sleep 3
printf "\n* start register\n"
token="{{token}}"
# -f turns an HTTP error status (for example a rejected or expired token)
# into a non-zero exit, so "agent registered" is no longer printed after a
# failed registration. The URL is quoted because it contains "?", which an
# unquoted word would expose to pathname expansion.
if ! curl -f -X POST "${console_endpoint}/agent/instance?token=${token}"; then
  exit 1
fi
printf "\n* agent registered\n"
# shellcheck disable=SC2059
printf "\n* ${GREEN}Congratulations, install success!${CLR}\n\n"

View File

@ -7,7 +7,10 @@ package agent
import ( import (
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
"infini.sh/console/modules/agent/api" "infini.sh/console/modules/agent/api"
"infini.sh/console/modules/agent/client"
"infini.sh/console/modules/agent/common" "infini.sh/console/modules/agent/common"
"infini.sh/console/modules/agent/model"
"infini.sh/console/modules/agent/state"
"infini.sh/framework/core/agent" "infini.sh/framework/core/agent"
"infini.sh/framework/core/env" "infini.sh/framework/core/env"
"infini.sh/framework/core/host" "infini.sh/framework/core/host"
@ -38,7 +41,22 @@ func (module *AgentModule) Start() error {
orm.RegisterSchemaWithIndexName(agent.ESNodeInfo{}, "agent-node") orm.RegisterSchemaWithIndexName(agent.ESNodeInfo{}, "agent-node")
orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host") orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host")
orm.RegisterSchemaWithIndexName(agent.Setting{}, "agent-setting") orm.RegisterSchemaWithIndexName(agent.Setting{}, "agent-setting")
common.RegisterClient(&common.Client{}) var (
executor client.Executor
err error
)
if module.AgentConfig.Setup == nil {
executor = &client.HttpExecutor{}
}else{
executor, err = client.NewMTLSExecutor(module.AgentConfig.Setup.CACertFile, module.AgentConfig.Setup.CAKeyFile)
if err != nil {
panic(err)
}
}
agClient := &client.Client{
Executor: executor,
}
client.RegisterClient(agClient)
if module.AgentConfig.StateManager.Enabled { if module.AgentConfig.StateManager.Enabled {
onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, 60) onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, 60)
@ -56,8 +74,8 @@ func (module *AgentModule) Start() error {
} }
} }
sm := common.NewStateManager(time.Second*30, "agent_state", agentIds) sm := state.NewStateManager(time.Second*30, "agent_state", agentIds, agClient)
common.RegisterStateManager(sm) state.RegisterStateManager(sm)
go sm.LoopState() go sm.LoopState()
} }
return nil return nil
@ -69,12 +87,12 @@ func (module *AgentModule) Stop() error {
} }
log.Info("start to stop agent module") log.Info("start to stop agent module")
if module.AgentConfig.StateManager.Enabled { if module.AgentConfig.StateManager.Enabled {
common.GetStateManager().Stop() state.GetStateManager().Stop()
} }
log.Info("agent module was stopped") log.Info("agent module was stopped")
return nil return nil
} }
type AgentModule struct { type AgentModule struct {
common.AgentConfig model.AgentConfig
} }

View File

@ -7,13 +7,13 @@ package api
import ( import (
"context" "context"
"fmt" "fmt"
common2 "infini.sh/console/modules/agent/common" log "github.com/cihub/seelog"
"infini.sh/console/modules/agent/state"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/host" "infini.sh/framework/core/host"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
"net/http" "net/http"
log "github.com/cihub/seelog"
"time" "time"
) )
@ -107,7 +107,7 @@ func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request,
return return
} }
sm := common2.GetStateManager() sm := state.GetStateManager()
ag, err := sm.GetAgent(hostInfo.AgentID) ag, err := sm.GetAgent(hostInfo.AgentID)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
@ -159,7 +159,7 @@ func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Requ
h.WriteJSON(w, util.MapStr{}, http.StatusOK) h.WriteJSON(w, util.MapStr{}, http.StatusOK)
return return
} }
sm := common2.GetStateManager() sm := state.GetStateManager()
ag, err := sm.GetAgent(hostInfo.AgentID) ag, err := sm.GetAgent(hostInfo.AgentID)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
@ -193,7 +193,7 @@ func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Requ
} }
func enrollHostFromAgent(agentID string) (*host.HostInfo, error){ func enrollHostFromAgent(agentID string) (*host.HostInfo, error){
sm := common2.GetStateManager() sm := state.GetStateManager()
ag, err := sm.GetAgent(agentID) ag, err := sm.GetAgent(agentID)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -23,8 +23,13 @@ func Init() {
api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.POST, "/agent/instance/try_connect", handler.RequireLogin(handler.tryConnect))
api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost)
api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo) api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo)
api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess) api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess)
api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand))
api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript)
} }

View File

@ -8,20 +8,22 @@ import (
"context" "context"
"fmt" "fmt"
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
"infini.sh/console/modules/agent/client"
common2 "infini.sh/console/modules/agent/common"
"infini.sh/console/modules/agent/model"
"infini.sh/console/modules/agent/state"
"infini.sh/framework/core/agent" "infini.sh/framework/core/agent"
"infini.sh/framework/core/api" "infini.sh/framework/core/api"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic" "infini.sh/framework/core/elastic"
"infini.sh/framework/core/event" "infini.sh/framework/core/event"
"infini.sh/framework/core/host"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
common2 "infini.sh/console/modules/agent/common"
elastic2 "infini.sh/framework/modules/elastic" elastic2 "infini.sh/framework/modules/elastic"
"infini.sh/framework/modules/elastic/adapter"
"infini.sh/framework/modules/elastic/common" "infini.sh/framework/modules/elastic/common"
"net/http" "net/http"
"strconv" "strconv"
"time"
) )
type APIHandler struct { type APIHandler struct {
@ -36,24 +38,32 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
log.Error(err) log.Error(err)
return return
} }
//validate token for auto register
oldInst := &agent.Instance{} token := h.GetParameter(req, "token")
oldInst.ID = obj.ID if token != "" {
exists, err := orm.Get(oldInst) if v, ok := tokens.Load(token); !ok {
h.WriteError(w, "token is invalid", http.StatusUnauthorized)
if err != nil && err != elastic2.ErrNotFound { return
h.WriteError(w, err.Error(), http.StatusInternalServerError) }else{
log.Error(err) if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) {
tokens.Delete(token)
h.WriteError(w, "token was expired", http.StatusUnauthorized)
return return
} }
if exists {
errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID)
h.WriteError(w, errMsg, http.StatusInternalServerError)
log.Error(errMsg)
return
} }
remoteIP := util.ClientIP(req)
agCfg := common2.GetAgentConfig()
port := agCfg.Setup.Port
if port == "" {
port = "8080"
}
obj.Endpoint = fmt.Sprintf("https://%s:%s", remoteIP, port)
obj.Tags = append(obj.Tags, "mtls")
obj.Tags = append(obj.Tags, "auto")
}
//fetch more information of agent instance //fetch more information of agent instance
res, err := common2.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint()) res, err := client.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint())
if err != nil { if err != nil {
errStr := fmt.Sprintf("get agent instance basic info error: %s", err.Error()) errStr := fmt.Sprintf("get agent instance basic info error: %s", err.Error())
h.WriteError(w,errStr , http.StatusInternalServerError) h.WriteError(w,errStr , http.StatusInternalServerError)
@ -71,9 +81,27 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
obj.MajorIP = res.MajorIP obj.MajorIP = res.MajorIP
obj.Host = res.Host obj.Host = res.Host
obj.IPS = res.IPS obj.IPS = res.IPS
if obj.Name == "" {
obj.Name = res.Name
}
}
oldInst := &agent.Instance{}
oldInst.ID = obj.ID
exists, err := orm.Get(oldInst)
if err != nil && err != elastic2.ErrNotFound {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
if exists {
errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID)
h.WriteError(w, errMsg, http.StatusInternalServerError)
log.Error(errMsg)
return
} }
obj.Status = common2.StatusOnline obj.Status = model.StatusOnline
err = orm.Create(nil, obj) err = orm.Create(nil, obj)
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
@ -84,39 +112,29 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} }
err = pushIngestConfigToAgent(obj)
if err != nil {
log.Error(err)
}
h.WriteCreatedOKJSON(w, obj.ID) h.WriteCreatedOKJSON(w, obj.ID)
} }
func bindAgentToHostByIP(ag *agent.Instance) error{ func pushIngestConfigToAgent(inst *agent.Instance) error{
err, result := orm.GetBy("ip", ag.MajorIP, host.HostInfo{}) ingestCfg, basicAuth, err := common2.GetAgentIngestConfig()
if err != nil { if err != nil {
return err return err
} }
if len(result.Result) > 0 { if basicAuth != nil && basicAuth.Password != "" {
buf := util.MustToJSONBytes(result.Result[0]) err = client.GetClient().SetKeystoreValue(context.Background(), inst.GetEndpoint(), "ingest_cluster_password", basicAuth.Password)
hostInfo := &host.HostInfo{}
err = util.FromJSONBytes(buf, hostInfo)
if err != nil { if err != nil {
return err return fmt.Errorf("set keystore value to agent error: %w", err)
}
sm := common2.GetStateManager()
if ag.Status == "" {
_, err1 := sm.GetAgentClient().GetHostInfo(nil, ag.GetEndpoint())
if err1 == nil {
ag.Status = "online"
}else{
ag.Status = "offline"
} }
} }
err = client.GetClient().SaveDynamicConfig(context.Background(), inst.GetEndpoint(), "ingest", ingestCfg )
hostInfo.AgentStatus = ag.Status
hostInfo.AgentID = ag.ID
err = orm.Update(nil, hostInfo)
if err != nil { if err != nil {
return err fmt.Errorf("save dynamic config to agent error: %w", err)
}
} }
return nil return nil
} }
@ -169,14 +187,42 @@ func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps
log.Error(err) log.Error(err)
return return
} }
if sm := common2.GetStateManager(); sm != nil { if sm := state.GetStateManager(); sm != nil {
sm.DeleteAgent(obj.ID) sm.DeleteAgent(obj.ID)
} }
queryDsl := util.MapStr{
"query": util.MapStr{
"term": util.MapStr{
"agent_id": util.MapStr{
"value": id,
},
},
},
}
err = orm.DeleteBy(agent.ESNodeInfo{}, util.MustToJSONBytes(queryDsl))
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error("delete node info error: ", err)
return
}
h.WriteJSON(w, util.MapStr{ queryDsl = util.MapStr{
"_id": obj.ID, "query": util.MapStr{
"result": "deleted", "term": util.MapStr{
}, 200) "metadata.labels.agent_id": util.MapStr{
"value": id,
},
},
},
}
err = orm.DeleteBy(agent.Setting{}, util.MustToJSONBytes(queryDsl))
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error("delete agent settings error: ", err)
return
}
h.WriteDeletedOKJSON(w, id)
} }
func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@ -410,7 +456,7 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt
return return
} }
cfg.BasicAuth = &basicAuth cfg.BasicAuth = &basicAuth
nodeInfo, err := common2.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg) nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
@ -519,12 +565,50 @@ func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps h
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
return return
} }
q = util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"terms": util.MapStr{
"metadata.labels.node_uuid": nodeIDs,
},
},
{
"term": util.MapStr{
"metadata.labels.agent_id": util.MapStr{
"value": id,
},
},
},
},
},
},
}
} }
h.WriteAckOKJSON(w) h.WriteAckOKJSON(w)
} }
// tryConnect checks that an agent endpoint can be reached by requesting its
// basic instance info and writes that info back as JSON.
//
// The request body is a JSON object with an "endpoint" field holding the
// agent base URL. A body that cannot be decoded, or one without an endpoint,
// is a client error and is answered with 400; a failing request to the agent
// is answered with 500.
func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	var reqBody struct {
		Endpoint  string `json:"endpoint"`
		BasicAuth agent.BasicAuth
	}
	if err := h.DecodeJSON(req, &reqBody); err != nil {
		h.WriteError(w, err.Error(), http.StatusBadRequest)
		return
	}
	if reqBody.Endpoint == "" {
		h.WriteError(w, "endpoint is required", http.StatusBadRequest)
		return
	}
	connectRes, err := client.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint)
	if err != nil {
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	h.WriteJSON(w, connectRes, http.StatusOK)
}
func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
nodesInfo, err := common2.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) nodesInfo, err := client.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint())
if err != nil { if err != nil {
return nil, fmt.Errorf("get elasticsearch nodes error: %w", err) return nil, fmt.Errorf("get elasticsearch nodes error: %w", err)
} }
@ -534,7 +618,6 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
} }
oldPids := map[int]struct{}{} oldPids := map[int]struct{}{}
var resultNodes []agent.ESNodeInfo var resultNodes []agent.ESNodeInfo
//settings, err := common2.GetAgentSettings(inst.ID, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -555,24 +638,6 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
if oldNode != nil && oldNode.ClusterID != "" { if oldNode != nil && oldNode.ClusterID != "" {
node.ClusterID = oldNode.ClusterID node.ClusterID = oldNode.ClusterID
} }
//else{
// if cfg := clusterCfgs[node.ClusterUuid]; cfg != nil {
// node.ClusterID = cfg.ID
// setting := pickAgentSettings(settings, node)
// if setting == nil {
// setting, err = getAgentTaskSetting(inst.ID, node)
// if err != nil {
// log.Error()
// }
// err = orm.Create(nil, setting)
// if err != nil {
// log.Error("save agent task setting error: ", err)
// }
// }
// }else{
// //cluster not registered in console
// }
//}
} }
node.Status = "online" node.Status = "online"
@ -626,6 +691,7 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){
}, },
}, },
}, },
} }
q := orm.Query{ q := orm.Query{
RawQuery: util.MustToJSONBytes(query), RawQuery: util.MustToJSONBytes(query),
@ -645,26 +711,6 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){
return nodesInfo, nil return nodesInfo, nil
} }
func getClusterConfigs() map[string]*elastic.ElasticsearchConfig {
cfgs := map[string]*elastic.ElasticsearchConfig{}
elastic.WalkConfigs(func(key, value interface{}) bool {
if cfg, ok := value.(*elastic.ElasticsearchConfig); ok {
clusterUUID := cfg.ClusterUUID
if cfg.ClusterUUID == "" {
verInfo, err := adapter.ClusterVersion(elastic.GetMetadata(cfg.ID))
if err != nil {
log.Error(err)
return true
}
clusterUUID = verInfo.ClusterUUID
}
cfgs[clusterUUID] = cfg
}
return true
})
return cfgs
}
func pickAgentSettings(settings []agent.Setting, nodeInfo agent.ESNodeInfo) *agent.Setting { func pickAgentSettings(settings []agent.Setting, nodeInfo agent.ESNodeInfo) *agent.Setting {
for _, setting := range settings { for _, setting := range settings {
if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID { if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID {
@ -698,7 +744,7 @@ func getAgentTaskSetting(agentID string, node agent.ESNodeInfo) (*agent.Setting,
} }
// getSettingsByClusterID query agent task settings with cluster id // getSettingsByClusterID query agent task settings with cluster id
func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) { func getSettingsByClusterID(clusterID string) (*model.TaskSetting, error) {
queryDsl := util.MapStr{ queryDsl := util.MapStr{
"size": 200, "size": 200,
"query": util.MapStr{ "query": util.MapStr{
@ -746,8 +792,8 @@ func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) {
return nil, err return nil, err
} }
setting := &common2.TaskSetting{ setting := &model.TaskSetting{
NodeStats: &common2.NodeStatsTask{ NodeStats: &model.NodeStatsTask{
Enabled: true, Enabled: true,
}, },
} }
@ -776,17 +822,17 @@ func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) {
} }
} }
if clusterStats { if clusterStats {
setting.ClusterStats = &common2.ClusterStatsTask{ setting.ClusterStats = &model.ClusterStatsTask{
Enabled: true, Enabled: true,
} }
} }
if indexStats { if indexStats {
setting.IndexStats = &common2.IndexStatsTask{ setting.IndexStats = &model.IndexStatsTask{
Enabled: true, Enabled: true,
} }
} }
if clusterHealth { if clusterHealth {
setting.ClusterHealth = &common2.ClusterHealthTask{ setting.ClusterHealth = &model.ClusterHealthTask{
Enabled: true, Enabled: true,
} }
} }

View File

@ -7,7 +7,8 @@ package api
import ( import (
"fmt" "fmt"
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
"infini.sh/console/modules/agent/common" "infini.sh/console/modules/agent/client"
"infini.sh/console/modules/agent/state"
"infini.sh/framework/core/agent" "infini.sh/framework/core/agent"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
@ -31,7 +32,7 @@ func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request,
}, http.StatusOK) }, http.StatusOK)
return return
} }
logFiles, err := common.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs) logFiles, err := client.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs)
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err) log.Error(err)
@ -68,7 +69,7 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request,
return return
} }
reqBody.LogsPath = node.Path.Logs reqBody.LogsPath = node.Path.Logs
sm := common.GetStateManager() sm := state.GetStateManager()
res, err := sm.GetAgentClient().GetElasticLogFileContent(nil, inst.GetEndpoint(), reqBody) res, err := sm.GetAgentClient().GetElasticLogFileContent(nil, inst.GetEndpoint(), reqBody)
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
@ -88,6 +89,13 @@ func getAgentByNodeID(nodeID string) (*agent.Instance, *agent.ESNodeInfo, error)
}, },
}, },
}, },
"sort": []util.MapStr{
{
"timestamp": util.MapStr{
"order": "desc",
},
},
},
} }
q := &orm.Query{ q := &orm.Query{
RawQuery: util.MustToJSONBytes(queryDsl), RawQuery: util.MustToJSONBytes(queryDsl),

140
modules/agent/api/setup.go Normal file
View File

@ -0,0 +1,140 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package api
import (
"fmt"
log "github.com/cihub/seelog"
"github.com/valyala/fasttemplate"
"infini.sh/console/modules/agent/common"
"infini.sh/framework/core/api/rbac"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/global"
"infini.sh/framework/core/util"
"os"
"net/http"
"path"
"strings"
"sync"
"time"
)
// tokens keeps the issued install tokens: the key is the token string and
// the value is a *Token.
var tokens sync.Map

// Token records when an install token was issued and for which user.
type Token struct {
	CreatedAt time.Time
	UserID    string
}

// ExpiredIn is the period after CreatedAt during which a token is accepted
// (20 minutes).
const ExpiredIn = 20 * time.Minute
// generateInstallCommand issues an install token for the logged-in user and
// responds with the shell command that fetches and runs the install script,
// the token itself and its expiry time.
//
// A token already issued to the same user is reused and its lifetime renewed
// unless it has expired, in which case it is replaced by a new one. The
// console endpoint comes from the agent setup configuration and falls back to
// the scheme and host of the current request.
func (h *APIHandler) generateInstallCommand(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	claims, ok := req.Context().Value("user").(*rbac.UserClaims)
	if !ok {
		h.WriteError(w, "user not found", http.StatusInternalServerError)
		return
	}
	agCfg := common.GetAgentConfig()
	// Setup is optional configuration; answer with an error instead of
	// dereferencing a nil pointer below.
	if agCfg.Setup == nil {
		h.WriteError(w, "agent setup is not configured", http.StatusInternalServerError)
		return
	}

	// Look for a token that was already issued to this user.
	var (
		issued   *Token
		tokenStr string
	)
	tokens.Range(func(key, value any) bool {
		if v, ok := value.(*Token); ok && claims.UserId == v.UserID {
			issued = v
			tokenStr = key.(string)
			return false
		}
		return true
	})

	now := time.Now()
	switch {
	case issued == nil:
		tokenStr = util.GetUUID()
	case issued.CreatedAt.Add(ExpiredIn).Before(now):
		// The previous token has expired: drop it and hand out a new one.
		tokens.Delete(tokenStr)
		tokenStr = util.GetUUID()
	}
	// Store a fresh value rather than updating CreatedAt on the shared
	// *Token, which other handlers may be reading at the same time.
	t := &Token{
		CreatedAt: now,
		UserID:    claims.UserId,
	}
	tokens.Store(tokenStr, t)

	consoleEndpoint := agCfg.Setup.ConsoleEndpoint
	if consoleEndpoint == "" {
		scheme := "http"
		if req.TLS != nil {
			scheme = "https"
		}
		consoleEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host)
	}

	h.WriteJSON(w, util.MapStr{
		"script":     fmt.Sprintf(`sudo bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, consoleEndpoint, tokenStr),
		"token":      tokenStr,
		"expired_at": t.CreatedAt.Add(ExpiredIn),
	}, http.StatusOK)
}
// getInstallScript renders config/install_agent.tpl for the holder of a valid
// install token and writes the resulting shell script to the response.
//
// The token is taken from the "token" request parameter; a missing, unknown
// or expired token is answered with 401. Certificates for the script are
// generated from the CA files named in the agent setup configuration.
func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	tokenStr := h.GetParameter(req, "token")
	if strings.TrimSpace(tokenStr) == "" {
		h.WriteError(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
		return
	}
	v, ok := tokens.Load(tokenStr)
	if !ok {
		h.WriteError(w, "token is invalid", http.StatusUnauthorized)
		return
	}
	if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) {
		tokens.Delete(tokenStr)
		h.WriteError(w, "token was expired", http.StatusUnauthorized)
		return
	}

	agCfg := common.GetAgentConfig()
	// Setup is optional configuration; answer with an error instead of
	// dereferencing a nil pointer below.
	if agCfg.Setup == nil {
		h.WriteError(w, "agent setup is not configured", http.StatusInternalServerError)
		return
	}
	caCert, clientCertPEM, clientKeyPEM, err := common.GenerateServerCert(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	scriptTplPath := path.Join(global.Env().GetConfigDir(), "install_agent.tpl")
	buf, err := os.ReadFile(scriptTplPath)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	tpl := fasttemplate.New(string(buf), "{{", "}}")

	// The script appends "/agent-<version>-..." to base_url, so a trailing
	// slash is removed to avoid a double slash in the download URL.
	downloadURL := strings.TrimRight(agCfg.Setup.DownloadURL, "/")
	if downloadURL == "" {
		downloadURL = "https://release.infinilabs.com/agent/stable"
	}
	// Same fallback as generateInstallCommand: without a configured console
	// endpoint, use the address this request was sent to, otherwise the
	// script would post its registration to an empty host.
	consoleEndpoint := agCfg.Setup.ConsoleEndpoint
	if consoleEndpoint == "" {
		scheme := "http"
		if req.TLS != nil {
			scheme = "https"
		}
		consoleEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host)
	}
	port := agCfg.Setup.Port
	if port == "" {
		port = "8080"
	}

	// Pass the defaulted values; the raw configuration fields may be empty.
	_, err = tpl.Execute(w, map[string]interface{}{
		"base_url":         downloadURL,
		"agent_version":    agCfg.Setup.Version,
		"console_endpoint": consoleEndpoint,
		"client_crt":       clientCertPEM,
		"client_key":       clientKeyPEM,
		"ca_crt":           caCert,
		"port":             port,
		"token":            tokenStr,
	})
	if err != nil {
		log.Error(err)
	}
}

View File

@ -2,7 +2,7 @@
* Web: https://infinilabs.com * Web: https://infinilabs.com
* Email: hello#infini.ltd */ * Email: hello#infini.ltd */
package common package client
import ( import (
"context" "context"
@ -14,8 +14,37 @@ import (
"net/http" "net/http"
) )
type Client struct { var defaultClient ClientAPI
func GetClient() ClientAPI {
if defaultClient == nil {
panic("agent client not init")
} }
return defaultClient
}
func RegisterClient(client ClientAPI) {
defaultClient = client
}
// ClientAPI is the set of operations the console performs against an agent.
// Every method takes the base URL of the target agent as agentBaseURL.
// The implementation installed with RegisterClient is returned by GetClient.
type ClientAPI interface {
GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error)
GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error)
GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error)
GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error)
GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error)
RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error
GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error)
AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error)
CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error
DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error
// SetKeystoreValue stores value under key in the agent's keystore.
SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error
// SaveDynamicConfig sends content to the agent as the configuration called name.
SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error
}
// Client implements the agent operations on top of an Executor, which
// performs the actual request.
type Client struct {
// Executor sends the requests built by the Client methods.
Executor Executor
}
func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) { func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) {
req := &util.Request{ req := &util.Request{
@ -188,16 +217,37 @@ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipeline
return client.doRequest(req, nil) return client.doRequest(req, nil)
} }
// SetKeystoreValue posts a key/value pair to the keystore endpoint
// (/_framework/keystore) of the agent at agentBaseURL.
// It returns the error reported by the underlying request, if any.
func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error {
	keystoreURL := fmt.Sprintf("%s/_framework/keystore", agentBaseURL)
	payload := util.MustToJSONBytes(util.MapStr{
		"key":   key,
		"value": value,
	})
	return client.doRequest(&util.Request{
		Method:  http.MethodPost,
		Url:     keystoreURL,
		Context: ctx,
		Body:    payload,
	}, nil)
}
// SaveDynamicConfig posts a single named configuration to the config endpoint
// (/agent/config) of the agent at agentBaseURL. The request body has the form
// {"configs": {name: content}}.
// It returns the error reported by the underlying request, if any.
func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error {
	configURL := fmt.Sprintf("%s/agent/config", agentBaseURL)
	configs := util.MapStr{
		name: content,
	}
	payload := util.MustToJSONBytes(util.MapStr{
		"configs": configs,
	})
	return client.doRequest(&util.Request{
		Method:  http.MethodPost,
		Url:     configURL,
		Context: ctx,
		Body:    payload,
	}, nil)
}
func (client *Client) doRequest(req *util.Request, respObj interface{}) error { func (client *Client) doRequest(req *util.Request, respObj interface{}) error {
result, err := util.ExecuteRequest(req) return client.Executor.DoRequest(req, respObj)
if err != nil {
return err
}
if result.StatusCode != 200 {
return fmt.Errorf(string(result.Body))
}
if respObj == nil {
return nil
}
return util.FromJSONBytes(result.Body, respObj)
} }

View File

@ -0,0 +1,100 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package client
import (
"bytes"
"fmt"
"infini.sh/console/modules/agent/common"
"infini.sh/framework/core/util"
"io"
"net/http"
)
// Executor sends a prepared request and, when respObj is non-nil, decodes the
// response body into it.
type Executor interface {
DoRequest(req *util.Request, respObj interface{}) error
}
// HttpExecutor is an Executor that sends requests through util.ExecuteRequest.
type HttpExecutor struct {
}
// DoRequest executes req via util.ExecuteRequest.
//
// A transport error is returned as is. Any status code other than 200 is
// turned into an error whose message is the raw response body. When respObj
// is non-nil the response body is JSON-decoded into it.
func (executor *HttpExecutor) DoRequest(req *util.Request, respObj interface{}) error {
	result, err := util.ExecuteRequest(req)
	if err != nil {
		return err
	}
	if result.StatusCode != http.StatusOK {
		// The body is remote-controlled text: pass it as an argument, never as
		// the format string, so '%' sequences in it are not interpreted.
		return fmt.Errorf("%s", string(result.Body))
	}
	if respObj == nil {
		return nil
	}
	return util.FromJSONBytes(result.Body, respObj)
}
// NewMTLSExecutor builds an MTLSExecutor. It obtains the instance
// certificate/key file paths from common.GetAgentInstanceCerts and uses them,
// together with caCertFile, to create the HTTP client via util.NewMTLSClient.
func NewMTLSExecutor(caCertFile, caKeyFile string) (*MTLSExecutor, error) {
	crtPath, keyPath, err := common.GetAgentInstanceCerts(caCertFile, caKeyFile)
	if err != nil {
		return nil, fmt.Errorf("generate tls cert error: %w", err)
	}

	httpClient, err := util.NewMTLSClient(caCertFile, crtPath, keyPath)
	if err != nil {
		return nil, err
	}

	executor := &MTLSExecutor{
		CaCertFile: caCertFile,
		CAKeyFile:  caKeyFile,
		client:     httpClient,
	}
	return executor, nil
}
// MTLSExecutor is an Executor that sends requests through its own
// *http.Client, which NewMTLSExecutor creates with util.NewMTLSClient.
type MTLSExecutor struct {
// CaCertFile and CAKeyFile are the CA file paths given to NewMTLSExecutor.
CaCertFile string
CAKeyFile string
// client performs the HTTP round trips in DoRequest.
client *http.Client
}
// DoRequest sends req with the executor's HTTP client.
//
// The request body is attached only when req.Body is non-empty, and
// req.Context is honoured when set. Any status code other than 200 is turned
// into an error whose message is the raw response body. When respObj is
// non-nil the response body is JSON-decoded into it.
func (executor *MTLSExecutor) DoRequest(req *util.Request, respObj interface{}) error {
	// Keep reader as a nil interface when there is no body; wrapping an empty
	// body would make net/http treat the request as having one.
	var reader io.Reader
	if len(req.Body) > 0 {
		reader = bytes.NewReader(req.Body)
	}

	var (
		hr  *http.Request
		err error
	)
	// http.NewRequestWithContext rejects a nil context, so fall back to
	// http.NewRequest when the caller did not supply one.
	if req.Context == nil {
		hr, err = http.NewRequest(req.Method, req.Url, reader)
	} else {
		hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader)
	}
	if err != nil {
		return err
	}

	res, err := executor.client.Do(hr)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	buf, err := io.ReadAll(res.Body)
	if err != nil {
		return err
	}
	if res.StatusCode != http.StatusOK {
		// The body is remote-controlled text: pass it as an argument, never as
		// the format string, so '%' sequences in it are not interpreted.
		return fmt.Errorf("%s", string(buf))
	}
	if respObj == nil {
		return nil
	}
	return util.FromJSONBytes(buf, respObj)
}

View File

@ -0,0 +1,97 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package common
import (
	"crypto/rsa"
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"os"
	"path"

	"infini.sh/framework/core/global"
	"infini.sh/framework/core/util"
)
// GenerateClientCert returns the CA certificate PEM read from caFile together
// with a client certificate/key pair produced by generateCert (isServer=false).
func GenerateClientCert(caFile, caKey string) (caCert, clientCertPEM, clientKeyPEM []byte, err error){
return generateCert(caFile, caKey, false)
}
// GenerateServerCert returns the CA certificate PEM read from caFile together
// with a server certificate/key pair produced by generateCert (isServer=true).
func GenerateServerCert(caFile, caKey string) (caCert, serverCertPEM, serverKeyPEM []byte, err error){
return generateCert(caFile, caKey, true)
}
// generateCert loads the CA certificate and private key and uses them to
// create an instance certificate/key pair.
//
// caFile and caKey are file paths; when empty they default to
// <config dir>/certs/ca.crt and <config dir>/certs/ca.key respectively.
// With isServer set the pair comes from util.GenerateServerCert (which needs
// an RSA CA key); otherwise it comes from util.GetClientCert.
//
// It returns the raw content of caFile plus the generated certificate and key.
// On error all byte slices are nil. Files without a PEM block, a non-RSA CA
// key in server mode, and failures of util.GenerateServerCert are reported as
// errors instead of panicking or being silently dropped.
func generateCert(caFile, caKey string, isServer bool) (caCert, instanceCertPEM, instanceKeyPEM []byte, err error) {
	if caFile == "" {
		caFile = path.Join(global.Env().GetConfigDir(), "certs", "ca.crt")
	}
	caCert, err = os.ReadFile(caFile)
	if err != nil {
		return nil, nil, nil, err
	}
	// pem.Decode returns a nil block when the input holds no PEM data.
	certBlock, _ := pem.Decode(caCert)
	if certBlock == nil {
		return nil, nil, nil, fmt.Errorf("no PEM data found in CA certificate file [%s]", caFile)
	}
	rootCert, err := x509.ParseCertificate(certBlock.Bytes)
	if err != nil {
		return nil, nil, nil, err
	}

	if caKey == "" {
		caKey = path.Join(global.Env().GetConfigDir(), "certs", "ca.key")
	}
	keyBytes, err := os.ReadFile(caKey)
	if err != nil {
		return nil, nil, nil, err
	}
	keyBlock, _ := pem.Decode(keyBytes)
	if keyBlock == nil {
		return nil, nil, nil, fmt.Errorf("no PEM data found in CA key file [%s]", caKey)
	}
	certKey, err := util.ParsePrivateKey(keyBlock.Bytes)
	if err != nil {
		return nil, nil, nil, err
	}

	if isServer {
		rsaKey, ok := certKey.(*rsa.PrivateKey)
		if !ok {
			return nil, nil, nil, fmt.Errorf("CA key file [%s] does not contain an RSA private key", caKey)
		}
		// Re-encode only the first certificate block of the CA file.
		caPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certBlock.Bytes})
		instanceCertPEM, instanceKeyPEM, err = util.GenerateServerCert(rootCert, rsaKey, caPEM, nil)
		if err != nil {
			return nil, nil, nil, err
		}
	} else {
		_, instanceCertPEM, instanceKeyPEM = util.GetClientCert(rootCert, certKey)
	}
	return caCert, instanceCertPEM, instanceKeyPEM, nil
}
// GetAgentInstanceCerts returns the paths of the instance certificate and key
// stored under <data dir>/certs/agent.
//
// When both files already exist their paths are returned untouched. Otherwise
// a client certificate is generated with GenerateClientCert(caFile, caKey),
// the directory is created if missing, and both files are written before the
// paths are returned.
func GetAgentInstanceCerts(caFile, caKey string) (string, string, error) {
	certDir := path.Join(global.Env().GetDataDir(), "certs/agent")
	crtPath := path.Join(certDir, "instance.crt")
	keyPath := path.Join(certDir, "instance.key")

	// Reuse a previously generated pair.
	if util.FileExists(crtPath) && util.FileExists(keyPath) {
		return crtPath, keyPath, nil
	}

	_, certPEM, keyPEM, err := GenerateClientCert(caFile, caKey)
	if err != nil {
		return "", "", err
	}

	if !util.IsExist(certDir) {
		if err = os.MkdirAll(certDir, 0775); err != nil {
			return "", "", err
		}
	}
	if _, err = util.FilePutContentWithByte(crtPath, certPEM); err != nil {
		return "", "", err
	}
	if _, err = util.FilePutContentWithByte(keyPath, keyPEM); err != nil {
		return "", "", err
	}
	return crtPath, keyPath, nil
}

View File

@ -4,18 +4,17 @@
package common package common
type AgentConfig struct { import (
Enabled bool `config:"enabled"` "infini.sh/console/modules/agent/model"
StateManager struct{ "infini.sh/framework/core/env"
Enabled bool `config:"enabled"` )
} `config:"state_manager"`
Setup SetupConfig `config:"setup"`
}
type SetupConfig struct {
DownloadURL string `config:"download_url"` func GetAgentConfig() *model.AgentConfig {
Version string `config:"version"` agentCfg := &model.AgentConfig{}
CACertFile string `config:"ca_cert"` _, err := env.ParseConfig("agent", agentCfg )
CAKeyFile string `config:"ca_key"` if err != nil {
ScriptEndpoint string `config:"script_endpoint"` panic(err)
}
return agentCfg
} }

View File

@ -6,13 +6,18 @@ package common
import ( import (
"fmt" "fmt"
"infini.sh/console/modules/agent/model"
"infini.sh/framework/core/agent" "infini.sh/framework/core/agent"
"infini.sh/framework/core/credential"
"infini.sh/framework/core/elastic" "infini.sh/framework/core/elastic"
"infini.sh/framework/core/event"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
log "src/github.com/cihub/seelog"
"strings"
) )
func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, error){ func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResult, error){
var clusterCfgs []elastic.ElasticsearchConfig var clusterCfgs []elastic.ElasticsearchConfig
var ( var (
pipelines []util.MapStr pipelines []util.MapStr
@ -49,7 +54,7 @@ func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, err
if err != nil { if err != nil {
return nil, err return nil, err
} }
taskSetting := TaskSetting{} taskSetting := model.TaskSetting{}
err = util.FromJSONBytes(vBytes, &taskSetting) err = util.FromJSONBytes(vBytes, &taskSetting)
if err != nil { if err != nil {
return nil, err return nil, err
@ -61,17 +66,18 @@ func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, err
pipelines = append(pipelines, partPipelines...) pipelines = append(pipelines, partPipelines...)
toDeletePipelineNames = append(toDeletePipelineNames, partDeletePipelineNames...) toDeletePipelineNames = append(toDeletePipelineNames, partDeletePipelineNames...)
} }
return &ParseAgentSettingsResult{ return &model.ParseAgentSettingsResult{
ClusterConfigs: clusterCfgs, ClusterConfigs: clusterCfgs,
Pipelines: pipelines, Pipelines: pipelines,
ToDeletePipelineNames: toDeletePipelineNames, ToDeletePipelineNames: toDeletePipelineNames,
}, nil }, nil
} }
// GetAgentSettings queries the settings of an agent by agent id.
// If any setting was updated after the given timestamp, it returns the full setting list (including unchanged settings);
// otherwise it returns an empty setting list.
func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) { func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) {
queryDsl := util.MapStr{ query := util.MapStr{
"size": 500,
"query": util.MapStr{
"bool": util.MapStr{ "bool": util.MapStr{
"must": []util.MapStr{ "must": []util.MapStr{
{ {
@ -95,17 +101,20 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error)
}, },
}, },
}, },
{ //{
"range": util.MapStr{ // "range": util.MapStr{
"updated": util.MapStr{ // "updated": util.MapStr{
"gt": timestamp, // "gt": timestamp,
}, // },
}, // },
}, //},
},
}, },
}, },
} }
queryDsl := util.MapStr{
"size": 500,
"query": query,
}
q := orm.Query{ q := orm.Query{
RawQuery: util.MustToJSONBytes(queryDsl), RawQuery: util.MustToJSONBytes(queryDsl),
} }
@ -116,7 +125,10 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error)
if len(result.Result) == 0 { if len(result.Result) == 0 {
return nil, nil return nil, nil
} }
var settings []agent.Setting var (
settings []agent.Setting
hasUpdated bool
)
for _, row := range result.Result { for _, row := range result.Result {
setting := agent.Setting{} setting := agent.Setting{}
buf, err := util.ToJSONBytes(row) buf, err := util.ToJSONBytes(row)
@ -127,12 +139,18 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if setting.Updated != nil && setting.Updated.UnixMilli() > timestamp {
hasUpdated = true
}
settings = append(settings, setting) settings = append(settings, setting)
} }
if !hasUpdated {
return nil, nil
}
return settings, nil return settings, nil
} }
func TransformSettingsToConfig(setting *TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) {
if setting == nil { if setting == nil {
return nil, nil, fmt.Errorf("empty setting") return nil, nil, fmt.Errorf("empty setting")
} }
@ -227,3 +245,237 @@ func getMetricPipelineName(clusterID, processorName string) string{
return fmt.Sprintf("collect_%s_%s", clusterID, processorName) return fmt.Sprintf("collect_%s_%s", clusterID, processorName)
} }
// LoadAgentsFromES searches agent.Instance documents through orm (at most
// 1000) and decodes them. When clusterID is non-empty the search is
// restricted with an equality condition on the "id" field.
// NOTE(review): the filter compares the "id" field with a cluster id; confirm
// this is the intended field.
//
// Rows that cannot be decoded are logged and skipped. It returns nil, nil when
// the search yields no rows.
func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) {
	query := orm.Query{Size: 1000}
	if clusterID != "" {
		query.Conds = orm.And(orm.Eq("id", clusterID))
	}

	err, result := orm.Search(agent.Instance{}, &query)
	if err != nil {
		return nil, fmt.Errorf("query agent error: %w", err)
	}
	if len(result.Result) == 0 {
		return nil, nil
	}

	agents := make([]agent.Instance, 0, len(result.Result))
	for _, row := range result.Result {
		var inst agent.Instance
		raw := util.MustToJSONBytes(row)
		if decodeErr := util.FromJSONBytes(raw, &inst); decodeErr != nil {
			log.Errorf("got unexpected agent: %s, error: %v", string(raw), decodeErr)
			continue
		}
		agents = append(agents, inst)
	}
	return agents, nil
}
func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) {
q := orm.Query{
WildcardIndex: true,
}
mustQ := []util.MapStr{
{
"term": util.MapStr{
"metadata.name": util.MapStr{
"value": "agent",
},
},
},
{
"term": util.MapStr{
"metadata.category": util.MapStr{
"value": "instance",
},
},
},
}
if len(agentIds) > 0 {
mustQ = append(mustQ, util.MapStr{
"terms": util.MapStr{
"agent.id": agentIds,
},
})
}
queryDSL := util.MapStr{
"_source": "agent.id",
"sort": []util.MapStr{
{
"timestamp": util.MapStr{
"order": "desc",
},
},
},
"collapse": util.MapStr{
"field": "agent.id",
},
"query": util.MapStr{
"bool": util.MapStr{
"filter": []util.MapStr{
{
"range": util.MapStr{
"timestamp": util.MapStr{
"gte": fmt.Sprintf("now-%ds", lastSeconds),
},
},
},
},
"must": mustQ,
},
},
}
q.RawQuery = util.MustToJSONBytes(queryDSL)
err, result := orm.Search(event.Event{}, &q)
if err != nil {
return nil, fmt.Errorf("query agent instance metric error: %w", err)
}
agentIDs := map[string]struct{}{}
if len(result.Result) > 0 {
searchRes := elastic.SearchResponse{}
err = util.FromJSONBytes(result.Raw, &searchRes)
if err != nil {
return nil, err
}
agentIDKeyPath := []string{"agent", "id"}
for _, hit := range searchRes.Hits.Hits {
agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source)
if v, ok := agentID.(string); ok {
agentIDs[v] = struct{}{}
}
}
}
return agentIDs, nil
}
// GetAgentIngestConfig renders the agent configuration (elasticsearch target,
// metrics and the log/metric ingest pipelines) from the "setup" section of the
// agent config.
//
// It returns the rendered configuration text and the basic-auth credential
// loaded for IngestClusterCredentialID (a zero value when no credential id is
// configured). Only the username is written into the text; the password is
// referenced through the $[[keystore.ingest_cluster_password]] placeholder.
//
// An error is returned when the setup section is missing, when
// ingest_cluster_endpoint is unset or blank, or when the credential cannot be
// loaded or decoded.
func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) {
	agCfg := GetAgentConfig()
	setup := agCfg.Setup
	// Setup is a pointer and stays nil when the section is absent from the
	// configuration; fail with an error instead of a nil dereference.
	if setup == nil {
		return "", nil, fmt.Errorf("config setup must not be empty")
	}

	var endpoint string
	switch v := setup.IngestClusterEndpoint.(type) {
	case nil:
		// An unset endpoint is just as unusable as a blank one.
		return "", nil, fmt.Errorf("config ingest_cluster_endpoint must not be empty")
	case string:
		if endpoint = strings.TrimSpace(v); endpoint == "" {
			return "", nil, fmt.Errorf("config ingest_cluster_endpoint must not be empty")
		}
	default:
		// NOTE(review): non-string values are not interpreted here and leave
		// the endpoint empty, exactly as before; confirm whether other types
		// (e.g. a list of endpoints) should be supported.
	}

	var basicAuth elastic.BasicAuth
	if setup.IngestClusterCredentialID != "" {
		cred := credential.Credential{}
		cred.ID = setup.IngestClusterCredentialID
		if _, err := orm.Get(&cred); err != nil {
			return "", nil, fmt.Errorf("query credential [%s] error: %w", cred.ID, err)
		}
		info, err := cred.Decode()
		if err != nil {
			return "", nil, fmt.Errorf("decode credential [%s] error: %w", cred.ID, err)
		}
		var ok bool
		// A credential of another kind leaves basicAuth at its zero value.
		if basicAuth, ok = info.(elastic.BasicAuth); !ok {
			log.Debug("invalid credential: ", cred)
		}
	}

	// The two %s verbs are the endpoint and the basic-auth username.
	tpl := `elasticsearch:
- name: default
enabled: true
endpoint: %s
discovery:
enabled: true
basic_auth:
username: %s
password: $[[keystore.ingest_cluster_password]]
metrics:
enabled: true
queue: metrics
network:
enabled: true
summary: true
details: true
memory:
metrics:
- swap
- memory
disk:
metrics:
- iops
- usage
cpu:
metrics:
- idle
- system
- user
- iowait
- load
instance:
enabled: true
pipeline:
- name: logs_indexing_merge
auto_start: true
keep_running: true
processor:
- indexing_merge:
index_name: ".infini_logs"
elasticsearch: "default"
input_queue: "logs"
idle_timeout_in_seconds: 10
output_queue:
name: "logs_requests"
label:
tag: "logs"
worker_size: 1
bulk_size_in_mb: 10
- name: ingest_logs
auto_start: true
keep_running: true
processor:
- bulk_indexing:
bulk:
compress: true
batch_size_in_mb: 5
batch_size_in_docs: 5000
consumer:
fetch_max_messages: 100
queues:
type: indexing_merge
tag: "logs"
when:
cluster_available: ["default"]
- name: metrics_indexing_merge
auto_start: true
keep_running: true
processor:
- indexing_merge:
elasticsearch: "default"
index_name: ".infini_metrics"
input_queue: "metrics"
output_queue:
name: "metrics_requests"
label:
tag: "metrics"
worker_size: 1
bulk_size_in_mb: 5
- name: ingest_metrics
auto_start: true
keep_running: true
processor:
- bulk_indexing:
bulk:
compress: true
batch_size_in_mb: 5
batch_size_in_docs: 5000
consumer:
fetch_max_messages: 100
queues:
type: indexing_merge
tag: "metrics"
when:
cluster_available: ["default"]`
	tpl = fmt.Sprintf(tpl, endpoint, basicAuth.Username)
	return tpl, &basicAuth, nil
}

View File

@ -1,65 +0,0 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package common
import (
"context"
"infini.sh/framework/core/agent"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/host"
)
// defaultClient is the package-wide agent client set by RegisterClient.
var defaultClient ClientAPI

// GetClient returns the registered agent client and panics when none has been
// registered yet.
func GetClient() ClientAPI {
	if defaultClient != nil {
		return defaultClient
	}
	panic("agent client not init")
}
// RegisterClient sets the package-wide agent client returned by GetClient.
func RegisterClient(client ClientAPI) {
defaultClient = client
}
// ClientAPI lists the operations that can be invoked on an agent instance.
// Every method takes the base URL of the target agent as agentBaseURL.
type ClientAPI interface {
GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error)
GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error)
GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error)
GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error)
GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error)
RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error
GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error)
AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error)
CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error
DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error
}
// stateManager is the package-wide state manager set by RegisterStateManager.
var stateManager IStateManager

// GetStateManager returns the registered state manager and panics when none
// has been registered yet.
func GetStateManager() IStateManager {
	if stateManager != nil {
		return stateManager
	}
	panic("agent state manager not init")
}
// RegisterStateManager sets the package-wide state manager returned by
// GetStateManager.
func RegisterStateManager(sm IStateManager) {
stateManager = sm
}
// IsEnabled reports whether a state manager has been registered.
func IsEnabled() bool {
return stateManager != nil
}
// IStateManager is the interface of the agent state manager that is
// registered with RegisterStateManager and obtained with GetStateManager.
type IStateManager interface {
GetAgent(ID string) (*agent.Instance, error)
UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error)
GetTaskAgent(clusterID string) (*agent.Instance, error)
DeleteAgent(agentID string) error
LoopState()
Stop()
// GetAgentClient returns the client used to talk to agents.
GetAgentClient() ClientAPI
}

View File

@ -0,0 +1,24 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package model
// AgentConfig is the structure the agent configuration section is parsed
// into; the config tags name the corresponding configuration keys.
type AgentConfig struct {
Enabled bool `config:"enabled"`
StateManager struct{
Enabled bool `config:"enabled"`
} `config:"state_manager"`
// Setup is a pointer, so it is nil unless it gets populated.
Setup *SetupConfig `config:"setup"`
}
// SetupConfig holds the "setup" part of the agent configuration; the config
// tags name the corresponding configuration keys.
type SetupConfig struct {
DownloadURL string `config:"download_url"`
Version string `config:"version"`
CACertFile string `config:"ca_cert"`
CAKeyFile string `config:"ca_key"`
ConsoleEndpoint string `config:"console_endpoint"`
// IngestClusterEndpoint is untyped; NOTE(review): presumably it may hold
// something other than a single string — confirm the accepted shapes.
IngestClusterEndpoint interface{} `config:"ingest_cluster_endpoint"`
IngestClusterCredentialID string `config:"ingest_cluster_credential_id"`
Port string `config:"port"`
}

View File

@ -0,0 +1,10 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package model
// Agent status values.
// Note that StatusOnline is a typed string constant while StatusOffline is an
// untyped string constant.
const (
StatusOnline string = "online"
StatusOffline = "offline"
)

View File

@ -2,7 +2,7 @@
* Web: https://infinilabs.com * Web: https://infinilabs.com
* Email: hello#infini.ltd */ * Email: hello#infini.ltd */
package common package model
import ( import (
"infini.sh/framework/core/elastic" "infini.sh/framework/core/elastic"

View File

@ -2,51 +2,75 @@
* Web: https://infinilabs.com * Web: https://infinilabs.com
* Email: hello#infini.ltd */ * Email: hello#infini.ltd */
package common package state
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/buger/jsonparser" "github.com/buger/jsonparser"
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
"infini.sh/console/modules/agent/client"
"infini.sh/console/modules/agent/common"
"infini.sh/console/modules/agent/model"
"infini.sh/framework/core/agent" "infini.sh/framework/core/agent"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/event"
"infini.sh/framework/core/host" "infini.sh/framework/core/host"
"infini.sh/framework/core/kv" "infini.sh/framework/core/kv"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
"infini.sh/framework/modules/elastic"
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"strings" "src/gopkg.in/yaml.v2"
"sync" "sync"
"time" "time"
) )
const ( var stateManager IStateManager
StatusOnline string = "online"
StatusOffline = "offline" func GetStateManager() IStateManager {
) if stateManager == nil {
panic("agent state manager not init")
}
return stateManager
}
// RegisterStateManager sets the package-wide state manager returned by
// GetStateManager.
func RegisterStateManager(sm IStateManager) {
stateManager = sm
}
// IsEnabled reports whether a state manager has been registered.
func IsEnabled() bool {
return stateManager != nil
}
// IStateManager is the interface of the agent state manager that is
// registered with RegisterStateManager and obtained with GetStateManager.
type IStateManager interface {
GetAgent(ID string) (*agent.Instance, error)
UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error)
GetTaskAgent(clusterID string) (*agent.Instance, error)
DeleteAgent(agentID string) error
LoopState()
Stop()
// GetAgentClient returns the client used to talk to agents.
GetAgentClient() client.ClientAPI
}
type StateManager struct { type StateManager struct {
TTL time.Duration // kv ttl TTL time.Duration // kv ttl
KVKey string KVKey string
stopC chan struct{} stopC chan struct{}
stopCompleteC chan struct{} stopCompleteC chan struct{}
agentClient *Client agentClient *client.Client
agentIds map[string]string agentIds map[string]string
agentMutex sync.Mutex agentMutex sync.Mutex
workerChan chan struct{} workerChan chan struct{}
timestamps map[string]int64 timestamps map[string]int64
} }
func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string) *StateManager { func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string, agentClient *client.Client) *StateManager {
return &StateManager{ return &StateManager{
TTL: TTL, TTL: TTL,
KVKey: kvKey, KVKey: kvKey,
stopC: make(chan struct{}), stopC: make(chan struct{}),
stopCompleteC: make(chan struct{}), stopCompleteC: make(chan struct{}),
agentClient: &Client{}, agentClient: agentClient,
agentIds: agentIds, agentIds: agentIds,
workerChan: make(chan struct{}, runtime.NumCPU()), workerChan: make(chan struct{}, runtime.NumCPU()),
timestamps: map[string]int64{}, timestamps: map[string]int64{},
@ -54,7 +78,7 @@ func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string
} }
func (sm *StateManager) checkAgentStatus() { func (sm *StateManager) checkAgentStatus() {
onlineAgentIDs, err := GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds())) onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds()))
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return return
@ -64,32 +88,32 @@ func (sm *StateManager) checkAgentStatus() {
for agentID := range onlineAgentIDs { for agentID := range onlineAgentIDs {
if _, ok := sm.agentIds[agentID]; !ok { if _, ok := sm.agentIds[agentID]; !ok {
log.Infof("status of agent [%s] changed to online", agentID) log.Infof("status of agent [%s] changed to online", agentID)
sm.agentIds[agentID] = StatusOnline sm.agentIds[agentID] = model.StatusOnline
} }
} }
sm.agentMutex.Unlock() sm.agentMutex.Unlock()
for agentID, status := range sm.agentIds { for agentID, status := range sm.agentIds {
if _, ok := onlineAgentIDs[agentID]; ok { if _, ok := onlineAgentIDs[agentID]; ok {
sm.syncSettings(agentID) sm.syncSettings(agentID)
host.UpdateHostAgentStatus(agentID, StatusOnline) host.UpdateHostAgentStatus(agentID, model.StatusOnline)
if status == StatusOnline { if status == model.StatusOnline {
continue continue
} }
// status change to online // status change to online
sm.agentIds[agentID] = StatusOnline sm.agentIds[agentID] = model.StatusOnline
log.Infof("status of agent [%s] changed to online", agentID) log.Infof("status of agent [%s] changed to online", agentID)
//set timestamp equals 0 to create pipeline //set timestamp equals 0 to create pipeline
sm.timestamps[agentID] = 0 sm.timestamps[agentID] = 0
continue continue
}else{ }else{
// already offline // already offline
if status == StatusOffline { if status == model.StatusOffline {
continue continue
} }
} }
// status change to offline // status change to offline
// todo validate whether agent is offline // todo validate whether agent is offline
sm.agentIds[agentID] = StatusOffline sm.agentIds[agentID] = model.StatusOffline
sm.workerChan <- struct{}{} sm.workerChan <- struct{}{}
go func(agentID string) { go func(agentID string) {
defer func() { defer func() {
@ -101,10 +125,12 @@ func (sm *StateManager) checkAgentStatus() {
}() }()
ag, err := sm.GetAgent(agentID) ag, err := sm.GetAgent(agentID)
if err != nil { if err != nil {
if err != elastic.ErrNotFound {
log.Error(err) log.Error(err)
}
return return
} }
ag.Status = StatusOffline ag.Status = model.StatusOffline
log.Infof("agent [%s] is offline", ag.Endpoint) log.Infof("agent [%s] is offline", ag.Endpoint)
_, err = sm.UpdateAgent(ag, true) _, err = sm.UpdateAgent(ag, true)
if err != nil { if err != nil {
@ -112,15 +138,22 @@ func (sm *StateManager) checkAgentStatus() {
return return
} }
//update host agent status //update host agent status
host.UpdateHostAgentStatus(ag.ID, StatusOffline) host.UpdateHostAgentStatus(ag.ID, model.StatusOffline)
}(agentID) }(agentID)
} }
} }
func (sm *StateManager) syncSettings(agentID string) { func (sm *StateManager) syncSettings(agentID string) {
ag, err := sm.GetAgent(agentID)
if err != nil {
if err != elastic.ErrNotFound {
log.Errorf("get agent error: %v", err)
}
return
}
newTimestamp := time.Now().UnixMilli() newTimestamp := time.Now().UnixMilli()
settings, err := GetAgentSettings(agentID, sm.timestamps[agentID]) settings, err := common.GetAgentSettings(agentID, sm.timestamps[agentID])
if err != nil { if err != nil {
log.Errorf("query agent settings error: %v", err) log.Errorf("query agent settings error: %v", err)
return return
@ -129,46 +162,52 @@ func (sm *StateManager) syncSettings(agentID string) {
log.Debugf("got no settings of agent [%s]", agentID) log.Debugf("got no settings of agent [%s]", agentID)
return return
} }
parseResult, err := ParseAgentSettings(settings) parseResult, err := common.ParseAgentSettings(settings)
if err != nil { if err != nil {
log.Errorf("parse agent settings error: %v", err) log.Errorf("parse agent settings error: %v", err)
return return
} }
ag, err := sm.GetAgent(agentID)
if err != nil {
log.Errorf("get agent error: %v", err)
return
}
agClient := sm.GetAgentClient() agClient := sm.GetAgentClient()
var clusterCfgs []util.MapStr
if len(parseResult.ClusterConfigs) > 0 { if len(parseResult.ClusterConfigs) > 0 {
err = agClient.RegisterElasticsearch(nil, ag.GetEndpoint(), parseResult.ClusterConfigs) for _, cfg := range parseResult.ClusterConfigs {
if err != nil { clusterCfg := util.MapStr{
log.Errorf("register elasticsearch config error: %v", err) "name": cfg.ID,
return "enabled": true,
"endpoint": cfg.Endpoint,
} }
} if cfg.BasicAuth != nil && cfg.BasicAuth.Password != ""{
for _, pipelineID := range parseResult.ToDeletePipelineNames { err = agClient.SetKeystoreValue(context.Background(), ag.GetEndpoint(), fmt.Sprintf("%s_password", cfg.ID), cfg.BasicAuth.Password)
err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID)
if err != nil { if err != nil {
if !strings.Contains(err.Error(), "not found") { log.Errorf("set keystore value error: %v", err)
log.Errorf("delete pipeline error: %v", err)
continue continue
} }
clusterCfg["basic_auth"] = util.MapStr{
"username": cfg.BasicAuth.Username,
"password": fmt.Sprintf("$[[keystore.%s_password]]", cfg.ID),
} }
//todo update delete pipeline state
} }
for _, pipeline := range parseResult.Pipelines { clusterCfgs = append(clusterCfgs, clusterCfg)
err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline)) }
}
var dynamicCfg = util.MapStr{}
if len(clusterCfgs) > 0 {
dynamicCfg["elasticsearch"] = clusterCfgs
}
if len(parseResult.Pipelines) > 0 {
dynamicCfg["pipeline"] = parseResult.Pipelines
}
cfgBytes, err := yaml.Marshal(dynamicCfg)
if err != nil { if err != nil {
log.Errorf("create pipeline error: %v", err) log.Error("serialize config to yaml error: ", err)
return return
} }
} err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes))
sm.timestamps[agentID] = newTimestamp sm.timestamps[agentID] = newTimestamp
} }
func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) { func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) {
agents, err := LoadAgentsFromES(clusterID) agents, err := common.LoadAgentsFromES(clusterID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -216,7 +255,7 @@ func (sm *StateManager) GetAgent(ID string) (*agent.Instance, error) {
if time.Since(timestamp) > sm.TTL { if time.Since(timestamp) > sm.TTL {
exists, err := orm.Get(inst) exists, err := orm.Get(inst)
if err != nil { if err != nil {
return nil, fmt.Errorf("get agent [%s] error: %w", ID, err) return nil, err
} }
if !exists { if !exists {
return nil, fmt.Errorf("can not found agent [%s]", ID) return nil, fmt.Errorf("can not found agent [%s]", ID)
@ -261,112 +300,6 @@ func (sm *StateManager) DeleteAgent(agentID string) error {
return kv.DeleteKey(sm.KVKey, []byte(agentID)) return kv.DeleteKey(sm.KVKey, []byte(agentID))
} }
func (sm *StateManager) GetAgentClient() ClientAPI { func (sm *StateManager) GetAgentClient() client.ClientAPI {
return sm.agentClient return sm.agentClient
} }
// LoadAgentsFromES searches agent.Instance documents through orm (at most
// 1000) and decodes them. When clusterID is non-empty the search is
// restricted with an equality condition on the "id" field.
// NOTE(review): the filter compares the "id" field with a cluster id; confirm
// this is the intended field.
//
// Rows that cannot be decoded are logged and skipped. It returns nil, nil when
// the search yields no rows.
func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) {
	query := orm.Query{Size: 1000}
	if clusterID != "" {
		query.Conds = orm.And(orm.Eq("id", clusterID))
	}

	err, result := orm.Search(agent.Instance{}, &query)
	if err != nil {
		return nil, fmt.Errorf("query agent error: %w", err)
	}
	if len(result.Result) == 0 {
		return nil, nil
	}

	agents := make([]agent.Instance, 0, len(result.Result))
	for _, row := range result.Result {
		var inst agent.Instance
		raw := util.MustToJSONBytes(row)
		if decodeErr := util.FromJSONBytes(raw, &inst); decodeErr != nil {
			log.Errorf("got unexpected agent: %s, error: %v", string(raw), decodeErr)
			continue
		}
		agents = append(agents, inst)
	}
	return agents, nil
}
func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) {
q := orm.Query{
WildcardIndex: true,
}
mustQ := []util.MapStr{
{
"term": util.MapStr{
"metadata.name": util.MapStr{
"value": "agent",
},
},
},
{
"term": util.MapStr{
"metadata.category": util.MapStr{
"value": "instance",
},
},
},
}
if len(agentIds) > 0 {
mustQ = append(mustQ, util.MapStr{
"terms": util.MapStr{
"agent.id": agentIds,
},
})
}
queryDSL := util.MapStr{
"_source": "agent.id",
"sort": []util.MapStr{
{
"timestamp": util.MapStr{
"order": "desc",
},
},
},
"collapse": util.MapStr{
"field": "agent.id",
},
"query": util.MapStr{
"bool": util.MapStr{
"filter": []util.MapStr{
{
"range": util.MapStr{
"timestamp": util.MapStr{
"gte": fmt.Sprintf("now-%ds", lastSeconds),
},
},
},
},
"must": mustQ,
},
},
}
q.RawQuery = util.MustToJSONBytes(queryDSL)
err, result := orm.Search(event.Event{}, &q)
if err != nil {
return nil, fmt.Errorf("query agent instance metric error: %w", err)
}
agentIDs := map[string]struct{}{}
if len(result.Result) > 0 {
searchRes := elastic.SearchResponse{}
err = util.FromJSONBytes(result.Raw, &searchRes)
if err != nil {
return nil, err
}
agentIDKeyPath := []string{"agent", "id"}
for _, hit := range searchRes.Hits.Hits {
agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source)
if v, ok := agentID.(string); ok {
agentIDs[v] = struct{}{}
}
}
}
return agentIDs, nil
}