Merge branch 'master' of ssh://git.infini.ltd:64221/infini/console
This commit is contained in:
commit
687209da9b
|
@ -0,0 +1,250 @@
|
|||
#!/bin/bash
|
||||
# Agent install script for UNIX-like OS
# Author: INFINI
# BASE_URL     : required, download server address, e.g. https://release.infinilabs.com/agent/stable
# AGENT_VER    : required, agent version, e.g. 0.4.0-126
# INSTALL_PATH : optional, install directory, e.g. /home/user/infini (default: /opt)
# ES_NAME
# ES_PWD
|
||||
|
||||
printf "\n* _ ___ __ __ _____ "
|
||||
printf "\n* /_\\ / _ \\ /__\\/\\ \\ \\/__ \\"
|
||||
printf "\n* //_\\\\ / /_\\//_\\ / \\/ / / /\\/"
|
||||
printf "\n* / _ \\/ /_\\\\//__/ /\\ / / / "
|
||||
printf "\n* \\_/ \\_/\\____/\\__/\\_\\ \\/ \\/ \n\n"
|
||||
# Privileged commands below are prefixed with $sudo_cmd.
# It stays empty when the script already runs as root (UID 0).
sudo_cmd='sudo'
if [ "$UID" = "0" ]; then
  sudo_cmd=''
fi

##################
# colors
##################
# ANSI escape sequences; they take effect when expanded inside a printf format string.
CLR="\033[0m"
RED="\033[31m"
GREEN="\033[32m"
|
||||
|
||||
##################
|
||||
# validate os & arch
|
||||
##################
|
||||
|
||||
# Translate the machine type reported by uname into the arch suffix used in
# the release file names; abort on anything that has no mapping.
arch=
case $(uname -m) in
  "x86_64")
    arch="amd64"
    ;;
  "i386" | "i686")
    arch="386"
    ;;
  "aarch64" | "arm64")
    arch="arm64"
    ;;
  "arm" | "armv7l")
    arch="arm"
    ;;
  *)
    # shellcheck disable=SC2059
    printf "${RED}[E] Unsupport arch $(uname -m) ${CLR}\n"
    exit 1
    ;;
esac

# Everything that is not Darwin is treated as linux.
os="linux"

if [[ "$OSTYPE" == "darwin"* ]]; then
  # Darwin only supports amd64 and arm64
  case "$arch" in
    "amd64" | "arm64") ;;
    *)
      # shellcheck disable=SC2059
      printf "${RED}[E] Darwin only support amd64/arm64.${CLR}\n"
      exit 1
      ;;
  esac

  os="mac"

  # # NOTE: under darwin, for arm64 and amd64, both use amd64
  # arch="arm"
fi
|
||||
|
||||
##################
|
||||
# validate params
|
||||
##################
|
||||
|
||||
# The {{...}} placeholders are template variables (presumably substituted by
# the console before the script is served -- confirm against getInstallScript).
# BASE_URL / AGENT_VER from the environment take precedence when set.
base_url="{{base_url}}"
if [[ -n "${BASE_URL}" ]]; then
  base_url="${BASE_URL}"
fi

agent_ver="{{agent_version}}"
if [[ -n "${AGENT_VER}" ]]; then
  agent_ver="${AGENT_VER}"
fi

# Certificate material written to the agent's config directory later on.
ca_crt="{{ca_crt}}"
client_crt="{{client_crt}}"
client_key="{{client_key}}"
|
||||
|
||||
##################
|
||||
# download agent
|
||||
##################
|
||||
|
||||
# mac releases are shipped as .zip, everything else as .tar.gz
suffix="tar.gz"
if [[ "$os" == "mac" ]]; then
  suffix="zip"
fi

# INSTALL_PATH from the environment overrides the default /opt
install_path="/opt"
if [ -n "$INSTALL_PATH" ]; then
  install_path=$INSTALL_PATH
fi

file_name="agent-${agent_ver}-${os}-${arch}.${suffix}"   # archive name on the download server
download_url="${base_url}/${file_name}"
agent="${install_path}/agent/${file_name}"                # where the downloaded archive is saved
agent_exc="${install_path}/agent/agent-${os}-${arch}"     # agent executable

# agent_exsit records whether the install directory was already present;
# the service section uses it to decide whether to stop/uninstall first.
agent_exsit="true"
if [ ! -d "${install_path}/agent" ]; then
  printf "\n* mkdir %s" "${install_path}/agent"
  # Check mkdir itself: testing $? after the enclosing `fi` only sees the
  # status of the last command in the branch, never a mkdir failure.
  # -p also creates a missing ${install_path}.
  if ! $sudo_cmd mkdir -p "${install_path}/agent"; then
    exit 1
  fi
  agent_exsit="false"
fi

printf "\n* downloading %s\n" "${download_url}"
printf "\n* save to : %s\n" "${agent}"

# Every later step works relative to this directory, so stop if we cannot enter it.
cd "${install_path}/agent" || exit 1

# Use $sudo_cmd (empty for root) rather than a hard-coded sudo, which fails
# on hosts where root has no sudo binary. -f turns HTTP errors (404, 5xx)
# into a non-zero exit instead of saving the error page as the archive.
if ! $sudo_cmd curl -f -O --progress-bar "${download_url}"; then
  exit 1
fi

printf "\n* downloaded: %s" "${agent}"
|
||||
|
||||
##################
|
||||
# install agent
|
||||
##################
|
||||
|
||||
printf "\n* start install"

# Unpack the archive into the current directory (the script changed into
# ${install_path}/agent before downloading).
if [[ "${suffix}" == "zip" ]]; then
  printf "\n* unzip %s\n" "${agent}"
  # -o: overwrite without prompting so a re-install does not block on input
  $sudo_cmd unzip -o "${agent}" || exit 1
else
  printf "\n* tar -xzvf %s\n" "${agent}"
  $sudo_cmd tar -xzvf "${agent}" || exit 1
fi

# The archive was written with $sudo_cmd, so it must be removed the same way.
$sudo_cmd rm -f "${agent}"
|
||||
|
||||
##################
|
||||
# save cert
|
||||
##################
|
||||
# -p: the config directory already exists when re-installing
$sudo_cmd mkdir -p config || exit 1

# save_cert CONTENT FILE
# Writes CONTENT plus a trailing newline to FILE with $sudo_cmd privileges.
# The content is piped through tee instead of being spliced into a `sh -c`
# string, so quotes or backslashes in it cannot alter the command.
save_cert() {
  printf '%s\n' "$1" | $sudo_cmd tee "$2" > /dev/null
}

# Each write is checked on its own; previously only the last one was.
save_cert "${ca_crt}" ./config/ca.crt || exit 1
save_cert "${client_crt}" ./config/client.crt || exit 1
save_cert "${client_key}" ./config/client.key || exit 1
|
||||
|
||||
port="{{port}}"

## generate agent.yml
# Inner double quotes are escaped so they reach the YAML file; unescaped they
# end the shell string and the values are written without quotes.
# NOTE(review): the YAML nesting below was reconstructed from the key order
# (tls/network under api) -- confirm against the agent's configuration schema.
agent_config="path.configs: \"config\"
configs.auto_reload: true
env:
  API_BINDING: \"0.0.0.0:${port}\"

path.data: data
path.logs: log

api:
  enabled: true
  tls:
    enabled: true
    cert_file: \"${install_path}/agent/config/client.crt\"
    key_file: \"${install_path}/agent/config/client.key\"
    ca_file: \"${install_path}/agent/config/ca.crt\"
    skip_insecure_verify: false
  network:
    binding: \$[[env.API_BINDING]]

badger:
  value_log_max_entries: 1000000
  value_log_file_size: 104857600
  value_threshold: 1024
agent:
  major_ip_pattern: \"192.*\"
"

agent_yml_path="${install_path}/agent/agent.yml"

# tee creates or truncates the file, so no separate rm/touch is needed
# (a plain rm also printed an error on first install, when the file is absent).
if ! printf '%s\n' "${agent_config}" | $sudo_cmd tee "${agent_yml_path}" > /dev/null; then
  exit 1
fi
|
||||
|
||||
# Paths are quoted so an INSTALL_PATH containing spaces still works.
$sudo_cmd chmod +x "${agent_exc}"

# Try to stop and uninstall a previously installed service. Both calls are
# best-effort: their exit status is deliberately not checked.
if [[ "$agent_exsit" == "true" ]]; then
  printf "\n* stop && uninstall service\n"
  $sudo_cmd "${agent_exc}" -service stop
  $sudo_cmd "${agent_exc}" -service uninstall
fi

printf "\n* start install service\n"
$sudo_cmd "${agent_exc}" -service install || exit 1

printf "\n* service installed\n"
printf "\n* service starting >>>>>>\n"
$sudo_cmd "${agent_exc}" -service start || exit 1

printf "\n* agent service started"
|
||||
console_endpoint="{{console_endpoint}}"
# Pause before registering (presumably to let the freshly started service come up).
sleep 3
printf "\n* start register\n"
token="{{token}}"

# The URL is quoted because it contains '?', which the shell would otherwise
# treat as a glob character. -f makes curl exit non-zero on HTTP errors, so a
# rejected token no longer ends with "agent registered".
if ! curl -f -X POST "${console_endpoint}/agent/instance?token=${token}"; then
  exit 1
fi
printf "\n* agent registered\n"

# shellcheck disable=SC2059
printf "\n* ${GREEN}Congratulations, install success!${CLR}\n\n"
|
||||
|
||||
|
17
console.yml
17
console.yml
|
@ -1,6 +1,11 @@
|
|||
path.configs: "config"
|
||||
configs.auto_reload: true
|
||||
|
||||
#env:
|
||||
# INFINI_CONSOLE_ENDPOINT: "http://192.168.3.9:9000"
|
||||
# INGEST_CLUSTER_ENDPOINT: "http://192.168.3.9:9210"
|
||||
# INGEST_CLUSTER_CREDENTIAL_ID: chjkp9dath21f1ae9tq0
|
||||
|
||||
web:
|
||||
enabled: true
|
||||
embedding_api: true
|
||||
|
@ -64,4 +69,14 @@ badger:
|
|||
# authorize_url: "https://github.com/login/oauth/authorize"
|
||||
# token_url: "https://github.com/login/oauth/access_token"
|
||||
# redirect_url: ""
|
||||
# scopes: []
|
||||
# scopes: []
|
||||
|
||||
#agent:
|
||||
# setup:
|
||||
# download_url: "https://release.infinilabs.com/agent/snapshot"
|
||||
# version: 0.5.0_NIGHTLY-157
|
||||
# ca_cert: "/opt/config/certs/ca.crt"
|
||||
# ca_key: "/opt/config/certs/ca.key"
|
||||
# console_endpoint: $[[env.INFINI_CONSOLE_ENDPOINT]]
|
||||
# ingest_cluster_endpoint: $[[env.INGEST_CLUSTER_ENDPOINT]]
|
||||
# ingest_cluster_credential_id: $[[env.INGEST_CLUSTER_CREDENTIAL_ID]]
|
|
@ -6,6 +6,7 @@ package model
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
@ -69,6 +70,25 @@ func (inst *Instance) DeletePipeline(pipelineID string) error {
|
|||
return inst.doRequest(req, nil)
|
||||
}
|
||||
|
||||
func (inst *Instance) GetPipeline(pipelineID string) (*pipeline.PipelineStatus, error) {
|
||||
if pipelineID == "" {
|
||||
return nil, errors.New("invalid pipelineID")
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
req := &util.Request{
|
||||
Method: http.MethodGet,
|
||||
Url: fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID),
|
||||
Context: ctx,
|
||||
}
|
||||
res := pipeline.PipelineStatus{}
|
||||
err := inst.doRequest(req, &res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipelinesResponse, error) {
|
||||
body := util.MustToJSONBytes(util.MapStr{
|
||||
"ids": pipelineIDs,
|
||||
|
|
|
@ -7,7 +7,10 @@ package agent
|
|||
import (
|
||||
log "github.com/cihub/seelog"
|
||||
"infini.sh/console/modules/agent/api"
|
||||
"infini.sh/console/modules/agent/client"
|
||||
"infini.sh/console/modules/agent/common"
|
||||
"infini.sh/console/modules/agent/model"
|
||||
"infini.sh/console/modules/agent/state"
|
||||
"infini.sh/framework/core/agent"
|
||||
"infini.sh/framework/core/env"
|
||||
"infini.sh/framework/core/host"
|
||||
|
@ -38,7 +41,22 @@ func (module *AgentModule) Start() error {
|
|||
orm.RegisterSchemaWithIndexName(agent.ESNodeInfo{}, "agent-node")
|
||||
orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host")
|
||||
orm.RegisterSchemaWithIndexName(agent.Setting{}, "agent-setting")
|
||||
common.RegisterClient(&common.Client{})
|
||||
var (
|
||||
executor client.Executor
|
||||
err error
|
||||
)
|
||||
if module.AgentConfig.Setup == nil {
|
||||
executor = &client.HttpExecutor{}
|
||||
}else{
|
||||
executor, err = client.NewMTLSExecutor(module.AgentConfig.Setup.CACertFile, module.AgentConfig.Setup.CAKeyFile)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
agClient := &client.Client{
|
||||
Executor: executor,
|
||||
}
|
||||
client.RegisterClient(agClient)
|
||||
|
||||
if module.AgentConfig.StateManager.Enabled {
|
||||
onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, 60)
|
||||
|
@ -56,8 +74,8 @@ func (module *AgentModule) Start() error {
|
|||
}
|
||||
}
|
||||
|
||||
sm := common.NewStateManager(time.Second*30, "agent_state", agentIds)
|
||||
common.RegisterStateManager(sm)
|
||||
sm := state.NewStateManager(time.Second*30, "agent_state", agentIds, agClient)
|
||||
state.RegisterStateManager(sm)
|
||||
go sm.LoopState()
|
||||
}
|
||||
return nil
|
||||
|
@ -69,12 +87,12 @@ func (module *AgentModule) Stop() error {
|
|||
}
|
||||
log.Info("start to stop agent module")
|
||||
if module.AgentConfig.StateManager.Enabled {
|
||||
common.GetStateManager().Stop()
|
||||
state.GetStateManager().Stop()
|
||||
}
|
||||
log.Info("agent module was stopped")
|
||||
return nil
|
||||
}
|
||||
|
||||
type AgentModule struct {
|
||||
common.AgentConfig
|
||||
model.AgentConfig
|
||||
}
|
||||
|
|
|
@ -7,13 +7,13 @@ package api
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
common2 "infini.sh/console/modules/agent/common"
|
||||
log "github.com/cihub/seelog"
|
||||
"infini.sh/console/modules/agent/state"
|
||||
httprouter "infini.sh/framework/core/api/router"
|
||||
"infini.sh/framework/core/host"
|
||||
"infini.sh/framework/core/orm"
|
||||
"infini.sh/framework/core/util"
|
||||
"net/http"
|
||||
log "github.com/cihub/seelog"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -107,7 +107,7 @@ func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request,
|
|||
return
|
||||
}
|
||||
|
||||
sm := common2.GetStateManager()
|
||||
sm := state.GetStateManager()
|
||||
ag, err := sm.GetAgent(hostInfo.AgentID)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
|
@ -159,7 +159,7 @@ func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Requ
|
|||
h.WriteJSON(w, util.MapStr{}, http.StatusOK)
|
||||
return
|
||||
}
|
||||
sm := common2.GetStateManager()
|
||||
sm := state.GetStateManager()
|
||||
ag, err := sm.GetAgent(hostInfo.AgentID)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
|
@ -193,7 +193,7 @@ func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Requ
|
|||
}
|
||||
|
||||
func enrollHostFromAgent(agentID string) (*host.HostInfo, error){
|
||||
sm := common2.GetStateManager()
|
||||
sm := state.GetStateManager()
|
||||
ag, err := sm.GetAgent(agentID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -14,6 +14,7 @@ func Init() {
|
|||
api.HandleAPIMethod(api.POST, "/agent/instance", handler.createInstance)
|
||||
api.HandleAPIMethod(api.GET, "/agent/instance/_search", handler.RequirePermission(handler.searchInstance, enum.PermissionAgentInstanceRead))
|
||||
api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id", handler.getInstance)
|
||||
api.HandleAPIMethod(api.PUT, "/agent/instance/:instance_id", handler.updateInstance)
|
||||
api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id", handler.RequirePermission(handler.deleteInstance, enum.PermissionAgentInstanceWrite))
|
||||
api.HandleAPIMethod(api.POST, "/agent/instance/_stats", handler.RequirePermission(handler.getInstanceStats, enum.PermissionAgentInstanceRead))
|
||||
api.HandleAPIMethod(api.GET, "/agent/log/node/:node_id/files", handler.RequirePermission(handler.getLogFilesByNode, enum.PermissionAgentInstanceRead))
|
||||
|
@ -23,8 +24,13 @@ func Init() {
|
|||
api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite))
|
||||
api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite))
|
||||
api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite))
|
||||
api.HandleAPIMethod(api.POST, "/agent/instance/try_connect", handler.RequireLogin(handler.tryConnect))
|
||||
|
||||
api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost)
|
||||
api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo)
|
||||
api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess)
|
||||
|
||||
|
||||
api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand))
|
||||
api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript)
|
||||
}
|
||||
|
|
|
@ -8,20 +8,22 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
log "github.com/cihub/seelog"
|
||||
"infini.sh/console/modules/agent/client"
|
||||
common2 "infini.sh/console/modules/agent/common"
|
||||
"infini.sh/console/modules/agent/model"
|
||||
"infini.sh/console/modules/agent/state"
|
||||
"infini.sh/framework/core/agent"
|
||||
"infini.sh/framework/core/api"
|
||||
httprouter "infini.sh/framework/core/api/router"
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/event"
|
||||
"infini.sh/framework/core/host"
|
||||
"infini.sh/framework/core/orm"
|
||||
"infini.sh/framework/core/util"
|
||||
common2 "infini.sh/console/modules/agent/common"
|
||||
elastic2 "infini.sh/framework/modules/elastic"
|
||||
"infini.sh/framework/modules/elastic/adapter"
|
||||
"infini.sh/framework/modules/elastic/common"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type APIHandler struct {
|
||||
|
@ -36,24 +38,32 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
|
|||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
oldInst := &agent.Instance{}
|
||||
oldInst.ID = obj.ID
|
||||
exists, err := orm.Get(oldInst)
|
||||
|
||||
if err != nil && err != elastic2.ErrNotFound {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
if exists {
|
||||
errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID)
|
||||
h.WriteError(w, errMsg, http.StatusInternalServerError)
|
||||
log.Error(errMsg)
|
||||
return
|
||||
//validate token for auto register
|
||||
token := h.GetParameter(req, "token")
|
||||
if token != "" {
|
||||
if v, ok := tokens.Load(token); !ok {
|
||||
h.WriteError(w, "token is invalid", http.StatusUnauthorized)
|
||||
return
|
||||
}else{
|
||||
if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) {
|
||||
tokens.Delete(token)
|
||||
h.WriteError(w, "token was expired", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
}
|
||||
remoteIP := util.ClientIP(req)
|
||||
agCfg := common2.GetAgentConfig()
|
||||
port := agCfg.Setup.Port
|
||||
if port == "" {
|
||||
port = "8080"
|
||||
}
|
||||
obj.Endpoint = fmt.Sprintf("https://%s:%s", remoteIP, port)
|
||||
obj.Tags = append(obj.Tags, "mtls")
|
||||
obj.Tags = append(obj.Tags, "auto")
|
||||
}
|
||||
|
||||
//fetch more information of agent instance
|
||||
res, err := common2.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint())
|
||||
res, err := client.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint())
|
||||
if err != nil {
|
||||
errStr := fmt.Sprintf("get agent instance basic info error: %s", err.Error())
|
||||
h.WriteError(w,errStr , http.StatusInternalServerError)
|
||||
|
@ -71,9 +81,27 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
|
|||
obj.MajorIP = res.MajorIP
|
||||
obj.Host = res.Host
|
||||
obj.IPS = res.IPS
|
||||
if obj.Name == "" {
|
||||
obj.Name = res.Name
|
||||
}
|
||||
}
|
||||
oldInst := &agent.Instance{}
|
||||
oldInst.ID = obj.ID
|
||||
exists, err := orm.Get(oldInst)
|
||||
|
||||
if err != nil && err != elastic2.ErrNotFound {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
if exists {
|
||||
errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID)
|
||||
h.WriteError(w, errMsg, http.StatusInternalServerError)
|
||||
log.Error(errMsg)
|
||||
return
|
||||
}
|
||||
|
||||
obj.Status = common2.StatusOnline
|
||||
obj.Status = model.StatusOnline
|
||||
err = orm.Create(nil, obj)
|
||||
if err != nil {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
|
@ -84,40 +112,30 @@ func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps
|
|||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
err = pushIngestConfigToAgent(obj)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
|
||||
h.WriteCreatedOKJSON(w, obj.ID)
|
||||
|
||||
}
|
||||
|
||||
func bindAgentToHostByIP(ag *agent.Instance) error{
|
||||
err, result := orm.GetBy("ip", ag.MajorIP, host.HostInfo{})
|
||||
func pushIngestConfigToAgent(inst *agent.Instance) error{
|
||||
ingestCfg, basicAuth, err := common2.GetAgentIngestConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(result.Result) > 0 {
|
||||
buf := util.MustToJSONBytes(result.Result[0])
|
||||
hostInfo := &host.HostInfo{}
|
||||
err = util.FromJSONBytes(buf, hostInfo)
|
||||
if basicAuth != nil && basicAuth.Password != "" {
|
||||
err = client.GetClient().SetKeystoreValue(context.Background(), inst.GetEndpoint(), "ingest_cluster_password", basicAuth.Password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sm := common2.GetStateManager()
|
||||
if ag.Status == "" {
|
||||
_, err1 := sm.GetAgentClient().GetHostInfo(nil, ag.GetEndpoint())
|
||||
if err1 == nil {
|
||||
ag.Status = "online"
|
||||
}else{
|
||||
ag.Status = "offline"
|
||||
}
|
||||
}
|
||||
|
||||
hostInfo.AgentStatus = ag.Status
|
||||
hostInfo.AgentID = ag.ID
|
||||
err = orm.Update(nil, hostInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("set keystore value to agent error: %w", err)
|
||||
}
|
||||
}
|
||||
err = client.GetClient().SaveDynamicConfig(context.Background(), inst.GetEndpoint(), "ingest", ingestCfg )
|
||||
if err != nil {
|
||||
fmt.Errorf("save dynamic config to agent error: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -169,14 +187,42 @@ func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps
|
|||
log.Error(err)
|
||||
return
|
||||
}
|
||||
if sm := common2.GetStateManager(); sm != nil {
|
||||
if sm := state.GetStateManager(); sm != nil {
|
||||
sm.DeleteAgent(obj.ID)
|
||||
}
|
||||
queryDsl := util.MapStr{
|
||||
"query": util.MapStr{
|
||||
"term": util.MapStr{
|
||||
"agent_id": util.MapStr{
|
||||
"value": id,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err = orm.DeleteBy(agent.ESNodeInfo{}, util.MustToJSONBytes(queryDsl))
|
||||
if err != nil {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
log.Error("delete node info error: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
h.WriteJSON(w, util.MapStr{
|
||||
"_id": obj.ID,
|
||||
"result": "deleted",
|
||||
}, 200)
|
||||
queryDsl = util.MapStr{
|
||||
"query": util.MapStr{
|
||||
"term": util.MapStr{
|
||||
"metadata.labels.agent_id": util.MapStr{
|
||||
"value": id,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err = orm.DeleteBy(agent.Setting{}, util.MustToJSONBytes(queryDsl))
|
||||
if err != nil {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
log.Error("delete agent settings error: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
h.WriteDeletedOKJSON(w, id)
|
||||
}
|
||||
|
||||
func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
|
@ -266,6 +312,52 @@ func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request,
|
|||
h.WriteJSON(w, result, http.StatusOK)
|
||||
}
|
||||
|
||||
func (h *APIHandler) updateInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
id := ps.MustGetParameter("instance_id")
|
||||
oldInst := agent.Instance{}
|
||||
oldInst.ID = id
|
||||
_, err := orm.Get(&oldInst)
|
||||
if err != nil {
|
||||
if err == elastic2.ErrNotFound {
|
||||
h.WriteJSON(w, util.MapStr{
|
||||
"_id": id,
|
||||
"result": "not_found",
|
||||
}, http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
obj := agent.Instance{}
|
||||
err = h.DecodeJSON(req, &obj)
|
||||
if err != nil {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
oldInst.Name = obj.Name
|
||||
oldInst.Endpoint = obj.Endpoint
|
||||
oldInst.Description = obj.Description
|
||||
oldInst.Tags = obj.Tags
|
||||
oldInst.BasicAuth = obj.BasicAuth
|
||||
err = orm.Update(&orm.Context{
|
||||
Refresh: "wait_for",
|
||||
}, &oldInst)
|
||||
if err != nil {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
h.WriteJSON(w, util.MapStr{
|
||||
"_id": obj.ID,
|
||||
"result": "updated",
|
||||
}, 200)
|
||||
}
|
||||
|
||||
|
||||
func (h *APIHandler) searchInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
|
||||
|
@ -410,7 +502,7 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt
|
|||
return
|
||||
}
|
||||
cfg.BasicAuth = &basicAuth
|
||||
nodeInfo, err := common2.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg)
|
||||
nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
|
@ -519,12 +611,50 @@ func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps h
|
|||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
q = util.MapStr{
|
||||
"query": util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"must": []util.MapStr{
|
||||
{
|
||||
"terms": util.MapStr{
|
||||
"metadata.labels.node_uuid": nodeIDs,
|
||||
},
|
||||
},
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.labels.agent_id": util.MapStr{
|
||||
"value": id,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
h.WriteAckOKJSON(w)
|
||||
}
|
||||
|
||||
func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
var reqBody = struct {
|
||||
Endpoint string `json:"endpoint"`
|
||||
BasicAuth agent.BasicAuth
|
||||
}{}
|
||||
err := h.DecodeJSON(req, &reqBody)
|
||||
if err != nil {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
connectRes, err := client.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint)
|
||||
if err != nil {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
h.WriteJSON(w, connectRes, http.StatusOK)
|
||||
}
|
||||
|
||||
func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
|
||||
nodesInfo, err := common2.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint())
|
||||
nodesInfo, err := client.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get elasticsearch nodes error: %w", err)
|
||||
}
|
||||
|
@ -534,7 +664,6 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
|
|||
}
|
||||
oldPids := map[int]struct{}{}
|
||||
var resultNodes []agent.ESNodeInfo
|
||||
//settings, err := common2.GetAgentSettings(inst.ID, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -555,24 +684,6 @@ func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) {
|
|||
if oldNode != nil && oldNode.ClusterID != "" {
|
||||
node.ClusterID = oldNode.ClusterID
|
||||
}
|
||||
//else{
|
||||
// if cfg := clusterCfgs[node.ClusterUuid]; cfg != nil {
|
||||
// node.ClusterID = cfg.ID
|
||||
// setting := pickAgentSettings(settings, node)
|
||||
// if setting == nil {
|
||||
// setting, err = getAgentTaskSetting(inst.ID, node)
|
||||
// if err != nil {
|
||||
// log.Error()
|
||||
// }
|
||||
// err = orm.Create(nil, setting)
|
||||
// if err != nil {
|
||||
// log.Error("save agent task setting error: ", err)
|
||||
// }
|
||||
// }
|
||||
// }else{
|
||||
// //cluster not registered in console
|
||||
// }
|
||||
//}
|
||||
}
|
||||
|
||||
node.Status = "online"
|
||||
|
@ -626,6 +737,7 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){
|
|||
},
|
||||
},
|
||||
},
|
||||
|
||||
}
|
||||
q := orm.Query{
|
||||
RawQuery: util.MustToJSONBytes(query),
|
||||
|
@ -645,26 +757,6 @@ func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error){
|
|||
return nodesInfo, nil
|
||||
}
|
||||
|
||||
func getClusterConfigs() map[string]*elastic.ElasticsearchConfig {
|
||||
cfgs := map[string]*elastic.ElasticsearchConfig{}
|
||||
elastic.WalkConfigs(func(key, value interface{}) bool {
|
||||
if cfg, ok := value.(*elastic.ElasticsearchConfig); ok {
|
||||
clusterUUID := cfg.ClusterUUID
|
||||
if cfg.ClusterUUID == "" {
|
||||
verInfo, err := adapter.ClusterVersion(elastic.GetMetadata(cfg.ID))
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return true
|
||||
}
|
||||
clusterUUID = verInfo.ClusterUUID
|
||||
}
|
||||
cfgs[clusterUUID] = cfg
|
||||
}
|
||||
return true
|
||||
})
|
||||
return cfgs
|
||||
}
|
||||
|
||||
func pickAgentSettings(settings []agent.Setting, nodeInfo agent.ESNodeInfo) *agent.Setting {
|
||||
for _, setting := range settings {
|
||||
if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID {
|
||||
|
@ -679,6 +771,10 @@ func getAgentTaskSetting(agentID string, node agent.ESNodeInfo) (*agent.Setting,
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
taskSetting.Logs = &model.LogsTask{
|
||||
Enabled:true,
|
||||
LogsPath: node.Path.Logs,
|
||||
}
|
||||
return &agent.Setting{
|
||||
Metadata: agent.SettingsMetadata{
|
||||
Category: "agent",
|
||||
|
@ -698,7 +794,7 @@ func getAgentTaskSetting(agentID string, node agent.ESNodeInfo) (*agent.Setting,
|
|||
}
|
||||
|
||||
// getSettingsByClusterID query agent task settings with cluster id
|
||||
func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) {
|
||||
func getSettingsByClusterID(clusterID string) (*model.TaskSetting, error) {
|
||||
queryDsl := util.MapStr{
|
||||
"size": 200,
|
||||
"query": util.MapStr{
|
||||
|
@ -746,8 +842,8 @@ func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
setting := &common2.TaskSetting{
|
||||
NodeStats: &common2.NodeStatsTask{
|
||||
setting := &model.TaskSetting{
|
||||
NodeStats: &model.NodeStatsTask{
|
||||
Enabled: true,
|
||||
},
|
||||
}
|
||||
|
@ -776,17 +872,17 @@ func getSettingsByClusterID(clusterID string) (*common2.TaskSetting, error) {
|
|||
}
|
||||
}
|
||||
if clusterStats {
|
||||
setting.ClusterStats = &common2.ClusterStatsTask{
|
||||
setting.ClusterStats = &model.ClusterStatsTask{
|
||||
Enabled: true,
|
||||
}
|
||||
}
|
||||
if indexStats {
|
||||
setting.IndexStats = &common2.IndexStatsTask{
|
||||
setting.IndexStats = &model.IndexStatsTask{
|
||||
Enabled: true,
|
||||
}
|
||||
}
|
||||
if clusterHealth {
|
||||
setting.ClusterHealth = &common2.ClusterHealthTask{
|
||||
setting.ClusterHealth = &model.ClusterHealthTask{
|
||||
Enabled: true,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,8 @@ package api
|
|||
import (
|
||||
"fmt"
|
||||
log "github.com/cihub/seelog"
|
||||
"infini.sh/console/modules/agent/common"
|
||||
"infini.sh/console/modules/agent/client"
|
||||
"infini.sh/console/modules/agent/state"
|
||||
"infini.sh/framework/core/agent"
|
||||
httprouter "infini.sh/framework/core/api/router"
|
||||
"infini.sh/framework/core/orm"
|
||||
|
@ -31,7 +32,7 @@ func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request,
|
|||
}, http.StatusOK)
|
||||
return
|
||||
}
|
||||
logFiles, err := common.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs)
|
||||
logFiles, err := client.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs)
|
||||
if err != nil {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
log.Error(err)
|
||||
|
@ -68,7 +69,7 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request,
|
|||
return
|
||||
}
|
||||
reqBody.LogsPath = node.Path.Logs
|
||||
sm := common.GetStateManager()
|
||||
sm := state.GetStateManager()
|
||||
res, err := sm.GetAgentClient().GetElasticLogFileContent(nil, inst.GetEndpoint(), reqBody)
|
||||
if err != nil {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
|
@ -88,6 +89,13 @@ func getAgentByNodeID(nodeID string) (*agent.Instance, *agent.ESNodeInfo, error)
|
|||
},
|
||||
},
|
||||
},
|
||||
"sort": []util.MapStr{
|
||||
{
|
||||
"timestamp": util.MapStr{
|
||||
"order": "desc",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
q := &orm.Query{
|
||||
RawQuery: util.MustToJSONBytes(queryDsl),
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
/* Copyright © INFINI Ltd. All rights reserved.
|
||||
* Web: https://infinilabs.com
|
||||
* Email: hello#infini.ltd */
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/valyala/fasttemplate"
|
||||
"infini.sh/console/modules/agent/common"
|
||||
"infini.sh/framework/core/api/rbac"
|
||||
httprouter "infini.sh/framework/core/api/router"
|
||||
"infini.sh/framework/core/global"
|
||||
"infini.sh/framework/core/util"
|
||||
"os"
|
||||
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// tokens holds the issued install tokens, keyed by token string with *Token values.
var tokens sync.Map

// Token describes one issued install token.
type Token struct {
	// CreatedAt is the time the token was issued or last refreshed.
	CreatedAt time.Time
	// UserID identifies the user the token was issued to.
	UserID string
}

// ExpiredIn is how long a token stays valid after CreatedAt.
const ExpiredIn = 20 * time.Minute
|
||||
func (h *APIHandler) generateInstallCommand(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
claims, ok := req.Context().Value("user").(*rbac.UserClaims)
|
||||
if !ok {
|
||||
h.WriteError(w, "user not found", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var (
|
||||
t *Token
|
||||
tokenStr string
|
||||
)
|
||||
tokens.Range(func(key, value any) bool {
|
||||
if v, ok := value.(*Token); ok && claims.UserId == v.UserID {
|
||||
t = v
|
||||
tokenStr = key.(string)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
if t == nil {
|
||||
tokenStr = util.GetUUID()
|
||||
t = &Token{
|
||||
CreatedAt: time.Now(),
|
||||
UserID: claims.UserId,
|
||||
}
|
||||
}else{
|
||||
if t.CreatedAt.Add(ExpiredIn).Before(time.Now()){
|
||||
tokens.Delete(tokenStr)
|
||||
tokenStr = util.GetUUID()
|
||||
t = &Token{
|
||||
CreatedAt: time.Now(),
|
||||
UserID: claims.UserId,
|
||||
}
|
||||
}else{
|
||||
t.CreatedAt = time.Now()
|
||||
}
|
||||
}
|
||||
tokens.Store(tokenStr, t)
|
||||
agCfg := common.GetAgentConfig()
|
||||
consoleEndpoint := agCfg.Setup.ConsoleEndpoint
|
||||
if consoleEndpoint == "" {
|
||||
scheme := "http"
|
||||
if req.TLS != nil {
|
||||
scheme = "https"
|
||||
}
|
||||
consoleEndpoint = fmt.Sprintf("%s://%s", scheme, req.Host)
|
||||
}
|
||||
h.WriteJSON(w, util.MapStr{
|
||||
"script": fmt.Sprintf(`sudo bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, consoleEndpoint, tokenStr),
|
||||
"token": tokenStr,
|
||||
"expired_at": t.CreatedAt.Add(ExpiredIn),
|
||||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
tokenStr := h.GetParameter(req, "token")
|
||||
if strings.TrimSpace(tokenStr) == "" {
|
||||
h.WriteError(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
if v, ok := tokens.Load(tokenStr); !ok {
|
||||
h.WriteError(w, "token is invalid", http.StatusUnauthorized)
|
||||
return
|
||||
}else{
|
||||
if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) {
|
||||
tokens.Delete(tokenStr)
|
||||
h.WriteError(w, "token was expired", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
}
|
||||
agCfg := common.GetAgentConfig()
|
||||
caCert, clientCertPEM, clientKeyPEM, err := common.GenerateServerCert(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
scriptTplPath := path.Join(global.Env().GetConfigDir(), "install_agent.tpl")
|
||||
buf, err := os.ReadFile(scriptTplPath)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
tpl := fasttemplate.New(string(buf), "{{", "}}")
|
||||
downloadURL := agCfg.Setup.DownloadURL
|
||||
if downloadURL == "" {
|
||||
downloadURL = "https://release.infinilabs.com/agent/stable/"
|
||||
}
|
||||
port := agCfg.Setup.Port
|
||||
if port == "" {
|
||||
port = "8080"
|
||||
}
|
||||
_, err = tpl.Execute(w, map[string]interface{}{
|
||||
"base_url": agCfg.Setup.DownloadURL,
|
||||
"agent_version": agCfg.Setup.Version,
|
||||
"console_endpoint": agCfg.Setup.ConsoleEndpoint,
|
||||
"client_crt": clientCertPEM,
|
||||
"client_key": clientKeyPEM,
|
||||
"ca_crt": caCert,
|
||||
"port": port,
|
||||
"token": tokenStr,
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
|
|
@ -2,7 +2,7 @@
|
|||
* Web: https://infinilabs.com
|
||||
* Email: hello#infini.ltd */
|
||||
|
||||
package common
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -14,9 +14,38 @@ import (
|
|||
"net/http"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
var defaultClient ClientAPI
|
||||
|
||||
func GetClient() ClientAPI {
|
||||
if defaultClient == nil {
|
||||
panic("agent client not init")
|
||||
}
|
||||
return defaultClient
|
||||
}
|
||||
|
||||
func RegisterClient(client ClientAPI) {
|
||||
defaultClient = client
|
||||
}
|
||||
type ClientAPI interface {
|
||||
GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error)
|
||||
GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error)
|
||||
GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error)
|
||||
GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error)
|
||||
GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error)
|
||||
RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error
|
||||
GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error)
|
||||
AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error)
|
||||
CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error
|
||||
DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error
|
||||
SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error
|
||||
SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
Executor Executor
|
||||
}
|
||||
|
||||
|
||||
func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) {
|
||||
req := &util.Request{
|
||||
Method: http.MethodGet,
|
||||
|
@ -188,16 +217,37 @@ func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipeline
|
|||
return client.doRequest(req, nil)
|
||||
}
|
||||
|
||||
func (client *Client) doRequest(req *util.Request, respObj interface{}) error {
|
||||
result, err := util.ExecuteRequest(req)
|
||||
if err != nil {
|
||||
return err
|
||||
func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error{
|
||||
body := util.MapStr{
|
||||
"key": key,
|
||||
"value": value,
|
||||
}
|
||||
if result.StatusCode != 200 {
|
||||
return fmt.Errorf(string(result.Body))
|
||||
req := &util.Request{
|
||||
Method: http.MethodPost,
|
||||
Url: fmt.Sprintf("%s/_framework/keystore", agentBaseURL),
|
||||
Context: ctx,
|
||||
Body: util.MustToJSONBytes(body),
|
||||
}
|
||||
if respObj == nil {
|
||||
return nil
|
||||
}
|
||||
return util.FromJSONBytes(result.Body, respObj)
|
||||
return client.doRequest(req, nil)
|
||||
}
|
||||
|
||||
func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error{
|
||||
body := util.MapStr{
|
||||
"configs": util.MapStr{
|
||||
name: content,
|
||||
},
|
||||
}
|
||||
req := &util.Request{
|
||||
Method: http.MethodPost,
|
||||
Url: fmt.Sprintf("%s/agent/config", agentBaseURL),
|
||||
Context: ctx,
|
||||
Body: util.MustToJSONBytes(body),
|
||||
}
|
||||
return client.doRequest(req, nil)
|
||||
}
|
||||
|
||||
|
||||
func (client *Client) doRequest(req *util.Request, respObj interface{}) error {
|
||||
return client.Executor.DoRequest(req, respObj)
|
||||
}
|
||||
|
|
@ -0,0 +1,100 @@
|
|||
/* Copyright © INFINI Ltd. All rights reserved.
|
||||
* Web: https://infinilabs.com
|
||||
* Email: hello#infini.ltd */
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"infini.sh/console/modules/agent/common"
|
||||
"infini.sh/framework/core/util"
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type Executor interface {
|
||||
DoRequest(req *util.Request, respObj interface{}) error
|
||||
}
|
||||
|
||||
type HttpExecutor struct {
|
||||
}
|
||||
|
||||
func (executor *HttpExecutor) DoRequest(req *util.Request, respObj interface{}) error {
|
||||
result, err := util.ExecuteRequest(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if result.StatusCode != 200 {
|
||||
return fmt.Errorf(string(result.Body))
|
||||
}
|
||||
if respObj == nil {
|
||||
return nil
|
||||
}
|
||||
return util.FromJSONBytes(result.Body, respObj)
|
||||
}
|
||||
|
||||
func NewMTLSExecutor(caCertFile, caKeyFile string) (*MTLSExecutor, error){
|
||||
var (
|
||||
instanceCrt string
|
||||
instanceKey string
|
||||
)
|
||||
instanceCrt, instanceKey, err := common.GetAgentInstanceCerts(caCertFile, caKeyFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("generate tls cert error: %w", err)
|
||||
}
|
||||
hClient, err := util.NewMTLSClient(caCertFile, instanceCrt, instanceKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &MTLSExecutor{
|
||||
CaCertFile: caCertFile,
|
||||
CAKeyFile: caKeyFile,
|
||||
client: hClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type MTLSExecutor struct {
|
||||
CaCertFile string
|
||||
CAKeyFile string
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
|
||||
func (executor *MTLSExecutor) DoRequest(req *util.Request, respObj interface{}) error {
|
||||
var reader io.Reader
|
||||
if len(req.Body) > 0 {
|
||||
reader = bytes.NewReader(req.Body)
|
||||
}
|
||||
var (
|
||||
hr *http.Request
|
||||
err error
|
||||
)
|
||||
if req.Context == nil {
|
||||
hr, err = http.NewRequest(req.Method, req.Url, reader)
|
||||
}else{
|
||||
hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
res, err := executor.client.Do(hr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
buf, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if res.StatusCode != 200 {
|
||||
return fmt.Errorf(string(buf))
|
||||
}
|
||||
if respObj != nil {
|
||||
err = util.FromJSONBytes(buf, respObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
/* Copyright © INFINI Ltd. All rights reserved.
|
||||
* Web: https://infinilabs.com
|
||||
* Email: hello#infini.ltd */
|
||||
|
||||
package common
|
||||
|
||||
import (
|
||||
"crypto/rsa"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
"infini.sh/framework/core/global"
|
||||
"infini.sh/framework/core/util"
|
||||
"os"
|
||||
"path"
|
||||
)
|
||||
|
||||
func GenerateClientCert(caFile, caKey string) (caCert, clientCertPEM, clientKeyPEM []byte, err error){
|
||||
return generateCert(caFile, caKey, false)
|
||||
}
|
||||
|
||||
func GenerateServerCert(caFile, caKey string) (caCert, serverCertPEM, serverKeyPEM []byte, err error){
|
||||
return generateCert(caFile, caKey, true)
|
||||
}
|
||||
|
||||
func generateCert(caFile, caKey string, isServer bool)(caCert, instanceCertPEM, instanceKeyPEM []byte, err error){
|
||||
pool := x509.NewCertPool()
|
||||
if caFile == "" {
|
||||
caFile = path.Join(global.Env().GetConfigDir(), "certs", "ca.crt")
|
||||
}
|
||||
caCert, err = os.ReadFile(caFile)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
pool.AppendCertsFromPEM(caCert)
|
||||
b, _ := pem.Decode(caCert)
|
||||
var rootCert *x509.Certificate
|
||||
caCertBytes := b.Bytes
|
||||
rootCert, err = x509.ParseCertificate(b.Bytes)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if caKey == "" {
|
||||
caKey = path.Join(global.Env().GetConfigDir(), "certs", "ca.key")
|
||||
}
|
||||
var keyBytes []byte
|
||||
keyBytes, err = os.ReadFile(caKey)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
b, _ = pem.Decode(keyBytes)
|
||||
certKey, err := util.ParsePrivateKey(b.Bytes)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if isServer{
|
||||
b = &pem.Block{Type: "CERTIFICATE", Bytes: caCertBytes}
|
||||
certPEM := pem.EncodeToMemory(b)
|
||||
instanceCertPEM, instanceKeyPEM, err = util.GenerateServerCert(rootCert, certKey.(*rsa.PrivateKey), certPEM, nil)
|
||||
}else{
|
||||
_, instanceCertPEM, instanceKeyPEM = util.GetClientCert(rootCert, certKey)
|
||||
}
|
||||
return caCert, instanceCertPEM, instanceKeyPEM, nil
|
||||
}
|
||||
|
||||
func GetAgentInstanceCerts(caFile, caKey string) (string, string, error) {
|
||||
dataDir := global.Env().GetDataDir()
|
||||
instanceCrt := path.Join(dataDir, "certs/agent/instance.crt")
|
||||
instanceKey := path.Join(dataDir, "certs/agent/instance.key")
|
||||
var (
|
||||
err error
|
||||
clientCertPEM []byte
|
||||
clientKeyPEM []byte
|
||||
)
|
||||
if util.FileExists(instanceCrt) && util.FileExists(instanceKey) {
|
||||
return instanceCrt, instanceKey, nil
|
||||
}
|
||||
_, clientCertPEM, clientKeyPEM, err = GenerateClientCert(caFile, caKey)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
baseDir := path.Join(dataDir, "certs/agent")
|
||||
if !util.IsExist(baseDir){
|
||||
err = os.MkdirAll(baseDir, 0775)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
}
|
||||
_, err = util.FilePutContentWithByte(instanceCrt, clientCertPEM)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
_, err = util.FilePutContentWithByte(instanceKey, clientKeyPEM)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
return instanceCrt, instanceKey, nil
|
||||
}
|
|
@ -4,18 +4,17 @@
|
|||
|
||||
package common
|
||||
|
||||
type AgentConfig struct {
|
||||
Enabled bool `config:"enabled"`
|
||||
StateManager struct{
|
||||
Enabled bool `config:"enabled"`
|
||||
} `config:"state_manager"`
|
||||
Setup SetupConfig `config:"setup"`
|
||||
}
|
||||
import (
|
||||
"infini.sh/console/modules/agent/model"
|
||||
"infini.sh/framework/core/env"
|
||||
)
|
||||
|
||||
type SetupConfig struct {
|
||||
DownloadURL string `config:"download_url"`
|
||||
Version string `config:"version"`
|
||||
CACertFile string `config:"ca_cert"`
|
||||
CAKeyFile string `config:"ca_key"`
|
||||
ScriptEndpoint string `config:"script_endpoint"`
|
||||
|
||||
func GetAgentConfig() *model.AgentConfig {
|
||||
agentCfg := &model.AgentConfig{}
|
||||
_, err := env.ParseConfig("agent", agentCfg )
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return agentCfg
|
||||
}
|
|
@ -6,13 +6,18 @@ package common
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"infini.sh/console/modules/agent/model"
|
||||
"infini.sh/framework/core/agent"
|
||||
"infini.sh/framework/core/credential"
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/event"
|
||||
"infini.sh/framework/core/orm"
|
||||
"infini.sh/framework/core/util"
|
||||
log "src/github.com/cihub/seelog"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, error){
|
||||
func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResult, error){
|
||||
var clusterCfgs []elastic.ElasticsearchConfig
|
||||
var (
|
||||
pipelines []util.MapStr
|
||||
|
@ -32,6 +37,7 @@ func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, err
|
|||
Enabled: true,
|
||||
Name: cfg.Name,
|
||||
BasicAuth: cfg.BasicAuth,
|
||||
//todo get endpoint from agent node info
|
||||
Endpoint: setting.Metadata.Labels["endpoint"].(string),
|
||||
}
|
||||
newCfg.ID = clusterID
|
||||
|
@ -49,7 +55,7 @@ func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, err
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
taskSetting := TaskSetting{}
|
||||
taskSetting := model.TaskSetting{}
|
||||
err = util.FromJSONBytes(vBytes, &taskSetting)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -61,51 +67,55 @@ func ParseAgentSettings(settings []agent.Setting)(*ParseAgentSettingsResult, err
|
|||
pipelines = append(pipelines, partPipelines...)
|
||||
toDeletePipelineNames = append(toDeletePipelineNames, partDeletePipelineNames...)
|
||||
}
|
||||
return &ParseAgentSettingsResult{
|
||||
return &model.ParseAgentSettingsResult{
|
||||
ClusterConfigs: clusterCfgs,
|
||||
Pipelines: pipelines,
|
||||
ToDeletePipelineNames: toDeletePipelineNames,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetAgentSettings queries the settings of an agent by agent ID and updated timestamp.
|
||||
// If any setting was updated after the timestamp, it returns the full setting list, including unchanged settings;
|
||||
// otherwise it returns an empty setting list.
|
||||
func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) {
|
||||
queryDsl := util.MapStr{
|
||||
"size": 500,
|
||||
"query": util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"must": []util.MapStr{
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.category": util.MapStr{
|
||||
"value": "agent",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.name": util.MapStr{
|
||||
"value": "task",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.labels.agent_id": util.MapStr{
|
||||
"value": agentID,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"range": util.MapStr{
|
||||
"updated": util.MapStr{
|
||||
"gt": timestamp,
|
||||
},
|
||||
query := util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"must": []util.MapStr{
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.category": util.MapStr{
|
||||
"value": "agent",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.name": util.MapStr{
|
||||
"value": "task",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.labels.agent_id": util.MapStr{
|
||||
"value": agentID,
|
||||
},
|
||||
},
|
||||
},
|
||||
//{
|
||||
// "range": util.MapStr{
|
||||
// "updated": util.MapStr{
|
||||
// "gt": timestamp,
|
||||
// },
|
||||
// },
|
||||
//},
|
||||
},
|
||||
},
|
||||
}
|
||||
queryDsl := util.MapStr{
|
||||
"size": 500,
|
||||
"query": query,
|
||||
}
|
||||
q := orm.Query{
|
||||
RawQuery: util.MustToJSONBytes(queryDsl),
|
||||
}
|
||||
|
@ -116,7 +126,10 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error)
|
|||
if len(result.Result) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
var settings []agent.Setting
|
||||
var (
|
||||
settings []agent.Setting
|
||||
hasUpdated bool
|
||||
)
|
||||
for _, row := range result.Result {
|
||||
setting := agent.Setting{}
|
||||
buf, err := util.ToJSONBytes(row)
|
||||
|
@ -127,12 +140,18 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error)
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if setting.Updated != nil && setting.Updated.UnixMilli() > timestamp {
|
||||
hasUpdated = true
|
||||
}
|
||||
settings = append(settings, setting)
|
||||
}
|
||||
if !hasUpdated {
|
||||
return nil, nil
|
||||
}
|
||||
return settings, nil
|
||||
}
|
||||
|
||||
func TransformSettingsToConfig(setting *TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) {
|
||||
func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) {
|
||||
if setting == nil {
|
||||
return nil, nil, fmt.Errorf("empty setting")
|
||||
}
|
||||
|
@ -202,9 +221,35 @@ func TransformSettingsToConfig(setting *TaskSetting, clusterID, nodeUUID string)
|
|||
toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(nodeUUID, processorName))
|
||||
}
|
||||
}
|
||||
if setting.Logs != nil {
|
||||
var processorName = "es_logs_processor"
|
||||
if setting.Logs.Enabled {
|
||||
params := util.MapStr{
|
||||
"elasticsearch": clusterID,
|
||||
"queue_name": "logs",
|
||||
}
|
||||
if setting.Logs.LogsPath != "" {
|
||||
params["logs_path"] = setting.Logs.LogsPath
|
||||
}
|
||||
cfg := util.MapStr{
|
||||
processorName: params,
|
||||
}
|
||||
enabled := true
|
||||
pipelineCfg := util.MapStr{
|
||||
"enabled": &enabled,
|
||||
"name": fmt.Sprintf("collect_%s_es_logs", nodeUUID),
|
||||
"auto_start": true,
|
||||
"keep_running": true,
|
||||
"retry_delay_in_ms": 3000,
|
||||
"processor": []util.MapStr{cfg},
|
||||
}
|
||||
pipelines = append(pipelines, pipelineCfg)
|
||||
}
|
||||
}
|
||||
return pipelines, toDeletePipelineNames, nil
|
||||
}
|
||||
|
||||
|
||||
func newClusterMetricPipeline(processorName string, clusterID string)(util.MapStr, error){
|
||||
cfg := util.MapStr{
|
||||
processorName: util.MapStr{
|
||||
|
@ -227,3 +272,237 @@ func getMetricPipelineName(clusterID, processorName string) string{
|
|||
return fmt.Sprintf("collect_%s_%s", clusterID, processorName)
|
||||
}
|
||||
|
||||
|
||||
func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) {
|
||||
q := orm.Query{
|
||||
Size: 1000,
|
||||
}
|
||||
if clusterID != "" {
|
||||
q.Conds = orm.And(orm.Eq("id", clusterID))
|
||||
}
|
||||
err, result := orm.Search(agent.Instance{}, &q)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query agent error: %w", err)
|
||||
}
|
||||
|
||||
if len(result.Result) > 0 {
|
||||
var agents = make([]agent.Instance, 0, len(result.Result))
|
||||
for _, row := range result.Result {
|
||||
ag := agent.Instance{}
|
||||
bytes := util.MustToJSONBytes(row)
|
||||
err = util.FromJSONBytes(bytes, &ag)
|
||||
if err != nil {
|
||||
log.Errorf("got unexpected agent: %s, error: %v", string(bytes), err)
|
||||
continue
|
||||
}
|
||||
agents = append(agents, ag)
|
||||
}
|
||||
return agents, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) {
|
||||
q := orm.Query{
|
||||
WildcardIndex: true,
|
||||
}
|
||||
mustQ := []util.MapStr{
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.name": util.MapStr{
|
||||
"value": "agent",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.category": util.MapStr{
|
||||
"value": "instance",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
if len(agentIds) > 0 {
|
||||
mustQ = append(mustQ, util.MapStr{
|
||||
"terms": util.MapStr{
|
||||
"agent.id": agentIds,
|
||||
},
|
||||
})
|
||||
}
|
||||
queryDSL := util.MapStr{
|
||||
"_source": "agent.id",
|
||||
"sort": []util.MapStr{
|
||||
{
|
||||
"timestamp": util.MapStr{
|
||||
"order": "desc",
|
||||
},
|
||||
},
|
||||
},
|
||||
"collapse": util.MapStr{
|
||||
"field": "agent.id",
|
||||
},
|
||||
"query": util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"filter": []util.MapStr{
|
||||
{
|
||||
"range": util.MapStr{
|
||||
"timestamp": util.MapStr{
|
||||
"gte": fmt.Sprintf("now-%ds", lastSeconds),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"must": mustQ,
|
||||
},
|
||||
},
|
||||
}
|
||||
q.RawQuery = util.MustToJSONBytes(queryDSL)
|
||||
err, result := orm.Search(event.Event{}, &q)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query agent instance metric error: %w", err)
|
||||
}
|
||||
agentIDs := map[string]struct{}{}
|
||||
if len(result.Result) > 0 {
|
||||
searchRes := elastic.SearchResponse{}
|
||||
err = util.FromJSONBytes(result.Raw, &searchRes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
agentIDKeyPath := []string{"agent", "id"}
|
||||
for _, hit := range searchRes.Hits.Hits {
|
||||
agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source)
|
||||
if v, ok := agentID.(string); ok {
|
||||
agentIDs[v] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
return agentIDs, nil
|
||||
}
|
||||
|
||||
// GetAgentIngestConfig renders the ingest configuration text for an agent.
// It reads the agent setup config, optionally loads and decodes the credential
// referenced by IngestClusterCredentialID, and fills the endpoint and the
// basic-auth username into the template below.
// It returns the rendered template, a pointer to the basic auth (the zero value
// when no credential ID is configured or the decoded credential is not an
// elastic.BasicAuth) and an error.
// NOTE(review): a non-string ingest_cluster_endpoint leaves endpoint empty
// without an error — confirm this is intended.
func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) {
|
||||
agCfg := GetAgentConfig()
|
||||
var (
|
||||
endpoint string
|
||||
ok bool
|
||||
)
|
||||
// The emptiness check only runs when the configured endpoint is a string.
if endpoint, ok = agCfg.Setup.IngestClusterEndpoint.(string);ok {
|
||||
if endpoint = strings.TrimSpace(endpoint); endpoint == "" {
|
||||
return "", nil, fmt.Errorf("config ingest_cluster_endpoint must not be empty")
|
||||
}
|
||||
}
|
||||
var (
|
||||
basicAuth elastic.BasicAuth
|
||||
)
|
||||
// The credential is looked up only when a credential ID is configured.
if agCfg.Setup.IngestClusterCredentialID != "" {
|
||||
cred := credential.Credential{}
|
||||
cred.ID = agCfg.Setup.IngestClusterCredentialID
|
||||
_, err := orm.Get(&cred)
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("query credential [%s] error: %w", cred.ID, err)
|
||||
}
|
||||
info, err := cred.Decode()
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("decode credential [%s] error: %w", cred.ID, err)
|
||||
}
|
||||
// A decoded value of another type is only logged; basicAuth stays the zero value.
if basicAuth, ok = info.(elastic.BasicAuth); !ok {
|
||||
log.Debug("invalid credential: ", cred)
|
||||
}
|
||||
}
|
||||
tpl := `elasticsearch:
|
||||
- name: default
|
||||
enabled: true
|
||||
endpoint: %s
|
||||
discovery:
|
||||
enabled: true
|
||||
basic_auth:
|
||||
username: %s
|
||||
password: $[[keystore.ingest_cluster_password]]
|
||||
metrics:
|
||||
enabled: true
|
||||
queue: metrics
|
||||
network:
|
||||
enabled: true
|
||||
summary: true
|
||||
details: true
|
||||
memory:
|
||||
metrics:
|
||||
- swap
|
||||
- memory
|
||||
disk:
|
||||
metrics:
|
||||
- iops
|
||||
- usage
|
||||
cpu:
|
||||
metrics:
|
||||
- idle
|
||||
- system
|
||||
- user
|
||||
- iowait
|
||||
- load
|
||||
instance:
|
||||
enabled: true
|
||||
pipeline:
|
||||
- name: logs_indexing_merge
|
||||
auto_start: true
|
||||
keep_running: true
|
||||
processor:
|
||||
- indexing_merge:
|
||||
index_name: ".infini_logs"
|
||||
elasticsearch: "default"
|
||||
input_queue: "logs"
|
||||
idle_timeout_in_seconds: 10
|
||||
output_queue:
|
||||
name: "logs_requests"
|
||||
label:
|
||||
tag: "logs"
|
||||
worker_size: 1
|
||||
bulk_size_in_mb: 10
|
||||
- name: ingest_logs
|
||||
auto_start: true
|
||||
keep_running: true
|
||||
processor:
|
||||
- bulk_indexing:
|
||||
bulk:
|
||||
compress: true
|
||||
batch_size_in_mb: 5
|
||||
batch_size_in_docs: 5000
|
||||
consumer:
|
||||
fetch_max_messages: 100
|
||||
queues:
|
||||
type: indexing_merge
|
||||
tag: "logs"
|
||||
when:
|
||||
cluster_available: ["default"]
|
||||
- name: metrics_indexing_merge
|
||||
auto_start: true
|
||||
keep_running: true
|
||||
processor:
|
||||
- indexing_merge:
|
||||
elasticsearch: "default"
|
||||
index_name: ".infini_metrics"
|
||||
input_queue: "metrics"
|
||||
output_queue:
|
||||
name: "metrics_requests"
|
||||
label:
|
||||
tag: "metrics"
|
||||
worker_size: 1
|
||||
bulk_size_in_mb: 5
|
||||
- name: ingest_metrics
|
||||
auto_start: true
|
||||
keep_running: true
|
||||
processor:
|
||||
- bulk_indexing:
|
||||
bulk:
|
||||
compress: true
|
||||
batch_size_in_mb: 5
|
||||
batch_size_in_docs: 5000
|
||||
consumer:
|
||||
fetch_max_messages: 100
|
||||
queues:
|
||||
type: indexing_merge
|
||||
tag: "metrics"
|
||||
when:
|
||||
cluster_available: ["default"]`
|
||||
// The template has two %s verbs: the endpoint and the basic-auth username.
tpl = fmt.Sprintf(tpl, endpoint, basicAuth.Username)
|
||||
return tpl, &basicAuth, nil
|
||||
}
|
|
@ -1,65 +0,0 @@
|
|||
/* Copyright © INFINI Ltd. All rights reserved.
|
||||
* Web: https://infinilabs.com
|
||||
* Email: hello#infini.ltd */
|
||||
|
||||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
"infini.sh/framework/core/agent"
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/host"
|
||||
)
|
||||
|
||||
var defaultClient ClientAPI
|
||||
|
||||
func GetClient() ClientAPI {
|
||||
if defaultClient == nil {
|
||||
panic("agent client not init")
|
||||
}
|
||||
return defaultClient
|
||||
}
|
||||
|
||||
func RegisterClient(client ClientAPI) {
|
||||
defaultClient = client
|
||||
}
|
||||
type ClientAPI interface {
|
||||
GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error)
|
||||
GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error)
|
||||
GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error)
|
||||
GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error)
|
||||
GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error)
|
||||
RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error
|
||||
GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error)
|
||||
AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error)
|
||||
CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error
|
||||
DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error
|
||||
}
|
||||
|
||||
|
||||
var stateManager IStateManager
|
||||
|
||||
func GetStateManager() IStateManager {
|
||||
if stateManager == nil {
|
||||
panic("agent state manager not init")
|
||||
}
|
||||
return stateManager
|
||||
}
|
||||
|
||||
func RegisterStateManager(sm IStateManager) {
|
||||
stateManager = sm
|
||||
}
|
||||
|
||||
func IsEnabled() bool {
|
||||
return stateManager != nil
|
||||
}
|
||||
|
||||
type IStateManager interface {
|
||||
GetAgent(ID string) (*agent.Instance, error)
|
||||
UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error)
|
||||
GetTaskAgent(clusterID string) (*agent.Instance, error)
|
||||
DeleteAgent(agentID string) error
|
||||
LoopState()
|
||||
Stop()
|
||||
GetAgentClient() ClientAPI
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
/* Copyright © INFINI Ltd. All rights reserved.
|
||||
* Web: https://infinilabs.com
|
||||
* Email: hello#infini.ltd */
|
||||
|
||||
package model
|
||||
|
||||
type AgentConfig struct {
|
||||
Enabled bool `config:"enabled"`
|
||||
StateManager struct{
|
||||
Enabled bool `config:"enabled"`
|
||||
} `config:"state_manager"`
|
||||
Setup *SetupConfig `config:"setup"`
|
||||
}
|
||||
|
||||
// SetupConfig holds the values of the "setup" section of the agent config.
type SetupConfig struct {
	DownloadURL               string      `config:"download_url"`
	Version                   string      `config:"version"`
	CACertFile                string      `config:"ca_cert"`
	CAKeyFile                 string      `config:"ca_key"`
	ConsoleEndpoint           string      `config:"console_endpoint"`
	IngestClusterEndpoint     interface{} `config:"ingest_cluster_endpoint"`
	IngestClusterCredentialID string      `config:"ingest_cluster_credential_id"`
	Port                      string      `config:"port"`
}
|
|
@ -0,0 +1,10 @@
|
|||
/* Copyright © INFINI Ltd. All rights reserved.
|
||||
* Web: https://infinilabs.com
|
||||
* Email: hello#infini.ltd */
|
||||
|
||||
package model
|
||||
|
||||
// Agent status values.

// StatusOnline is the status value "online".
const StatusOnline string = "online"

// StatusOffline is the status value "offline".
const StatusOffline = "offline"
|
|
@ -2,7 +2,7 @@
|
|||
* Web: https://infinilabs.com
|
||||
* Email: hello#infini.ltd */
|
||||
|
||||
package common
|
||||
package model
|
||||
|
||||
import (
|
||||
"infini.sh/framework/core/elastic"
|
||||
|
@ -36,6 +36,7 @@ type NodeStatsTask struct {
|
|||
|
||||
// LogsTask is the logs part of a task setting.
type LogsTask struct {
	Enabled  bool   `json:"enabled"`   // JSON key "enabled"
	LogsPath string `json:"logs_path"` // JSON key "logs_path"
}
|
||||
|
||||
type ParseAgentSettingsResult struct {
|
|
@ -2,51 +2,75 @@
|
|||
* Web: https://infinilabs.com
|
||||
* Email: hello#infini.ltd */
|
||||
|
||||
package common
|
||||
package state
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/buger/jsonparser"
|
||||
log "github.com/cihub/seelog"
|
||||
"infini.sh/console/modules/agent/client"
|
||||
"infini.sh/console/modules/agent/common"
|
||||
"infini.sh/console/modules/agent/model"
|
||||
"infini.sh/framework/core/agent"
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/event"
|
||||
"infini.sh/framework/core/host"
|
||||
"infini.sh/framework/core/kv"
|
||||
"infini.sh/framework/core/orm"
|
||||
"infini.sh/framework/core/util"
|
||||
"infini.sh/framework/modules/elastic"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"src/gopkg.in/yaml.v2"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
StatusOnline string = "online"
|
||||
StatusOffline = "offline"
|
||||
)
|
||||
var stateManager IStateManager
|
||||
|
||||
func GetStateManager() IStateManager {
|
||||
if stateManager == nil {
|
||||
panic("agent state manager not init")
|
||||
}
|
||||
return stateManager
|
||||
}
|
||||
|
||||
func RegisterStateManager(sm IStateManager) {
|
||||
stateManager = sm
|
||||
}
|
||||
|
||||
func IsEnabled() bool {
|
||||
return stateManager != nil
|
||||
}
|
||||
|
||||
type IStateManager interface {
|
||||
GetAgent(ID string) (*agent.Instance, error)
|
||||
UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error)
|
||||
GetTaskAgent(clusterID string) (*agent.Instance, error)
|
||||
DeleteAgent(agentID string) error
|
||||
LoopState()
|
||||
Stop()
|
||||
GetAgentClient() client.ClientAPI
|
||||
}
|
||||
|
||||
type StateManager struct {
|
||||
TTL time.Duration // kv ttl
|
||||
KVKey string
|
||||
stopC chan struct{}
|
||||
stopCompleteC chan struct{}
|
||||
agentClient *Client
|
||||
agentClient *client.Client
|
||||
agentIds map[string]string
|
||||
agentMutex sync.Mutex
|
||||
workerChan chan struct{}
|
||||
timestamps map[string]int64
|
||||
}
|
||||
|
||||
func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string) *StateManager {
|
||||
func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string, agentClient *client.Client) *StateManager {
|
||||
return &StateManager{
|
||||
TTL: TTL,
|
||||
KVKey: kvKey,
|
||||
stopC: make(chan struct{}),
|
||||
stopCompleteC: make(chan struct{}),
|
||||
agentClient: &Client{},
|
||||
agentClient: agentClient,
|
||||
agentIds: agentIds,
|
||||
workerChan: make(chan struct{}, runtime.NumCPU()),
|
||||
timestamps: map[string]int64{},
|
||||
|
@ -54,7 +78,7 @@ func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string
|
|||
}
|
||||
|
||||
func (sm *StateManager) checkAgentStatus() {
|
||||
onlineAgentIDs, err := GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds()))
|
||||
onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds()))
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
|
@ -64,32 +88,32 @@ func (sm *StateManager) checkAgentStatus() {
|
|||
for agentID := range onlineAgentIDs {
|
||||
if _, ok := sm.agentIds[agentID]; !ok {
|
||||
log.Infof("status of agent [%s] changed to online", agentID)
|
||||
sm.agentIds[agentID] = StatusOnline
|
||||
sm.agentIds[agentID] = model.StatusOnline
|
||||
}
|
||||
}
|
||||
sm.agentMutex.Unlock()
|
||||
for agentID, status := range sm.agentIds {
|
||||
if _, ok := onlineAgentIDs[agentID]; ok {
|
||||
sm.syncSettings(agentID)
|
||||
host.UpdateHostAgentStatus(agentID, StatusOnline)
|
||||
if status == StatusOnline {
|
||||
host.UpdateHostAgentStatus(agentID, model.StatusOnline)
|
||||
if status == model.StatusOnline {
|
||||
continue
|
||||
}
|
||||
// status change to online
|
||||
sm.agentIds[agentID] = StatusOnline
|
||||
sm.agentIds[agentID] = model.StatusOnline
|
||||
log.Infof("status of agent [%s] changed to online", agentID)
|
||||
//set timestamp equals 0 to create pipeline
|
||||
sm.timestamps[agentID] = 0
|
||||
continue
|
||||
}else{
|
||||
// already offline
|
||||
if status == StatusOffline {
|
||||
if status == model.StatusOffline {
|
||||
continue
|
||||
}
|
||||
}
|
||||
// status change to offline
|
||||
// todo validate whether agent is offline
|
||||
sm.agentIds[agentID] = StatusOffline
|
||||
sm.agentIds[agentID] = model.StatusOffline
|
||||
sm.workerChan <- struct{}{}
|
||||
go func(agentID string) {
|
||||
defer func() {
|
||||
|
@ -101,10 +125,12 @@ func (sm *StateManager) checkAgentStatus() {
|
|||
}()
|
||||
ag, err := sm.GetAgent(agentID)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
if err != elastic.ErrNotFound {
|
||||
log.Error(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
ag.Status = StatusOffline
|
||||
ag.Status = model.StatusOffline
|
||||
log.Infof("agent [%s] is offline", ag.Endpoint)
|
||||
_, err = sm.UpdateAgent(ag, true)
|
||||
if err != nil {
|
||||
|
@ -112,15 +138,22 @@ func (sm *StateManager) checkAgentStatus() {
|
|||
return
|
||||
}
|
||||
//update host agent status
|
||||
host.UpdateHostAgentStatus(ag.ID, StatusOffline)
|
||||
host.UpdateHostAgentStatus(ag.ID, model.StatusOffline)
|
||||
}(agentID)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (sm *StateManager) syncSettings(agentID string) {
|
||||
ag, err := sm.GetAgent(agentID)
|
||||
if err != nil {
|
||||
if err != elastic.ErrNotFound {
|
||||
log.Errorf("get agent error: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
newTimestamp := time.Now().UnixMilli()
|
||||
settings, err := GetAgentSettings(agentID, sm.timestamps[agentID])
|
||||
settings, err := common.GetAgentSettings(agentID, sm.timestamps[agentID])
|
||||
if err != nil {
|
||||
log.Errorf("query agent settings error: %v", err)
|
||||
return
|
||||
|
@ -129,46 +162,52 @@ func (sm *StateManager) syncSettings(agentID string) {
|
|||
log.Debugf("got no settings of agent [%s]", agentID)
|
||||
return
|
||||
}
|
||||
parseResult, err := ParseAgentSettings(settings)
|
||||
parseResult, err := common.ParseAgentSettings(settings)
|
||||
if err != nil {
|
||||
log.Errorf("parse agent settings error: %v", err)
|
||||
return
|
||||
}
|
||||
ag, err := sm.GetAgent(agentID)
|
||||
agClient := sm.GetAgentClient()
|
||||
var clusterCfgs []util.MapStr
|
||||
if len(parseResult.ClusterConfigs) > 0 {
|
||||
for _, cfg := range parseResult.ClusterConfigs {
|
||||
clusterCfg := util.MapStr{
|
||||
"name": cfg.ID,
|
||||
"enabled": true,
|
||||
"endpoint": cfg.Endpoint,
|
||||
}
|
||||
if cfg.BasicAuth != nil && cfg.BasicAuth.Password != ""{
|
||||
err = agClient.SetKeystoreValue(context.Background(), ag.GetEndpoint(), fmt.Sprintf("%s_password", cfg.ID), cfg.BasicAuth.Password)
|
||||
if err != nil {
|
||||
log.Errorf("set keystore value error: %v", err)
|
||||
continue
|
||||
}
|
||||
clusterCfg["basic_auth"] = util.MapStr{
|
||||
"username": cfg.BasicAuth.Username,
|
||||
"password": fmt.Sprintf("$[[keystore.%s_password]]", cfg.ID),
|
||||
}
|
||||
}
|
||||
clusterCfgs = append(clusterCfgs, clusterCfg)
|
||||
}
|
||||
}
|
||||
var dynamicCfg = util.MapStr{}
|
||||
if len(clusterCfgs) > 0 {
|
||||
dynamicCfg["elasticsearch"] = clusterCfgs
|
||||
}
|
||||
if len(parseResult.Pipelines) > 0 {
|
||||
dynamicCfg["pipeline"] = parseResult.Pipelines
|
||||
}
|
||||
cfgBytes, err := yaml.Marshal(dynamicCfg)
|
||||
if err != nil {
|
||||
log.Errorf("get agent error: %v", err)
|
||||
log.Error("serialize config to yaml error: ", err)
|
||||
return
|
||||
}
|
||||
agClient := sm.GetAgentClient()
|
||||
if len(parseResult.ClusterConfigs) > 0 {
|
||||
err = agClient.RegisterElasticsearch(nil, ag.GetEndpoint(), parseResult.ClusterConfigs)
|
||||
if err != nil {
|
||||
log.Errorf("register elasticsearch config error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, pipelineID := range parseResult.ToDeletePipelineNames {
|
||||
err = agClient.DeletePipeline(context.Background(), ag.GetEndpoint(), pipelineID)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "not found") {
|
||||
log.Errorf("delete pipeline error: %v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
//todo update delete pipeline state
|
||||
}
|
||||
for _, pipeline := range parseResult.Pipelines {
|
||||
err = agClient.CreatePipeline(context.Background(), ag.GetEndpoint(), util.MustToJSONBytes(pipeline))
|
||||
if err != nil {
|
||||
log.Errorf("create pipeline error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes))
|
||||
sm.timestamps[agentID] = newTimestamp
|
||||
}
|
||||
|
||||
func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) {
|
||||
agents, err := LoadAgentsFromES(clusterID)
|
||||
agents, err := common.LoadAgentsFromES(clusterID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -216,7 +255,7 @@ func (sm *StateManager) GetAgent(ID string) (*agent.Instance, error) {
|
|||
if time.Since(timestamp) > sm.TTL {
|
||||
exists, err := orm.Get(inst)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get agent [%s] error: %w", ID, err)
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("can not found agent [%s]", ID)
|
||||
|
@ -261,112 +300,6 @@ func (sm *StateManager) DeleteAgent(agentID string) error {
|
|||
return kv.DeleteKey(sm.KVKey, []byte(agentID))
|
||||
}
|
||||
|
||||
func (sm *StateManager) GetAgentClient() ClientAPI {
|
||||
func (sm *StateManager) GetAgentClient() client.ClientAPI {
|
||||
return sm.agentClient
|
||||
}
|
||||
|
||||
func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) {
|
||||
q := orm.Query{
|
||||
Size: 1000,
|
||||
}
|
||||
if clusterID != "" {
|
||||
q.Conds = orm.And(orm.Eq("id", clusterID))
|
||||
}
|
||||
err, result := orm.Search(agent.Instance{}, &q)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query agent error: %w", err)
|
||||
}
|
||||
|
||||
if len(result.Result) > 0 {
|
||||
var agents = make([]agent.Instance, 0, len(result.Result))
|
||||
for _, row := range result.Result {
|
||||
ag := agent.Instance{}
|
||||
bytes := util.MustToJSONBytes(row)
|
||||
err = util.FromJSONBytes(bytes, &ag)
|
||||
if err != nil {
|
||||
log.Errorf("got unexpected agent: %s, error: %v", string(bytes), err)
|
||||
continue
|
||||
}
|
||||
agents = append(agents, ag)
|
||||
}
|
||||
return agents, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]struct{}, error) {
|
||||
q := orm.Query{
|
||||
WildcardIndex: true,
|
||||
}
|
||||
mustQ := []util.MapStr{
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.name": util.MapStr{
|
||||
"value": "agent",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.category": util.MapStr{
|
||||
"value": "instance",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
if len(agentIds) > 0 {
|
||||
mustQ = append(mustQ, util.MapStr{
|
||||
"terms": util.MapStr{
|
||||
"agent.id": agentIds,
|
||||
},
|
||||
})
|
||||
}
|
||||
queryDSL := util.MapStr{
|
||||
"_source": "agent.id",
|
||||
"sort": []util.MapStr{
|
||||
{
|
||||
"timestamp": util.MapStr{
|
||||
"order": "desc",
|
||||
},
|
||||
},
|
||||
},
|
||||
"collapse": util.MapStr{
|
||||
"field": "agent.id",
|
||||
},
|
||||
"query": util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"filter": []util.MapStr{
|
||||
{
|
||||
"range": util.MapStr{
|
||||
"timestamp": util.MapStr{
|
||||
"gte": fmt.Sprintf("now-%ds", lastSeconds),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"must": mustQ,
|
||||
},
|
||||
},
|
||||
}
|
||||
q.RawQuery = util.MustToJSONBytes(queryDSL)
|
||||
err, result := orm.Search(event.Event{}, &q)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query agent instance metric error: %w", err)
|
||||
}
|
||||
agentIDs := map[string]struct{}{}
|
||||
if len(result.Result) > 0 {
|
||||
searchRes := elastic.SearchResponse{}
|
||||
err = util.FromJSONBytes(result.Raw, &searchRes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
agentIDKeyPath := []string{"agent", "id"}
|
||||
for _, hit := range searchRes.Hits.Hits {
|
||||
agentID, _ := util.GetMapValueByKeys(agentIDKeyPath, hit.Source)
|
||||
if v, ok := agentID.(string); ok {
|
||||
agentIDs[v] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
return agentIDs, nil
|
||||
}
|
|
@ -261,6 +261,10 @@ func (processor *ActivityProcessor) HandleMessage(ctx *pipeline.Context, qConfig
|
|||
|
||||
ctx1, messages, timeout, err := queue.Consume(qConfig, consumer, offset)
|
||||
|
||||
if len(messages)==0{
|
||||
time.Sleep(time.Millisecond * time.Duration(500))
|
||||
}
|
||||
|
||||
if timeout {
|
||||
log.Tracef("timeout on queue:[%v]", qConfig.Name)
|
||||
ctx.Failed(fmt.Errorf("timeout on queue:[%v]", qConfig.Name))
|
||||
|
|
|
@ -285,6 +285,11 @@ func (processor *MetadataProcessor) HandleMessage(ctx *pipeline.Context, qConfig
|
|||
return
|
||||
}
|
||||
ctx1, messages, isTimeout, err := queue.Consume(qConfig, consumer, offset)
|
||||
|
||||
if len(messages)==0{
|
||||
time.Sleep(time.Millisecond * time.Duration(500))
|
||||
}
|
||||
|
||||
//if timeout{
|
||||
// log.Tracef("timeout on queue:[%v]",qConfig.Name)
|
||||
// ctx.Failed()
|
||||
|
|
|
@ -487,7 +487,7 @@ func (h *APIHandler) refreshIndex(w http.ResponseWriter, req *http.Request, ps h
|
|||
}, 200)
|
||||
}
|
||||
|
||||
func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexName string) (subTasks []task2.Task, pipelineTaskIDs map[string][]string, pipelineSubParentIDs map[string]string, parentIDPipelineTasks map[string][]task2.Task, err error) {
|
||||
func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexName string) (subTasks []task2.Task, runningPipelineTaskIDs map[string][]string, pipelineSubParentIDs map[string]string, parentIDPipelineTasks map[string][]task2.Task, err error) {
|
||||
queryDsl := util.MapStr{
|
||||
"size": 9999,
|
||||
"sort": []util.MapStr{
|
||||
|
@ -526,7 +526,7 @@ func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexN
|
|||
return
|
||||
}
|
||||
|
||||
pipelineTaskIDs = map[string][]string{}
|
||||
runningPipelineTaskIDs = map[string][]string{}
|
||||
pipelineSubParentIDs = map[string]string{}
|
||||
parentIDPipelineTasks = map[string][]task2.Task{}
|
||||
|
||||
|
@ -543,9 +543,6 @@ func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexN
|
|||
subTasks = append(subTasks, subTask)
|
||||
continue
|
||||
}
|
||||
if subTask.Status != task2.StatusRunning {
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: use more robust logic
|
||||
if pl := len(subTask.ParentId); pl != 2 {
|
||||
|
@ -558,7 +555,9 @@ func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexN
|
|||
if instID == "" {
|
||||
continue
|
||||
}
|
||||
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
|
||||
if subTask.Status == task2.StatusRunning {
|
||||
runningPipelineTaskIDs[instID] = append(runningPipelineTaskIDs[instID], subTask.ID)
|
||||
}
|
||||
parentIDPipelineTasks[parentID] = append(parentIDPipelineTasks[parentID], subTask)
|
||||
}
|
||||
|
||||
|
|
|
@ -76,6 +76,12 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
|
|||
esSourceClient := elastic.GetClient(clusterComparisonTask.Cluster.Source.Id)
|
||||
esTargetClient := elastic.GetClient(clusterComparisonTask.Cluster.Target.Id)
|
||||
|
||||
err = migration_util.DeleteChildTasks(taskItem.ID)
|
||||
if err != nil {
|
||||
log.Warnf("failed to clear child tasks, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, index := range clusterComparisonTask.Indices {
|
||||
sourceDump := migration_model.IndexComparisonDumpConfig{
|
||||
ClusterId: clusterComparisonTask.Cluster.Source.Id,
|
||||
|
|
|
@ -78,6 +78,12 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
|
|||
esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id)
|
||||
targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id)
|
||||
|
||||
err = migration_util.DeleteChildTasks(taskItem.ID)
|
||||
if err != nil {
|
||||
log.Warnf("failed to clear child tasks, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, index := range clusterMigrationTask.Indices {
|
||||
source := migration_model.IndexMigrationSourceConfig{
|
||||
ClusterId: clusterMigrationTask.Cluster.Source.Id,
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"time"
|
||||
|
||||
log "github.com/cihub/seelog"
|
||||
"infini.sh/console/model"
|
||||
migration_model "infini.sh/console/plugin/migration/model"
|
||||
migration_util "infini.sh/console/plugin/migration/util"
|
||||
|
||||
|
@ -64,6 +63,12 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error {
|
|||
return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
|
||||
}
|
||||
|
||||
err = migration_util.DeleteChildTasks(taskItem.ID)
|
||||
if err != nil {
|
||||
log.Warnf("failed to clear child tasks, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var pids []string
|
||||
pids = append(pids, taskItem.ParentId...)
|
||||
pids = append(pids, taskItem.ID)
|
||||
|
@ -443,14 +448,12 @@ func (p *processor) getPipelineTasks(taskItem *task.Task, cfg *migration_model.I
|
|||
func (p *processor) cleanGatewayQueue(taskItem *task.Task) {
|
||||
log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID)
|
||||
|
||||
var err error
|
||||
instance := model.Instance{}
|
||||
instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
|
||||
if instance.ID == "" {
|
||||
instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
|
||||
if instanceID == "" {
|
||||
log.Debugf("task [%s] not scheduled yet, skip cleaning queue", taskItem.ID)
|
||||
return
|
||||
}
|
||||
_, err = orm.Get(&instance)
|
||||
instance, err := p.scheduler.GetInstance(instanceID)
|
||||
if err != nil {
|
||||
log.Errorf("failed to get instance, err: %v", err)
|
||||
return
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
|
||||
log "github.com/cihub/seelog"
|
||||
|
||||
"infini.sh/console/model"
|
||||
migration_model "infini.sh/console/plugin/migration/model"
|
||||
migration_util "infini.sh/console/plugin/migration/util"
|
||||
|
||||
|
@ -93,6 +92,13 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error {
|
|||
if len(taskItem.ParentId) == 0 {
|
||||
return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
|
||||
}
|
||||
|
||||
err = migration_util.DeleteChildTasks(taskItem.ID)
|
||||
if err != nil {
|
||||
log.Warnf("failed to clear child tasks, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
indexName := cfg.Source.Indices
|
||||
scrollTask := &task.Task{
|
||||
ParentId: pids,
|
||||
|
@ -503,14 +509,12 @@ func (p *processor) getScrollBulkPipelineTasks(taskItem *task.Task) (scrollTask
|
|||
func (p *processor) cleanGatewayQueue(taskItem *task.Task) {
|
||||
log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID)
|
||||
|
||||
var err error
|
||||
instance := model.Instance{}
|
||||
instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
|
||||
if instance.ID == "" {
|
||||
instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
|
||||
if instanceID == "" {
|
||||
log.Debugf("task [%s] not scheduled yet, skip cleaning queue", taskItem.ID)
|
||||
return
|
||||
}
|
||||
_, err = orm.Get(&instance)
|
||||
instance, err := p.scheduler.GetInstance(instanceID)
|
||||
if err != nil {
|
||||
log.Errorf("failed to get instance, err: %v", err)
|
||||
return
|
||||
|
@ -530,10 +534,8 @@ func (p *processor) cleanGatewayQueue(taskItem *task.Task) {
|
|||
func (p *processor) resetGatewayQueue(taskItem *task.Task) error {
|
||||
log.Debugf("resetting gateway queue offset for task [%s]", taskItem.ID)
|
||||
|
||||
var err error
|
||||
instance := model.Instance{}
|
||||
instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
|
||||
_, err = orm.Get(&instance)
|
||||
instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
|
||||
instance, err := p.scheduler.GetInstance(instanceID)
|
||||
if err != nil {
|
||||
log.Errorf("failed to get instance, err: %v", err)
|
||||
return err
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
type Scheduler interface {
|
||||
GetPreferenceInstance(config ExecutionConfig) (instance *model.Instance, err error)
|
||||
GetInstance(instanceID string) (instance *model.Instance, err error)
|
||||
IncrInstanceJobs(instanceID string)
|
||||
DecrInstanceJobs(instanceID string)
|
||||
RefreshInstanceJobsFromES() error
|
||||
|
|
|
@ -96,7 +96,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
processor.pipelineTaskProcessor = pipeline_task.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName)
|
||||
processor.pipelineTaskProcessor = pipeline_task.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName, processor.scheduler)
|
||||
processor.indexMigrationTaskProcessor = index_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler)
|
||||
processor.clusterMigrationTaskProcessor = cluster_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler)
|
||||
processor.indexComparisonTaskProcessor = index_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler)
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
migration_util "infini.sh/console/plugin/migration/util"
|
||||
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/orm"
|
||||
"infini.sh/framework/core/task"
|
||||
"infini.sh/framework/core/util"
|
||||
)
|
||||
|
@ -23,13 +22,16 @@ type processor struct {
|
|||
Elasticsearch string
|
||||
IndexName string
|
||||
LogIndexName string
|
||||
|
||||
scheduler migration_model.Scheduler
|
||||
}
|
||||
|
||||
func NewProcessor(elasticsearch, indexName, logIndexName string) migration_model.Processor {
|
||||
func NewProcessor(elasticsearch, indexName, logIndexName string, scheduler migration_model.Scheduler) migration_model.Processor {
|
||||
return &processor{
|
||||
Elasticsearch: elasticsearch,
|
||||
IndexName: indexName,
|
||||
LogIndexName: logIndexName,
|
||||
scheduler: scheduler,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -167,9 +169,10 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance model.Instance, err error) {
|
||||
func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance *model.Instance, err error) {
|
||||
instance, err = p.getPipelineExecutionInstance(taskItem)
|
||||
if err != nil {
|
||||
log.Errorf("failed to get execution instance for task [%s], err: %v", taskItem.ID, err)
|
||||
return
|
||||
}
|
||||
err = instance.DeletePipeline(taskItem.ID)
|
||||
|
@ -181,19 +184,13 @@ func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance model.In
|
|||
return instance, nil
|
||||
}
|
||||
|
||||
func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (instance model.Instance, err error) {
|
||||
instanceID := taskItem.Metadata.Labels["execution_instance_id"]
|
||||
instance.ID, err = util.ExtractString(instanceID)
|
||||
func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (*model.Instance, error) {
|
||||
instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
|
||||
instance, err := p.scheduler.GetInstance(instanceID)
|
||||
if err != nil {
|
||||
log.Error("failed to get execution_instance_id")
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
_, err = orm.Get(&instance)
|
||||
if err != nil {
|
||||
log.Errorf("failed to get instance, err: %v", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
return instance, nil
|
||||
}
|
||||
|
||||
func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]util.MapStr, error) {
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package scheduler
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -13,10 +15,13 @@ import (
|
|||
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/orm"
|
||||
"infini.sh/framework/core/pipeline"
|
||||
"infini.sh/framework/core/task"
|
||||
"infini.sh/framework/core/util"
|
||||
)
|
||||
|
||||
const initializeInterval = 10 * time.Second
|
||||
|
||||
type scheduler struct {
|
||||
Elasticsearch string
|
||||
IndexName string
|
||||
|
@ -28,9 +33,11 @@ type scheduler struct {
|
|||
}
|
||||
|
||||
type DispatcherState struct {
|
||||
Total int
|
||||
Total int
|
||||
LastInitializedAt time.Time
|
||||
}
|
||||
|
||||
// NOTE: currently we assume task are scheduled sequentially, so GetInstance/GetPreferenceInstance doesn't need to handle locking
|
||||
func NewScheduler(elasticsearch, indexName string, checkInstanceAvailable bool, maxTasksPerInstance int) (migration_model.Scheduler, error) {
|
||||
scheduler := &scheduler{
|
||||
Elasticsearch: elasticsearch,
|
||||
|
@ -78,19 +85,147 @@ func (p *scheduler) GetPreferenceInstance(config migration_model.ExecutionConfig
|
|||
return nil, fmt.Errorf("no available instance")
|
||||
}
|
||||
|
||||
instance := model.Instance{}
|
||||
instance.ID = minID
|
||||
|
||||
_, err = orm.Get(&instance)
|
||||
instance, err := p.GetInstance(minID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if p.getInstanceState(minID).Total > p.MaxTasksPerInstance {
|
||||
return nil, migration_model.ErrHitMax
|
||||
}
|
||||
return instance, nil
|
||||
}
|
||||
|
||||
func (p *scheduler) GetInstance(instanceID string) (*model.Instance, error) {
|
||||
if instanceID == "" {
|
||||
return nil, errors.New("invalid instanceID")
|
||||
}
|
||||
instance := model.Instance{}
|
||||
instance.ID = instanceID
|
||||
|
||||
_, err := orm.Get(&instance)
|
||||
if err != nil {
|
||||
log.Errorf("failed to get instance [%s] from orm, err: %v", instance.ID, err)
|
||||
return nil, err
|
||||
}
|
||||
err = p.initializeInstance(&instance)
|
||||
if err != nil {
|
||||
log.Warnf("failed to initialized instance [%s], err: %v", instance.ID, err)
|
||||
}
|
||||
return &instance, nil
|
||||
}
|
||||
|
||||
func (p *scheduler) initializeInstance(instance *model.Instance) error {
|
||||
lastInitializedAt := p.getLastInitializedAt(instance.ID)
|
||||
if time.Now().Sub(lastInitializedAt) < initializeInterval {
|
||||
return nil
|
||||
}
|
||||
|
||||
status, err := instance.GetPipeline("pipeline_logging_merge")
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "pipeline not found") {
|
||||
log.Infof("pipeline_logging_merge not found on instance [%s], initializing", instance.ID)
|
||||
err := p.createPipelineLoggingMerge(instance)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else if status.State == pipeline.STOPPED {
|
||||
log.Infof("pipeline_logging_merge stopped on instance [%s], starting", instance.ID)
|
||||
err = instance.StartPipeline("pipeline_logging_merge")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
status, err = instance.GetPipeline("ingest_pipeline_logging")
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "pipeline not found") {
|
||||
log.Infof("ingest_pipeline_logging not found on instance [%s], initializing", instance.ID)
|
||||
err := p.createIngestPipelineLogging(instance)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else if status.State == pipeline.STOPPED {
|
||||
log.Infof("ingest_pipeline_logging stopped on instance [%s], starting", instance.ID)
|
||||
err = instance.StartPipeline("ingest_pipeline_logging")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
p.setLastInitializedAt(instance.ID, time.Now())
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: now we're using the same configuraiton as the default gateway.yml
|
||||
// user could change the following configurations manually:
|
||||
// - input_queue (metrics.logging_queue)
|
||||
// - elasticsearch (elasticsearch.name)
|
||||
func (p *scheduler) createPipelineLoggingMerge(instance *model.Instance) error {
|
||||
cfg := &migration_model.PipelineTaskConfig{
|
||||
Name: "pipeline_logging_merge",
|
||||
AutoStart: true,
|
||||
KeepRunning: true,
|
||||
Processor: []util.MapStr{
|
||||
util.MapStr{
|
||||
"indexing_merge": util.MapStr{
|
||||
"input_queue": "logging",
|
||||
"idle_timeout_in_seconds": 1,
|
||||
"elasticsearch": "logging-server",
|
||||
"index_name": ".infini_logs",
|
||||
"output_queue": util.MapStr{
|
||||
"name": "gateway-pipeline-logs",
|
||||
"label": util.MapStr{
|
||||
"tag": "pipeline_logging",
|
||||
},
|
||||
},
|
||||
"worker_size": 1,
|
||||
"bulk_size_in_kb": 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err := instance.CreatePipeline(util.MustToJSONBytes(cfg))
|
||||
if err != nil {
|
||||
log.Errorf("create pipeline_logging_merge [%s] failed, err: %+v", instance.ID, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *scheduler) createIngestPipelineLogging(instance *model.Instance) error {
|
||||
cfg := &migration_model.PipelineTaskConfig{
|
||||
Name: "ingest_pipeline_logging",
|
||||
AutoStart: true,
|
||||
KeepRunning: true,
|
||||
Processor: []util.MapStr{
|
||||
util.MapStr{
|
||||
"bulk_indexing": util.MapStr{
|
||||
"bulk": util.MapStr{
|
||||
"compress": true,
|
||||
"batch_size_in_mb": 1,
|
||||
"batch_size_in_docs": 1,
|
||||
},
|
||||
"consumer": util.MapStr{
|
||||
"fetch_max_messages": 100,
|
||||
},
|
||||
"queues": util.MapStr{
|
||||
"type": "indexing_merge",
|
||||
"tag": "pipeline_logging",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err := instance.CreatePipeline(util.MustToJSONBytes(cfg))
|
||||
if err != nil {
|
||||
log.Errorf("create ingest_pipeline_logging [%s] failed, err: %+v", instance.ID, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *scheduler) RefreshInstanceJobsFromES() error {
|
||||
log.Debug("refreshing instance state from ES")
|
||||
p.stateLock.Lock()
|
||||
|
@ -123,6 +258,24 @@ func (p *scheduler) IncrInstanceJobs(instanceID string) {
|
|||
p.state[instanceID] = instanceState
|
||||
}
|
||||
|
||||
func (p *scheduler) getLastInitializedAt(instanceID string) time.Time {
|
||||
p.stateLock.Lock()
|
||||
defer p.stateLock.Unlock()
|
||||
if st, ok := p.state[instanceID]; ok {
|
||||
return st.LastInitializedAt
|
||||
}
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
func (p *scheduler) setLastInitializedAt(instanceID string, t time.Time) {
|
||||
p.stateLock.Lock()
|
||||
defer p.stateLock.Unlock()
|
||||
if st, ok := p.state[instanceID]; ok {
|
||||
st.LastInitializedAt = t
|
||||
p.state[instanceID] = st
|
||||
}
|
||||
}
|
||||
|
||||
func (p *scheduler) getInstanceState(instanceID string) DispatcherState {
|
||||
p.stateLock.Lock()
|
||||
defer p.stateLock.Unlock()
|
||||
|
|
|
@ -10,6 +10,29 @@ import (
|
|||
"infini.sh/framework/core/util"
|
||||
)
|
||||
|
||||
func DeleteChildTasks(taskID string) error {
|
||||
q := util.MapStr{
|
||||
"query": util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"must": []util.MapStr{
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"parent_id": util.MapStr{
|
||||
"value": taskID,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err := orm.DeleteBy(&task.Task{}, util.MustToJSONBytes(q))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetPendingChildTasks(elasticsearch, indexName string, taskID string, taskType string) ([]task.Task, error) {
|
||||
return GetChildTasks(elasticsearch, indexName, taskID, taskType, []string{task.StatusRunning, task.StatusPendingStop, task.StatusReady})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue