fix build

This commit is contained in:
medcl 2023-10-16 17:51:45 +08:00
parent 3673791467
commit a315c4330d
25 changed files with 1780 additions and 1209 deletions

View File

@ -3,7 +3,7 @@
set -eo pipefail
function print_usage() {
echo "Usage: curl -sSL http://get.infini.sh/agent.html | sudo bash -s -- [-u url_for_download_program] [-v version_for_program ] [-t taget_install_dir] [-p prot_for_program]"
echo "Usage: curl -sSL http://get.infini.cloud/ | sudo bash -s -- [-u url_for_download_program] [-v version_for_program ] [-t target_install_dir] [-p port_for_program]"
echo "Options:"
echo " -u, --url <url> Download url of the program to install which default is http://localhost"
echo " -v, --version <version> Version of the program to install which default is latest from "
@ -226,6 +226,15 @@ resource_limit.memory.max_in_bytes: 533708800
stats:
include_storage_stats_in_api: false
elastic:
skip_init_metadata_on_start: true
health_check:
enabled: true
interval: 60s
availability_check:
enabled: false
interval: 60s
disk_queue:
max_msg_size: 20485760
max_bytes_per_file: 20485760
@ -254,6 +263,21 @@ badger:
value_log_max_entries: 1000000
value_log_file_size: 104857600
configs:
#for managed client's setting
managed: true # managed by remote servers
panic_on_config_error: false #ignore config error
interval: "1s"
servers: # config servers
- "http://localhost:9000"
max_backup_files: 5
tls: #for mTLS connection with config servers
enabled: true
cert_file: "config/client.crt"
key_file: "config/client.key"
ca_file: "config/ca.crt"
skip_insecure_verify: false
node:
major_ip_pattern: ".*"
EOF

View File

@ -0,0 +1,17 @@
configs.template:
- name: "cluster_a_node_b_task_config"
path: ./config/task_config.tpl
variable:
CLUSTER_ID: infini_default_ingest_cluster
CLUSTER_ENDPOINT: ["https://localhost:9200"]
CLUSTER_USERNAME: "admin"
CLUSTER_VER: "1.6.0"
CLUSTER_DISTRIBUTION: "easysearch"
INDEX_PREFIX: ".infini_"
CLUSTER_LEVEL_TASKS_ENABLED: false
NODE_LEVEL_TASKS_ENABLED: true
NODE_LOGS_PATH: "/opt/easysearch/logs/"
#MANAGED_CONFIG_VERSION: 19
#MANAGED: true

View File

@ -0,0 +1,14 @@
configs.template:
- name: "default_ingest_config"
path: ./config/ingest_config.tpl
variable:
INGEST_CLUSTER_ID: infini_default_ingest_cluster
INGEST_CLUSTER_ENDPOINT: [ "https://localhost:9200" ]
INGEST_CLUSTER_USERNAME: "admin"
CLUSTER_VER: "1.6.0"
CLUSTER_DISTRIBUTION: "easysearch"
INDEX_PREFIX: ".infini_"
#MANAGED_CONFIG_VERSION: 2
#MANAGED: true

31
config_repo/settings.yml Normal file
View File

@ -0,0 +1,31 @@
configs: #define configs group
general_ingest_template: #group name
files:
- ./templates/ingest_config.tpl
- ./templates/task_config.tpl
- ./configs/ingest_config.yml
- ./configs/cluster_xx_node_xx.yml
instances: #define which config instance should fetch
infini_default_system_cluster: #instance group
plugins:
- ingest
instances:
- ck0mkk805f5virpsejp0
- ckjrpdg05f5lrfp8qlng
configs:
- general_ingest_template
secrets:
- ingest_cluster_password
secrets:
ingest_cluster_password: #group name
keystore:
ingest_cluster_password:
type: plaintext
value: "d7cc48e69a41dac719fb"
infini_default_ingest_cluster_password:
type: plaintext
value: "d7cc48e69a41dac719fb"
# infini_default_ingest_cluster_password:
# type: credential
# value: "ckghspo05f5q7pr20ct0" #credential_id

View File

@ -0,0 +1,97 @@
elasticsearch:
- name: $[[INGEST_CLUSTER_ID]]
enabled: true
endpoints: $[[INGEST_CLUSTER_ENDPOINT]]
discovery:
enabled: false
basic_auth:
username: $[[INGEST_CLUSTER_USERNAME]]
password: $[[keystore.ingest_cluster_password]]
metrics:
enabled: true
queue: metrics
network:
enabled: true
summary: true
details: true
memory:
metrics:
- swap
- memory
disk:
metrics:
- iops
- usage
cpu:
metrics:
- idle
- system
- user
- iowait
- load
instance:
enabled: true
elastic:
availability_check:
enabled: false
pipeline:
- name: merge_logs
auto_start: true
keep_running: true
processor:
- indexing_merge:
elasticsearch: "$[[INGEST_CLUSTER_ID]]"
index_name: ".infini_logs"
type_name: "_doc"
input_queue: "logs"
idle_timeout_in_seconds: 10
output_queue:
name: "merged_requests"
worker_size: 1
bulk_size_in_mb: 5
- name: merge_metrics
auto_start: true
keep_running: true
processor:
- indexing_merge:
elasticsearch: "$[[INGEST_CLUSTER_ID]]"
index_name: ".infini_metrics"
type_name: "_doc"
input_queue: "metrics"
output_queue:
name: "merged_requests"
worker_size: 1
bulk_size_in_mb: 5
- name: ingest_merged_requests
enabled: true
auto_start: true
keep_running: true
processor:
- bulk_indexing:
max_worker_size: 1
verbose_bulk_result: false
bulk:
batch_size_in_mb: 5
batch_size_in_docs: 5000
max_retry_times: 0
invalid_queue: ""
response_handle:
include_index_stats: false
include_action_stats: false
output_bulk_stats: false
include_error_details: true
save_error_results: true
save_success_results: false
save_busy_results: false
consumer:
fetch_max_messages: 5
queues:
type: indexing_merge
when:
cluster_available: ["$[[INGEST_CLUSTER_ID]]"]
#MANAGED_CONFIG_VERSION: 16
#MANAGED: true

View File

@ -0,0 +1,87 @@
elasticsearch:
- id: $[[CLUSTER_ID]]
name: $[[CLUSTER_ID]]
enabled: true
endpoints: $[[CLUSTER_ENDPOINT]]
discovery:
enabled: false
basic_auth:
username: $[[CLUSTER_USERNAME]]
password: $[[keystore.$[[CLUSTER_ID]]_password]]
pipeline:
#clsuter level metrics
- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]]
enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]]
keep_running: true
singleton: true
name: collect_$[[CLUSTER_ID]]_es_cluster_stats
retry_delay_in_ms: 10000
processor:
- es_cluster_stats:
elasticsearch: $[[CLUSTER_ID]]
labels:
cluster_id: $[[CLUSTER_ID]]
when:
cluster_available: ["$[[CLUSTER_ID]]"]
- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]]
enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]]
keep_running: true
singleton: true
name: collect_$[[CLUSTER_ID]]_es_index_stats
retry_delay_in_ms: 10000
processor:
- es_index_stats:
elasticsearch: $[[CLUSTER_ID]]
labels:
cluster_id: $[[CLUSTER_ID]]
when:
cluster_available: ["$[[CLUSTER_ID]]"]
- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]]
enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]]
keep_running: true
singleton: true
name: collect_$[[CLUSTER_ID]]_es_cluster_health
retry_delay_in_ms: 10000
processor:
- es_cluster_health:
elasticsearch: $[[CLUSTER_ID]]
labels:
cluster_id: $[[CLUSTER_ID]]
when:
cluster_available: ["$[[CLUSTER_ID]]"]
#node level metrics
- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]]
enabled: $[[NODE_LEVEL_TASKS_ENABLED]]
keep_running: true
name: collect_$[[CLUSTER_ID]]_es_node_stats
retry_delay_in_ms: 10000
processor:
- es_node_stats:
elasticsearch: $[[CLUSTER_ID]]
labels:
cluster_id: $[[CLUSTER_ID]]
when:
cluster_available: ["$[[CLUSTER_ID]]"]
#node logs
- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]]
enabled: $[[NODE_LEVEL_TASKS_ENABLED]]
keep_running: true
name: collect_$[[CLUSTER_ID]]_es_logs
retry_delay_in_ms: 10000
processor:
- es_logs_processor:
elasticsearch: $[[CLUSTER_ID]]
labels:
cluster_id: $[[CLUSTER_ID]]
logs_path: $[[NODE_LOGS_PATH]]
queue_name: logs
when:
cluster_available: ["$[[CLUSTER_ID]]"]
#MANAGED_CONFIG_VERSION: 11
#MANAGED: true

View File

@ -1,6 +1,3 @@
path.configs: "config"
configs.auto_reload: true
#env:
# INFINI_CONSOLE_ENDPOINT: "http://127.0.0.1:9000"
# INGEST_CLUSTER_ENDPOINT: "https://127.0.0.1:9200"
@ -11,6 +8,20 @@ configs.auto_reload: true
# WECOM_WEBHOOK_ENDPOINT:
# FEISHU_WEBHOOK_ENDPOINT:
# must in major config file
path.configs: "config"
configs:
managed: true
auto_reload: true
manager:
local_configs_repo_path: ./config_repo/
tls: #for mTLS connection with config servers
enabled: true
ca_file: config/certs/ca.crt
cert_file: config/certs/ca.crt
key_file: config/certs/ca.key
skip_insecure_verify: false
web:
enabled: true
embedding_api: true
@ -62,6 +73,7 @@ badger:
value_log_max_entries: 1000000
value_log_file_size: 104857600
security:
enabled: true
# authc:

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
_ "expvar"
api3 "infini.sh/console/modules/agent/api"
"infini.sh/console/plugin/api/email"
model2 "infini.sh/framework/core/model"
_ "time/tzdata"
@ -35,9 +36,9 @@ import (
"infini.sh/framework/modules/task"
"infini.sh/framework/modules/ui"
_ "infini.sh/framework/plugins"
_ "infini.sh/framework/plugins/managed"
api2 "infini.sh/gateway/api"
_ "infini.sh/gateway/proxy"
_ "infini.sh/framework/plugins/managed"
)
var appConfig *config.AppConfig
@ -84,7 +85,7 @@ func main() {
if !global.Env().SetupRequired() {
for _, v := range modules {
module.RegisterModuleWithPriority(v.Value,v.Priority)
module.RegisterModuleWithPriority(v.Value, v.Priority)
}
} else {
for _, v := range modules {
@ -94,6 +95,8 @@ func main() {
api.RegisterAPI("")
api3.Init()
appConfig = &config.AppConfig{
UI: config.UIConfig{
LocalPath: ".public",
@ -103,7 +106,7 @@ func main() {
}
ok, err := env.ParseConfig("web", appConfig)
if err != nil {
if err != nil && global.Env().SystemConfig.Configs.PanicOnConfigError {
panic(err)
}
if !ok {

View File

@ -6,7 +6,7 @@ package model
import (
"fmt"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/model"
"infini.sh/framework/core/orm"
)
@ -16,7 +16,7 @@ type EmailServer struct {
Host string `json:"host" elastic_mapping:"host:{type:keyword}"`
Port int `json:"port" elastic_mapping:"port:{type:keyword}"`
TLS bool `json:"tls" elastic_mapping:"tls:{type:keyword}"`
Auth *elastic.BasicAuth `json:"auth" elastic_mapping:"auth:{type:object}"`
Auth *model.BasicAuth `json:"auth" elastic_mapping:"auth:{type:object}"`
Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:boolean}"`
CredentialID string `json:"credential_id" elastic_mapping:"credential_id:{type:keyword}"`
}

View File

@ -0,0 +1,353 @@
/* Copyright © INFINI LTD. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package api
import (
"context"
"fmt"
log "github.com/cihub/seelog"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/model"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"net/http"
"time"
)
//func (h *APIHandler) refreshESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
// id := ps.MustGetParameter("instance_id")
// obj := model.Instance{}
// obj.ID = id
// exists, err := orm.Get(&obj)
// if !exists || err != nil {
// h.WriteJSON(w, util.MapStr{
// "_id": id,
// "found": false,
// }, http.StatusNotFound)
// return
// }
// _, err = refreshNodesInfo(&obj)
// if err != nil {
// log.Error(err)
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
// }
// h.WriteAckOKJSON(w)
//}
func refreshNodesInfo(inst *model.Instance) ([]*model.ESNodeInfo, error) {
oldNodesInfo, err := getNodesBindingToAgent(inst)
if err != nil {
return nil, fmt.Errorf("error on get binding nodes info: %w", err)
}
log.Error("oldNodesInfo:",util.MustToJSON(oldNodesInfo))
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
nodesInfo, err := GetElasticsearchNodesViaAgent(ctxTimeout, inst)
if err != nil {
//TODO return already biding nodes info ??
return nil, fmt.Errorf("error on get nodes info from agent: %w", err)
}
log.Error("nodesInfo:",util.MustToJSON(nodesInfo))
for _, node := range nodesInfo {
v,ok:=oldNodesInfo[node.NodeUUID]
if ok{
node.ClusterID=v.ClusterID
}
}
return nodesInfo, nil
}
//get nodes info via agent
func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance) ([]*model.ESNodeInfo, error) {
req := &util.Request{
Method: http.MethodGet,
Path: "/elasticsearch/nodes/_discovery",
Context: ctx,
}
resBody := []*model.ESNodeInfo{}
err := doRequest(instance, req, &resBody)
if err != nil {
return nil, err
}
return resBody, nil
}
func AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*model.ESNodeInfo, error) {
reqBody, err := util.ToJSONBytes(cfg)
if err != nil {
return nil, err
}
req := &util.Request{
Method: http.MethodPost,
Path: "/elasticsearch/_auth",
Context: ctx,
Body: reqBody,
}
resBody := &model.ESNodeInfo{}
err = DoRequest(req, resBody)
if err != nil {
return nil, err
}
return resBody, nil
}
func getNodeByPidOrUUID(nodes map[int]*model.ESNodeInfo, pid int, uuid string, port string) *model.ESNodeInfo {
if nodes[pid] != nil {
return nodes[pid]
}
for _, node := range nodes {
if node.NodeUUID != "" && node.NodeUUID == uuid {
return node
}
}
return nil
}
type BindingItem struct {
ClusterID string `json:"cluster_id"`
ClusterUUID string `json:"cluster_uuid"`
NodeUUID string `json:"node_uuid"`
}
//node -> binding item
func getNodesBindingToAgent(instance *model.Instance) (map[string]BindingItem, error) {
//get nodes settings where agent id = instance id
q := orm.Query{
Size: 1000,
Conds: orm.And(orm.Eq("metadata.category", "node_settings"),
orm.Eq("metadata.name", "agent"),
orm.Eq("metadata.labels.agent_id", instance.ID),
),
}
err, result := orm.Search(model.Setting{}, &q)
if err != nil {
return nil, err
}
ids := map[string]BindingItem{}
for _, row := range result.Result {
v, ok := row.(map[string]interface{})
if ok {
x, ok := v["metadata"]
if ok {
y, ok := x.(map[string]interface{})
if ok {
e, ok := y["labels"]
if ok {
f, ok := e.(map[string]interface{})
if ok {
nodeID, ok := f["node_uuid"].(string)
if ok {
item := BindingItem{}
item.ClusterID = f["cluster_id"].(string)
item.ClusterUUID = f["cluster_uuid"].(string)
item.NodeUUID = nodeID
ids[item.NodeUUID] = item
}
}
}
}
}
}
}
return ids, nil
}
func getUnAssociateNodes() (map[string][]model.ESNodeInfo, error) {
query := util.MapStr{
"size": 3000,
"query": util.MapStr{
"bool": util.MapStr{
"must_not": []util.MapStr{
{
"exists": util.MapStr{
"field": "cluster_id",
},
},
},
},
},
}
q := orm.Query{
RawQuery: util.MustToJSONBytes(query),
}
err, result := orm.Search(model.ESNodeInfo{}, &q)
if err != nil {
return nil, err
}
nodesInfo := map[string][]model.ESNodeInfo{}
for _, row := range result.Result {
node := model.ESNodeInfo{}
buf := util.MustToJSONBytes(row)
util.MustFromJSONBytes(buf, &node)
nodesInfo[node.AgentID] = append(nodesInfo[node.AgentID], node)
}
return nodesInfo, nil
}
func GetElasticLogFiles(ctx context.Context, instance *model.Instance, logsPath string) (interface{}, error) {
reqBody := util.MustToJSONBytes(util.MapStr{
"logs_path": logsPath,
})
req := &util.Request{
Method: http.MethodPost,
Path: "/elasticsearch/logs/_list",
Context: ctx,
Body: reqBody,
}
resBody := map[string]interface{}{}
err := doRequest(instance, req, &resBody)
if err != nil {
return nil, err
}
if resBody["success"] != true {
return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody)
}
return resBody["result"], nil
}
func GetElasticLogFileContent(ctx context.Context, instance *model.Instance, body interface{}) (interface{}, error) {
req := &util.Request{
Method: http.MethodPost,
Path: "/elasticsearch/logs/_read",
Context: ctx,
Body: util.MustToJSONBytes(body),
}
resBody := map[string]interface{}{}
err := doRequest(instance, req, &resBody)
if err != nil {
return nil, err
}
if resBody["success"] != true {
return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody["error"])
}
var hasMore bool
if v, ok := resBody["EOF"].(bool); ok && !v {
hasMore = true
}
return map[string]interface{}{
"lines": resBody["result"],
"has_more": hasMore,
}, nil
}
func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
nodeID := ps.MustGetParameter("node_id")
inst, node, err := getAgentByNodeID(nodeID)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
if inst == nil {
log.Error(fmt.Sprintf("can not find agent by node [%s]", nodeID))
h.WriteJSON(w, util.MapStr{
"success": false,
"reason": "AGENT_NOT_FOUND",
}, http.StatusOK)
return
}
logFiles, err := GetElasticLogFiles(nil, inst, node.Path.Logs)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
h.WriteJSON(w, util.MapStr{
"success": true,
"log_files": logFiles,
}, http.StatusOK)
}
func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
nodeID := ps.MustGetParameter("node_id")
inst, node, err := getAgentByNodeID(nodeID)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
if inst == nil {
h.WriteError(w, fmt.Sprintf("can not find agent by node [%s]", nodeID), http.StatusInternalServerError)
return
}
reqBody := struct {
FileName string `json:"file_name"`
LogsPath string `json:"logs_path"`
Offset int `json:"offset"`
Lines int `json:"lines"`
StartLineNumber int64 `json:"start_line_number"`
}{}
err = h.DecodeJSON(req, &reqBody)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
reqBody.LogsPath = node.Path.Logs
res, err := GetElasticLogFileContent(nil, inst, reqBody)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
h.WriteJSON(w, res, http.StatusOK)
}
func getAgentByNodeID(nodeID string) (*model.Instance, *model.ESNodeInfo, error) {
queryDsl := util.MapStr{
"size": 1,
"query": util.MapStr{
"term": util.MapStr{
"node_uuid": util.MapStr{
"value": nodeID,
},
},
},
"sort": []util.MapStr{
{
"timestamp": util.MapStr{
"order": "desc",
},
},
},
}
q := &orm.Query{
RawQuery: util.MustToJSONBytes(queryDsl),
}
err, result := orm.Search(model.ESNodeInfo{}, q)
if err != nil {
return nil, nil, err
}
if len(result.Result) > 0 {
buf := util.MustToJSONBytes(result.Result[0])
v := &model.ESNodeInfo{}
err = util.FromJSONBytes(buf, v)
inst := &model.Instance{}
inst.ID = v.AgentID
_, err = orm.Get(inst)
if err != nil {
return nil, v, err
}
if inst.Name == "" {
return nil, v, nil
}
return inst, v, nil
}
return nil, nil, nil
}

View File

@ -4,223 +4,219 @@
package api
//
//func (h *APIHandler) enrollHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
// var reqBody []struct {
// AgentID string `json:"agent_id"`
// HostName string `json:"host_name"`
// IP string `json:"ip"`
// Source string `json:"source"`
// OSName string `json:"os_name"`
// OSArch string `json:"os_arch"`
// NodeID string `json:"node_uuid"`
// }
// err := h.DecodeJSON(req, &reqBody)
// if err != nil {
// log.Error(err)
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
// }
// errors := util.MapStr{}
// for i, hi := range reqBody {
// var (
// hostInfo *host.HostInfo
// )
// switch hi.Source {
// case "agent":
// hostInfo, err = enrollHostFromAgent(hi.AgentID)
// if err != nil {
// errors[hi.IP] = util.MapStr{
// "error": err.Error(),
// }
// log.Error(err)
// continue
// }
// hostInfo.IP = hi.IP
// hostInfo.AgentID = hi.AgentID
// err = orm.Create(nil, hostInfo)
// if err != nil {
// errors[hi.IP] = util.MapStr{
// "error": err.Error(),
// }
// log.Error(err)
// continue
// }
// case "es_node":
// hostInfo = &host.HostInfo{
// IP: hi.IP,
// OSInfo: host.OS{
// Platform: hi.OSName,
// KernelArch: hi.OSArch,
// },
// NodeID: hi.NodeID,
// }
// default:
// errors[hi.IP] = util.MapStr{
// "error": fmt.Errorf("unkonow source type"),
// }
// continue
// }
// hostInfo.Timestamp = time.Now()
// var ctx *orm.Context
// if i == len(reqBody) - 1 {
// ctx = &orm.Context{
// Refresh: "wait_for",
// }
// }
// hostInfo.OSInfo.Platform = strings.ToLower(hostInfo.OSInfo.Platform)
// err = orm.Create(ctx, hostInfo)
// if err != nil {
// errors[hi.IP] = util.MapStr{
// "error": err.Error(),
// }
// log.Error(err)
// continue
// }
// }
// resBody := util.MapStr{
// "success": true,
// }
// if len(errors) > 0 {
// resBody["errors"] = errors
// resBody["success"] = false
// }
//
// h.WriteJSON(w, resBody, http.StatusOK)
//}
//
//func (h *APIHandler) deleteHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
// hostID := ps.MustGetParameter("host_id")
// hostInfo, err := getHost(hostID)
// if err != nil {
// log.Error(err)
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
// }
// ctx := orm.Context{
// Refresh: "wait_for",
// }
// err = orm.Delete(&ctx, hostInfo)
// if err != nil {
// log.Error(err)
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
// }
// h.WriteDeletedOKJSON(w, hostID)
//}
//
//func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
// hostID := ps.MustGetParameter("host_id")
// hostInfo, err := getHost(hostID)
// if err != nil {
// log.Error(err)
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
// }
// if hostInfo.AgentID == "" {
// h.WriteJSON(w, util.MapStr{}, http.StatusOK)
// return
// }
//
// sm := state.GetStateManager()
// ag, err := sm.GetAgent(hostInfo.AgentID)
// if err != nil {
// log.Error(err)
// h.WriteJSON(w, util.MapStr{}, http.StatusOK)
// return
// }
// aversion, err := ag.GetVersion()
// if err == nil {
// ag.Version = aversion
// orm.Save(nil, ag)
// }
// h.WriteJSON(w, util.MapStr{
// "host_id": hostID,
// "agent_id": ag.ID,
// "version": ag.Version,
// "status": hostInfo.AgentStatus,
// "endpoint": ag.GetEndpoint(),
// }, http.StatusOK)
//}
//
//func getHost(hostID string) (*host.HostInfo, error){
// hostInfo := &host.HostInfo{}
// hostInfo.ID = hostID
// exists, err := orm.Get(hostInfo)
// if err != nil {
// return nil, fmt.Errorf("get host info error: %w", err)
// }
// if !exists {
// return nil, fmt.Errorf("host [%s] not found", hostID)
// }
// return hostInfo, nil
//}
//
//func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
// hostID := ps.MustGetParameter("host_id")
// hostInfo := &host.HostInfo{}
// hostInfo.ID = hostID
// exists, err := orm.Get(hostInfo)
// if err != nil {
// log.Error(err)
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
// }
// if !exists {
// h.WriteError(w, fmt.Sprintf("host [%s] not found", hostID), http.StatusNotFound)
// return
// }
// if hostInfo.AgentID == "" {
// h.WriteJSON(w, util.MapStr{}, http.StatusOK)
// return
// }
// sm := state.GetStateManager()
// ag, err := sm.GetAgent(hostInfo.AgentID)
// if err != nil {
// log.Error(err)
// h.WriteJSON(w, util.MapStr{}, http.StatusOK)
// return
// }
// ctx,cancel := context.WithTimeout(context.Background(), time.Second * 10)
// defer cancel()
// esNodesInfo, err := sm.GetAgentClient().GetElasticsearchNodes(ctx, ag.GetEndpoint())
// if err != nil {
// log.Error(err)
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
// }
// var processes []util.MapStr
// for _, node := range esNodesInfo {
// processes = append(processes, util.MapStr{
// "pid": node.ProcessInfo.PID,
// "pid_status": node.ProcessInfo.Status,
// "cluster_name": node.ClusterName,
// "cluster_uuid": node.ClusterUuid,
// "cluster_id": node.ClusterID,
// "node_id": node.NodeUUID,
// "node_name": node.NodeName,
// "uptime_in_ms": time.Now().UnixMilli() - node.ProcessInfo.CreateTime,
// })
// }
// h.WriteJSON(w, util.MapStr{
// "elastic_processes": processes,
// }, http.StatusOK)
//}
//
//func enrollHostFromAgent(agentID string) (*host.HostInfo, error){
// sm := state.GetStateManager()
// ag, err := sm.GetAgent(agentID)
// if err != nil {
// return nil, err
// }
// if ag == nil {
// return nil, fmt.Errorf("can not found agent [%s]", agentID)
// }
// agentClient := sm.GetAgentClient()
// hostInfo, err := agentClient.GetHostInfo(nil, ag.GetEndpoint())
// if err != nil {
// return nil, err
// }
// hostInfo.AgentStatus = ag.Status
// return hostInfo, nil
//}
import (
"context"
"fmt"
log "github.com/cihub/seelog"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/host"
"infini.sh/framework/core/model"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"net/http"
"strings"
"time"
)
func (h *APIHandler) enrollHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var reqBody []struct {
AgentID string `json:"agent_id"`
HostName string `json:"host_name"`
IP string `json:"ip"`
Source string `json:"source"`
OSName string `json:"os_name"`
OSArch string `json:"os_arch"`
NodeID string `json:"node_uuid"`
}
err := h.DecodeJSON(req, &reqBody)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
errors := util.MapStr{}
for i, hi := range reqBody {
var (
hostInfo *host.HostInfo
)
switch hi.Source {
case "agent":
obj := model.Instance{}
obj.ID = hi.AgentID
exists, err := orm.Get(&obj)
if !exists || err != nil {
continue
}
hostInfo = &host.HostInfo{}
hostInfo.IP = hi.IP
hostInfo.AgentID = hi.AgentID
err = orm.Create(nil, hostInfo)
if err != nil {
errors[hi.IP] = util.MapStr{
"error": err.Error(),
}
log.Error(err)
continue
}
case "es_node":
hostInfo = &host.HostInfo{
IP: hi.IP,
OSInfo: host.OS{
Platform: hi.OSName,
KernelArch: hi.OSArch,
},
NodeID: hi.NodeID,
}
default:
errors[hi.IP] = util.MapStr{
"error": fmt.Errorf("unkonow source type"),
}
continue
}
hostInfo.Timestamp = time.Now()
var ctx *orm.Context
if i == len(reqBody) - 1 {
ctx = &orm.Context{
Refresh: "wait_for",
}
}
hostInfo.OSInfo.Platform = strings.ToLower(hostInfo.OSInfo.Platform)
err = orm.Create(ctx, hostInfo)
if err != nil {
errors[hi.IP] = util.MapStr{
"error": err.Error(),
}
log.Error(err)
continue
}
}
resBody := util.MapStr{
"success": true,
}
if len(errors) > 0 {
resBody["errors"] = errors
resBody["success"] = false
}
h.WriteJSON(w, resBody, http.StatusOK)
}
func (h *APIHandler) deleteHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
hostID := ps.MustGetParameter("host_id")
hostInfo, err := getHost(hostID)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
ctx := orm.Context{
Refresh: "wait_for",
}
err = orm.Delete(&ctx, hostInfo)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
h.WriteDeletedOKJSON(w, hostID)
}
func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
hostID := ps.MustGetParameter("host_id")
hostInfo, err := getHost(hostID)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if hostInfo.AgentID == "" {
h.WriteJSON(w, util.MapStr{}, http.StatusOK)
return
}
obj := model.Instance{}
obj.ID = hostInfo.AgentID
exists, err := orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": hostInfo.AgentID,
"found": false,
}, http.StatusNotFound)
return
}
h.WriteJSON(w, util.MapStr{
"host_id": hostID,
"agent_id": obj.ID,
"version": obj.Application.Version,
"status": hostInfo.AgentStatus,
"endpoint": obj.GetEndpoint(),
}, http.StatusOK)
}
func getHost(hostID string) (*host.HostInfo, error){
hostInfo := &host.HostInfo{}
hostInfo.ID = hostID
exists, err := orm.Get(hostInfo)
if err != nil {
return nil, fmt.Errorf("get host info error: %w", err)
}
if !exists {
return nil, fmt.Errorf("host [%s] not found", hostID)
}
return hostInfo, nil
}
func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
hostID := ps.MustGetParameter("host_id")
hostInfo := &host.HostInfo{}
hostInfo.ID = hostID
exists, err := orm.Get(hostInfo)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if !exists {
h.WriteError(w, fmt.Sprintf("host [%s] not found", hostID), http.StatusNotFound)
return
}
if hostInfo.AgentID == "" {
h.WriteJSON(w, util.MapStr{}, http.StatusOK)
return
}
obj := model.Instance{}
obj.ID = hostInfo.AgentID
exists, err = orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": hostInfo.AgentID,
"found": false,
}, http.StatusNotFound)
return
}
esNodesInfo, err := GetElasticsearchNodesViaAgent(context.Background(), &obj)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
var processes []util.MapStr
for _, node := range esNodesInfo {
processes = append(processes, util.MapStr{
"pid": node.ProcessInfo.PID,
"pid_status": node.ProcessInfo.Status,
"cluster_name": node.ClusterName,
"cluster_uuid": node.ClusterUuid,
"cluster_id": node.ClusterID,
"node_id": node.NodeUUID,
"node_name": node.NodeName,
"uptime_in_ms": time.Now().UnixMilli() - node.ProcessInfo.CreateTime,
})
}
h.WriteJSON(w, util.MapStr{
"elastic_processes": processes,
}, http.StatusOK)
}

View File

@ -4,32 +4,29 @@
package api
func Init() {
//handler := APIHandler{}
//api.HandleAPIMethod(api.POST, "/instance", handler.registerInstance) //new
import (
"infini.sh/framework/core/api"
"infini.sh/framework/core/api/rbac/enum"
)
func Init() {
handler := APIHandler{}
api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost)
api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info", handler.GetHostAgentInfo)
api.HandleAPIMethod(api.GET, "/host/:host_id/processes", handler.GetHostElasticProcess)
api.HandleAPIMethod(api.DELETE, "/host/:host_id", handler.deleteHost)
api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.POST, "/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite))
//api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.DELETE, "/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite))
api.HandleAPIMethod(api.GET, "/instance/:instance_id/_nodes", handler.RequirePermission(handler.getESNodesInfo, enum.PermissionAgentInstanceRead))
//get elasticsearch node logs, direct fetch or via stored logs(TODO)
api.HandleAPIMethod(api.GET, "/elasticsearch/:id/node/:node_id/logs/_list", handler.RequirePermission(handler.getLogFilesByNode, enum.PermissionAgentInstanceRead))
api.HandleAPIMethod(api.POST, "/elasticsearch/:id/node/:node_id/logs/_read", handler.RequirePermission(handler.getLogFileContent, enum.PermissionAgentInstanceRead))
//api.HandleAPIMethod(api.POST, "/agent/instance", handler.registerInstance)
//api.HandleAPIMethod(api.GET, "/agent/instance/_search", handler.RequirePermission(handler.searchInstance, enum.PermissionAgentInstanceRead))
//api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id", handler.getInstance)
//api.HandleAPIMethod(api.PUT, "/agent/instance/:instance_id", handler.updateInstance)
//api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id", handler.RequirePermission(handler.deleteInstance, enum.PermissionAgentInstanceWrite))
//api.HandleAPIMethod(api.POST, "/agent/instance/_stats", handler.RequirePermission(handler.getInstanceStats, enum.PermissionAgentInstanceRead))
//api.HandleAPIMethod(api.GET, "/agent/log/node/:node_id/files", handler.RequirePermission(handler.getLogFilesByNode, enum.PermissionAgentInstanceRead))
//api.HandleAPIMethod(api.POST, "/agent/log/node/:node_id/_scroll", handler.RequirePermission(handler.getLogFileContent, enum.PermissionAgentInstanceRead))
//api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.getESNodesInfo, enum.PermissionAgentInstanceRead))
//api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/_nodes/_refresh", handler.RequirePermission(handler.refreshESNodesInfo, enum.PermissionAgentInstanceWrite))
//api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite))
//api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite))
//api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite))
//api.HandleAPIMethod(api.POST, "/agent/instance/try_connect", handler.RequireLogin(handler.tryConnect))
//api.HandleAPIMethod(api.POST, "/agent/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite))
//
//api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost)
//api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo)
//api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess)
//api.HandleAPIMethod(api.DELETE, "/host/:host_id",handler.deleteHost)
//
//
//api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand))
//api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript)
}

View File

@ -11,3 +11,4 @@ import (
type APIHandler struct {
api.Handler
}

View File

@ -1,124 +0,0 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package api
import (
"fmt"
log "github.com/cihub/seelog"
"infini.sh/console/modules/agent/client"
"infini.sh/console/modules/agent/state"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/model"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"net/http"
)
func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
nodeID := ps.MustGetParameter("node_id")
inst, node, err := getAgentByNodeID(nodeID)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
if inst == nil {
log.Error(fmt.Sprintf("can not find agent by node [%s]", nodeID))
h.WriteJSON(w, util.MapStr{
"success": false,
"reason": "AGENT_NOT_FOUND",
}, http.StatusOK)
return
}
logFiles, err := client.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
h.WriteJSON(w, util.MapStr{
"success": true,
"log_files": logFiles,
}, http.StatusOK)
}
func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
nodeID := ps.MustGetParameter("node_id")
inst, node, err := getAgentByNodeID(nodeID)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
if inst == nil {
h.WriteError(w, fmt.Sprintf("can not find agent by node [%s]", nodeID), http.StatusInternalServerError)
return
}
reqBody := struct {
FileName string `json:"file_name"`
LogsPath string `json:"logs_path"`
Offset int `json:"offset"`
Lines int `json:"lines"`
StartLineNumber int64 `json:"start_line_number"`
}{}
err = h.DecodeJSON(req, &reqBody)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
reqBody.LogsPath = node.Path.Logs
sm := state.GetStateManager()
res, err := sm.GetAgentClient().GetElasticLogFileContent(nil, inst.GetEndpoint(), reqBody)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
h.WriteJSON(w, res, http.StatusOK)
}
func getAgentByNodeID(nodeID string) (*model.Instance, *model.ESNodeInfo, error){
queryDsl := util.MapStr{
"size":1,
"query": util.MapStr{
"term": util.MapStr{
"node_uuid": util.MapStr{
"value": nodeID,
},
},
},
"sort": []util.MapStr{
{
"timestamp": util.MapStr{
"order": "desc",
},
},
},
}
q := &orm.Query{
RawQuery: util.MustToJSONBytes(queryDsl),
}
err, result := orm.Search(model.ESNodeInfo{}, q)
if err != nil {
return nil,nil, err
}
if len(result.Result) > 0 {
buf := util.MustToJSONBytes(result.Result[0])
v := &model.ESNodeInfo{}
err = util.FromJSONBytes(buf, v)
inst := &model.Instance{}
inst.ID = v.AgentID
_, err = orm.Get(inst)
if err != nil {
return nil, v, err
}
if inst.Name == "" {
return nil, v, nil
}
return inst, v, nil
}
return nil, nil, nil
}

881
modules/agent/api/tod.go Normal file
View File

@ -0,0 +1,881 @@
/* Copyright © INFINI LTD. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package api
import (
"context"
"errors"
"fmt"
log "github.com/cihub/seelog"
common2 "infini.sh/console/modules/agent/common"
model3 "infini.sh/console/modules/agent/model"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/global"
"infini.sh/framework/core/host"
"infini.sh/framework/core/model"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"infini.sh/framework/modules/elastic/common"
"net/http"
"net/url"
"strings"
"sync"
)
//func (h *APIHandler) updateInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
// id := ps.MustGetParameter("instance_id")
// oldInst := model.Instance{}
// oldInst.ID = id
// _, err := orm.Get(&oldInst)
// if err != nil {
// if err == elastic2.ErrNotFound {
// h.WriteJSON(w, util.MapStr{
// "_id": id,
// "result": "not_found",
// }, http.StatusNotFound)
// return
// }
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// log.Error(err)
// return
// }
//
// obj := model.Instance{}
// err = h.DecodeJSON(req, &obj)
// if err != nil {
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// log.Error(err)
// return
// }
//
// oldInst.Name = obj.Name
// oldInst.Endpoint = obj.Endpoint
// oldInst.Description = obj.Description
// oldInst.Tags = obj.Tags
// oldInst.BasicAuth = obj.BasicAuth
// err = orm.Update(&orm.Context{
// Refresh: "wait_for",
// }, &oldInst)
// if err != nil {
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// log.Error(err)
// return
// }
//
// h.WriteJSON(w, util.MapStr{
// "_id": obj.ID,
// "result": "updated",
// }, 200)
//}
//
//func (h *APIHandler) searchInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
//
// var (
// keyword = h.GetParameterOrDefault(req, "keyword", "")
// //queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}`
// strSize = h.GetParameterOrDefault(req, "size", "20")
// strFrom = h.GetParameterOrDefault(req, "from", "0")
// )
//
// var (
// mustQ []interface{}
// )
//
// if keyword != "" {
// mustQ = append(mustQ, util.MapStr{
// "query_string": util.MapStr{
// "default_field": "*",
// "query": keyword,
// },
// })
// }
// size, _ := strconv.Atoi(strSize)
// if size <= 0 {
// size = 20
// }
// from, _ := strconv.Atoi(strFrom)
// if from < 0 {
// from = 0
// }
//
// queryDSL := util.MapStr{
// "size": size,
// "from": from,
// }
// if len(mustQ) > 0 {
// queryDSL["query"] = util.MapStr{
// "bool": util.MapStr{
// "must": mustQ,
// },
// }
// }
//
// q := orm.Query{}
// q.RawQuery = util.MustToJSONBytes(queryDSL)
//
// err, res := orm.Search(&model.Instance{}, &q)
// if err != nil {
// log.Error(err)
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
// }
//
// h.Write(w, res.Raw)
//}
func (h *APIHandler) getESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("instance_id")
obj := model.Instance{}
obj.ID = id
exists, err := orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": id,
"found": false,
}, http.StatusNotFound)
return
}
nodes, err := refreshNodesInfo(&obj)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
h.WriteJSON(w, nodes, http.StatusOK)
}
func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("instance_id")
inst := model.Instance{}
inst.ID = id
exists, err := orm.Get(&inst)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": id,
"found": false,
}, http.StatusNotFound)
return
}
reqBody := struct {
NodeID string `json:"node_id"`
ESConfig *elastic.ElasticsearchConfig `json:"es_config"`
}{}
err = h.DecodeJSON(req, &reqBody)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
//node id maybe is missing
if reqBody.NodeID != "" {
//verify the node id, if the node is is actually the node of the instance
oldNodeInfo := &model.ESNodeInfo{
ID: reqBody.NodeID,
}
exists, err = orm.Get(oldNodeInfo)
if !exists || err != nil {
h.WriteJSON(w, fmt.Sprintf("node [%s] of agent [%s] was not found", oldNodeInfo.ID, inst.Name), http.StatusInternalServerError)
return
}
}else{
//find out the node id with credentials
cfg := reqBody.ESConfig
if cfg.Endpoint == "" {
cfg.Endpoint = cfg.GetAnyEndpoint()
}
basicAuth, err := common.GetBasicAuth(cfg)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
cfg.BasicAuth = basicAuth
nodeInfo, err := AuthESNode(context.Background(), inst.GetEndpoint(), *cfg)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
//host, port, err := net.SplitHostPort(nodeInfo.PublishAddress)
//if err != nil {
// log.Error(err)
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
//}
//if !util.StringInArray(inst.Network.IP, host) && !net.ParseIP(host).IsLoopback() {
// h.WriteError(w, fmt.Sprintf("got node host %s not match any ip of %v", host, inst.Network.IP), http.StatusInternalServerError)
// return
//}
//if oldNodeInfo.HttpPort != port {
// h.WriteError(w, fmt.Sprintf("port mismatch, got: %sexpected: %s", port, oldNodeInfo.HttpPort), http.StatusInternalServerError)
// return
//}
//if oldNodeInfo.ProcessInfo.PID != nodeInfo.ProcessInfo.PID {
// h.WriteError(w, fmt.Sprintf("process id mismatch, got: %dexpected: %d", nodeInfo.ProcessInfo.PID, oldNodeInfo.ProcessInfo.PID), http.StatusInternalServerError)
// return
//}
reqBody.NodeID=nodeInfo.NodeUUID
}
//nodeInfo:=elastic.NodeConfig{}
//nodeInfo.ID = reqBody.NodeID
//nodeInfo.AgentID = inst.ID
//err = orm.Update(nil, nodeInfo) //update node's info and agent_id
//if err != nil {
// log.Error(err)
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
//}
//h.WriteJSON(w, nodeInfo, http.StatusOK)
}
func NewClusterSettings(clusterID string) *model.Setting {
settings := model.Setting{
Metadata: model.SettingsMetadata{
Category: Cluster,
},
}
settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, clusterID)
settings.Metadata.Labels = util.MapStr{
"cluster_id": clusterID,
}
return &settings
}
func NewNodeAgentSettings(clusterID, clusterUUID, nodeUUID, agentID, agentCredential string) *model.Setting {
settings := model.Setting{
Metadata: model.SettingsMetadata{
Category: Node,
Name: "agent",
},
}
settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, nodeUUID)
settings.Metadata.Labels = util.MapStr{
"cluster_id": clusterID,
"cluster_uuid": clusterUUID,
"node_uuid": nodeUUID,
"agent_id": agentID,
"agent_credential": agentCredential,
}
return &settings
}
func NewIndexSettings(clusterID, nodeID, agentID, indexName, indexID string) *model.Setting {
settings := model.Setting{
Metadata: model.SettingsMetadata{
Category: Index,
},
}
settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, nodeID)
settings.Metadata.Labels = util.MapStr{
"cluster_id": clusterID,
"node_id": nodeID,
"agent_id": agentID,
"index_name": indexName,
"index_id": indexID,
}
return &settings
}
const Cluster = "cluster_settings"
const Node = "node_settings"
const Index = "index_settings"
func (h *APIHandler) associateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
//agent id
instID := ps.MustGetParameter("instance_id")
//node id and cluster id
reqBody := struct {
ClusterUUID string `json:"cluster_uuid"`
NodeUUID string `json:"node_uuid"`
//infini system assigned id
ClusterID string `json:"cluster_id"`
}{}
err := h.DecodeJSON(req, &reqBody)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
//update node's setting
settings := NewNodeAgentSettings(reqBody.ClusterID, reqBody.ClusterUUID, reqBody.NodeUUID, instID, "node.AgentCredential")
err = orm.Update(&orm.Context{
Refresh: "wait_for",
}, settings)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
//settings, err := common2.GetAgentSettings(instID, 0)
//if err != nil {
// log.Error(err)
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
//}
//setting := pickAgentSettings(settings, node)
//if setting == nil {
// setting, err = getAgentTaskSetting(instID, node)
// if err != nil {
// log.Error("get agent task setting error: ", err)
// }
// err = orm.Create(nil, setting)
// if err != nil {
// log.Error("save agent task setting error: ", err)
// }
//}
h.WriteAckOKJSON(w)
}
func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
reqBody := struct {
ClusterIDs []string `json:"cluster_ids"`
}{}
err := h.DecodeJSON(req, &reqBody)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
// query not associated nodes info
nodesM, err := getUnAssociateNodes()
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if len(nodesM) == 0 {
h.WriteAckOKJSON(w)
return
}
agentIds := make([]string, 0, len(nodesM))
for agentID := range nodesM {
agentIds = append(agentIds, agentID)
}
agents, err := getAgentByIds(agentIds)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
for _, clusterID := range reqBody.ClusterIDs {
// query cluster basicauth
cfg := elastic.GetConfig(clusterID)
basicAuth, err := common.GetBasicAuth(cfg)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
taskSetting, err := getSettingsByClusterID(cfg.ID)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
for agentID, nodes := range nodesM {
var (
inst *model.Instance
ok bool
)
if inst, ok = agents[agentID]; !ok {
log.Warnf("agent [%v] was not found", agentID)
continue
}
settings, err := common2.GetAgentSettings(agentID, 0)
if err != nil {
log.Error(err)
continue
}
for _, v := range nodes {
host := v.PublishAddress
var endpoint string
if strings.HasPrefix(host, "::") { //for ipv6
instURL, err := url.Parse(inst.Endpoint)
if err != nil {
log.Error(err)
continue
}
host = instURL.Hostname()
endpoint = fmt.Sprintf("%s://[%s]:%s", v.Schema, host, v.HttpPort)
} else {
endpoint = fmt.Sprintf("%s://%s", v.Schema, host)
}
escfg := elastic.ElasticsearchConfig{
Endpoint: endpoint,
BasicAuth: basicAuth,
}
nodeInfo, err := AuthESNode(context.Background(), inst.GetEndpoint(), escfg)
if err != nil {
log.Warn(err)
continue
}
//matched
if nodeInfo.ClusterUuid == cfg.ClusterUUID {
//update node info
nodeInfo.ID = v.ID
nodeInfo.AgentID = inst.ID
nodeInfo.ClusterID = cfg.ID
err = orm.Save(nil, nodeInfo)
if err != nil {
log.Error(err)
continue
}
setting := pickAgentSettings(settings, v)
if setting == nil {
tsetting := model3.TaskSetting{
NodeStats: &model3.NodeStatsTask{
Enabled: true,
},
Logs: &model3.LogsTask{
Enabled: true,
LogsPath: nodeInfo.Path.Logs,
},
}
if taskSetting.IndexStats != nil {
tsetting.IndexStats = taskSetting.IndexStats
taskSetting.IndexStats = nil
}
if taskSetting.ClusterHealth != nil {
tsetting.ClusterHealth = taskSetting.ClusterHealth
taskSetting.ClusterHealth = nil
}
if taskSetting.ClusterStats != nil {
tsetting.ClusterStats = taskSetting.ClusterStats
taskSetting.ClusterStats = nil
}
setting = &model.Setting{
Metadata: model.SettingsMetadata{
Category: "agent",
Name: "task",
Labels: util.MapStr{
"agent_id": agentID,
"cluster_uuid": nodeInfo.ClusterUuid,
"cluster_id": nodeInfo.ClusterID,
"node_uuid": nodeInfo.NodeUUID,
"endpoint": fmt.Sprintf("%s://%s", nodeInfo.Schema, nodeInfo.PublishAddress),
},
},
Payload: util.MapStr{
"task": tsetting,
},
}
err = orm.Create(nil, setting)
if err != nil {
log.Error("save agent task setting error: ", err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
}
}
}
h.WriteAckOKJSON(w)
}
func getAgentByIds(agentIDs []string) (map[string]*model.Instance, error) {
query := util.MapStr{
"size": len(agentIDs),
"query": util.MapStr{
"terms": util.MapStr{
"id": agentIDs,
},
},
}
q := orm.Query{
RawQuery: util.MustToJSONBytes(query),
}
err, result := orm.Search(model.Instance{}, &q)
if err != nil {
return nil, err
}
agents := map[string]*model.Instance{}
for _, row := range result.Result {
inst := model.Instance{}
buf := util.MustToJSONBytes(row)
util.MustFromJSONBytes(buf, &inst)
agents[inst.ID] = &inst
}
return agents, nil
}
func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("instance_id")
nodeIDs := []string{}
err := h.DecodeJSON(req, &nodeIDs)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if len(nodeIDs) > 0 {
q := util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"terms": util.MapStr{
"id": nodeIDs,
},
},
{
"term": util.MapStr{
"agent_id": util.MapStr{
"value": id,
},
},
},
},
},
},
}
err = orm.DeleteBy(model.ESNodeInfo{}, util.MustToJSONBytes(q))
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
q = util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"terms": util.MapStr{
"metadata.labels.node_uuid": nodeIDs,
},
},
{
"term": util.MapStr{
"metadata.labels.agent_id": util.MapStr{
"value": id,
},
},
},
},
},
},
}
}
h.WriteAckOKJSON(w)
}
//
//func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
// var reqBody = struct {
// Endpoint string `json:"endpoint"`
// BasicAuth model.BasicAuth
// }{}
// err := h.DecodeJSON(req, &reqBody)
// if err != nil {
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
// }
// connectRes, err := client.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint)
// if err != nil {
// h.WriteError(w, err.Error(), http.StatusInternalServerError)
// return
// }
// h.WriteJSON(w, connectRes, http.StatusOK)
//}
func pickAgentSettings(settings []model.Setting, nodeInfo model.ESNodeInfo) *model.Setting {
for _, setting := range settings {
if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID {
return &setting
}
}
return nil
}
func getAgentTaskSetting(agentID string, v model.ESNodeInfo) (*model.Setting, error) {
taskSetting, err := getSettingsByClusterID(v.ClusterID)
if err != nil {
return nil, err
}
taskSetting.Logs = &model3.LogsTask{
Enabled: true,
LogsPath: v.Path.Logs,
}
return &model.Setting{
Metadata: model.SettingsMetadata{
Category: "agent",
Name: "task",
Labels: util.MapStr{
"agent_id": agentID,
"cluster_uuid": v.ClusterUuid,
"cluster_id": v.ClusterID,
"node_uuid": v.NodeUUID,
"endpoint": fmt.Sprintf("%s://%s", v.Schema, v.PublishAddress),
},
},
Payload: util.MapStr{
"task": taskSetting,
},
}, nil
}
// getSettingsByClusterID query agent task settings with cluster id
func getSettingsByClusterID(clusterID string) (*model3.TaskSetting, error) {
err, result := querySettingsByClusterID(clusterID)
if err != nil {
return nil, err
}
setting := &model3.TaskSetting{
NodeStats: &model3.NodeStatsTask{
Enabled: true,
},
}
var (
clusterStats = true
indexStats = true
clusterHealth = true
)
keys := []string{"payload.task.cluster_stats.enabled", "payload.task.cluster_health.enabled", "payload.task.index_stats.enabled"}
for _, row := range result.Result {
if v, ok := row.(map[string]interface{}); ok {
vm := util.MapStr(v)
for _, key := range keys {
tv, _ := vm.GetValue(key)
if tv == true {
switch key {
case "payload.task.cluster_stats.enabled":
clusterStats = false
case "payload.task.index_stats.enabled":
indexStats = false
case "payload.task.cluster_health.enabled":
clusterHealth = false
}
}
}
}
}
if clusterStats {
setting.ClusterStats = &model3.ClusterStatsTask{
Enabled: true,
}
}
if indexStats {
setting.IndexStats = &model3.IndexStatsTask{
Enabled: true,
}
}
if clusterHealth {
setting.ClusterHealth = &model3.ClusterHealthTask{
Enabled: true,
}
}
return setting, nil
}
func querySettingsByClusterID(clusterID string) (error, orm.Result) {
queryDsl := util.MapStr{
"size": 500,
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"metadata.labels.cluster_id": util.MapStr{
"value": clusterID,
},
},
},
},
"minimum_should_match": 1,
"should": []util.MapStr{
{
"term": util.MapStr{
"payload.task.cluster_health.enabled": util.MapStr{
"value": true,
},
},
},
{
"term": util.MapStr{
"payload.task.cluster_stats.enabled": util.MapStr{
"value": true,
},
},
},
{
"term": util.MapStr{
"payload.task.index_stats.enabled": util.MapStr{
"value": true,
},
},
},
},
},
},
}
q := orm.Query{
RawQuery: util.MustToJSONBytes(queryDsl),
}
return orm.Search(model.Setting{}, &q)
}
func GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) {
req := &util.Request{
Method: http.MethodGet,
Path: "/agent/host/_basic",
Context: ctx,
}
resBody := struct {
Success bool `json:"success"`
Error string `json:"error"`
HostInfo *host.HostInfo `json:"result"`
}{}
req.Body = util.MustToJSONBytes(resBody)
err := DoRequest(req, &resBody)
if err != nil {
return nil, err
}
if resBody.Success != true {
return nil, fmt.Errorf("enroll error from client: %v", resBody.Error)
}
return resBody.HostInfo, nil
}
func GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string) (interface{}, error) {
req := &util.Request{
Method: http.MethodGet,
Path: fmt.Sprintf("/elasticsearch/%s/process/_elastic", agentID),
Context: ctx,
}
resBody := map[string]interface{}{}
err := DoRequest(req, &resBody)
if err != nil {
return nil, err
}
if resBody["success"] != true {
return nil, fmt.Errorf("discover host callback error: %v", resBody["error"])
}
return resBody["elastic_process"], nil
}
func GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*model.Instance, error) {
req := &util.Request{
Method: http.MethodGet,
Path: "/_info",
Context: ctx,
}
resBody := &model.Instance{}
err := DoRequest(req, &resBody)
return resBody, err
}
func RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error {
reqBody, err := util.ToJSONBytes(cfgs)
if err != nil {
return err
}
req := &util.Request{
Method: http.MethodPost,
Path: "/elasticsearch/_register",
Context: ctx,
Body: reqBody,
}
resBody := util.MapStr{}
err = DoRequest(req, &resBody)
if err != nil {
return err
}
if resBody["acknowledged"] != true {
return fmt.Errorf("%v", resBody["error"])
}
return nil
}
func CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error {
req := &util.Request{
Method: http.MethodPost,
Path: "/pipeline/tasks/",
Body: body,
Context: ctx,
}
resBody := util.MapStr{}
return DoRequest(req, &resBody)
}
func DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error {
req := &util.Request{
Method: http.MethodDelete,
Path: fmt.Sprintf("/pipeline/task/%s", pipelineID),
Context: ctx,
}
return DoRequest(req, nil)
}
func DoRequest(req *util.Request, obj interface{}) error {
panic("implement me")
}
var mTLSClient *http.Client //TODO get mTLSClient
var initOnce = sync.Once{}
func doRequest(instance *model.Instance, req *util.Request, obj interface{}) error {
var err error
var res *util.Result
initOnce.Do(func() {
if global.Env().SystemConfig.Configs.TLSConfig.TLSEnabled && global.Env().SystemConfig.Configs.TLSConfig.TLSCAFile != "" {
//init client
hClient, err := util.NewMTLSClient(
global.Env().SystemConfig.Configs.TLSConfig.TLSCAFile,
global.Env().SystemConfig.Configs.TLSConfig.TLSCertFile,
global.Env().SystemConfig.Configs.TLSConfig.TLSKeyFile)
if err != nil {
panic(err)
}
mTLSClient = hClient
}
})
req.Url, err = url.JoinPath(instance.GetEndpoint(), req.Path)
res, err = util.ExecuteRequestWithCatchFlag(mTLSClient, req, true)
if err != nil || res.StatusCode != 200 {
body := ""
if res != nil {
body = string(res.Body)
}
return errors.New(fmt.Sprintf("request error: %v, %v", err, body))
}
if res != nil {
if res.Body != nil {
return util.FromJSONBytes(res.Body, obj)
}
}
return nil
}

View File

@ -1,279 +0,0 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package client
import (
"context"
"fmt"
"infini.sh/console/modules/agent/common"
"infini.sh/framework/core/model"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/host"
"infini.sh/framework/core/util"
"net/http"
)
var defaultClient ClientAPI
func GetClient() ClientAPI {
if defaultClient == nil {
panic("agent client not init")
}
return defaultClient
}
func RegisterClient(client ClientAPI) {
defaultClient = client
}
type ClientAPI interface {
GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error)
GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error)
GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error)
GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error)
GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*model.Instance, error)
RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error
GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]model.ESNodeInfo, error)
AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*model.ESNodeInfo, error)
CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error
DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error
SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error
SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error
SaveIngestConfig(ctx context.Context, agentBaseURL string) error
DoRequest(req *util.Request, respObj interface{}) error
}
type Client struct {
Executor Executor
}
//func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) {
// req := &util.Request{
// Method: http.MethodGet,
// Url: fmt.Sprintf("%s/agent/host/_basic", agentBaseURL),
// Context: ctx,
// }
// resBody := struct {
// Success bool `json:"success"`
// Error string `json:"error"`
// HostInfo *host.HostInfo `json:"result"`
// }{}
// err := client.DoRequest(req, &resBody)
// if err != nil {
// return nil, err
// }
// if resBody.Success != true {
// return nil, fmt.Errorf("enroll error from client: %v", resBody.Error)
// }
// return resBody.HostInfo, nil
//}
//TODO
func (client *Client) GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) {
panic("implement me")
req := &util.Request{
Method: http.MethodGet,
Url: fmt.Sprintf("%s/elasticsearch/%s/process/_elastic", agentBaseURL, agentID),
Context: ctx,
}
resBody := map[string]interface{}{}
err := client.DoRequest(req, &resBody)
if err != nil {
return nil, err
}
if resBody["success"] != true {
return nil, fmt.Errorf("discover host callback error: %v", resBody["error"])
}
return resBody["elastic_process"], nil
}
func (client *Client) GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) {
panic("implement me")
reqBody := util.MustToJSONBytes(util.MapStr{
"logs_path": logsPath,
})
req := &util.Request{
Method: http.MethodPost,
Url: fmt.Sprintf("%s/agent/logs/elastic/list", agentBaseURL),
Context: ctx,
Body: reqBody,
}
resBody := map[string]interface{}{}
err := client.DoRequest(req, &resBody)
if err != nil {
return nil, err
}
if resBody["success"] != true {
return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody["error"])
}
return resBody["result"], nil
}
func (client *Client) GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) {
req := &util.Request{
Method: http.MethodPost,
Url: fmt.Sprintf("%s/agent/logs/elastic/_read", agentBaseURL),
Context: ctx,
Body: util.MustToJSONBytes(body),
}
resBody := map[string]interface{}{}
err := client.DoRequest(req, &resBody)
if err != nil {
return nil, err
}
if resBody["success"] != true {
return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody["error"])
}
var hasMore bool
if v, ok := resBody["EOF"].(bool); ok && !v {
hasMore = true
}
return map[string]interface{}{
"lines": resBody["result"],
"has_more": hasMore,
} , nil
}
func (client *Client) GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*model.Instance, error){
req := &util.Request{
Method: http.MethodGet,
Url: fmt.Sprintf("%s/_info", agentBaseURL ),
Context: ctx,
}
resBody := &model.Instance{}
err := client.DoRequest(req, &resBody)
return resBody, err
}
func (client *Client) RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error {
reqBody, err := util.ToJSONBytes(cfgs)
if err != nil {
return err
}
req := &util.Request{
Method: http.MethodPost,
Url: fmt.Sprintf("%s/elasticsearch/_register", agentBaseURL ),
Context: ctx,
Body: reqBody,
}
resBody := util.MapStr{}
err = client.DoRequest(req, &resBody)
if err != nil {
return err
}
if resBody["acknowledged"] != true {
return fmt.Errorf("%v", resBody["error"])
}
return nil
}
func (client *Client) GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]model.ESNodeInfo, error) {
req := &util.Request{
Method: http.MethodGet,
Url: fmt.Sprintf("%s/elasticsearch/nodes/_discovery", agentBaseURL ),
Context: ctx,
}
resBody := []model.ESNodeInfo{}
err := client.DoRequest(req, &resBody)
if err != nil {
return nil, err
}
return resBody, nil
}
func (client *Client) AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*model.ESNodeInfo, error) {
reqBody, err := util.ToJSONBytes(cfg)
if err != nil {
return nil, err
}
req := &util.Request{
Method: http.MethodPost,
Url: fmt.Sprintf("%s/elasticsearch/_auth", agentBaseURL ),
Context: ctx,
Body: reqBody,
}
resBody := &model.ESNodeInfo{}
err = client.DoRequest(req, resBody)
if err != nil {
return nil, err
}
return resBody, nil
}
func (client *Client) CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error{
req := &util.Request{
Method: http.MethodPost,
Url: agentBaseURL + "/pipeline/tasks/",
Body: body,
Context: ctx,
}
resBody := util.MapStr{}
return client.DoRequest(req, &resBody)
}
func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error{
req := &util.Request{
Method: http.MethodDelete,
Url: fmt.Sprintf("%s/pipeline/task/%s", agentBaseURL, pipelineID),
Context: ctx,
}
return client.DoRequest(req, nil)
}
func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error{
body := util.MapStr{
"key": key,
"value": value,
}
req := &util.Request{
Method: http.MethodPost,
Url: fmt.Sprintf("%s/keystore", agentBaseURL),
Context: ctx,
Body: util.MustToJSONBytes(body),
}
return client.DoRequest(req, nil)
}
func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, filename, content string) error{
body := util.MapStr{
"configs": util.MapStr{
filename: content,
},
}
req := &util.Request{
Method: http.MethodPost,
Url: fmt.Sprintf("%s/config/_update", agentBaseURL),
Context: ctx,
Body: util.MustToJSONBytes(body),
}
return client.DoRequest(req, nil)
}
func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string) error {
ingestCfg, basicAuth, err := common.GetAgentIngestConfig()
if err != nil {
return err
}
if basicAuth != nil && basicAuth.Password != "" {
err = client.SetKeystoreValue(ctx, agentBaseURL, "ingest_cluster_password", basicAuth.Password)
if err != nil {
return fmt.Errorf("set keystore value to agent error: %w", err)
}
}
err = client.SaveDynamicConfig(context.Background(), agentBaseURL, "ingest_variables.yml", ingestCfg )
if err != nil {
return fmt.Errorf("save dynamic config to agent error: %w", err)
}
return nil
}
func (client *Client) DoRequest(req *util.Request, respObj interface{}) error {
return client.Executor.DoRequest(req, respObj)
}

View File

@ -1,100 +0,0 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package client
import (
"bytes"
"fmt"
"infini.sh/console/modules/agent/common"
"infini.sh/framework/core/util"
"io"
"net/http"
)
type Executor interface {
DoRequest(req *util.Request, respObj interface{}) error
}
type HttpExecutor struct {
}
func (executor *HttpExecutor) DoRequest(req *util.Request, respObj interface{}) error {
result, err := util.ExecuteRequest(req)
if err != nil {
return err
}
if result.StatusCode != 200 {
return fmt.Errorf(string(result.Body))
}
if respObj == nil {
return nil
}
return util.FromJSONBytes(result.Body, respObj)
}
func NewMTLSExecutor(caCertFile, caKeyFile string) (*MTLSExecutor, error){
var (
instanceCrt string
instanceKey string
)
instanceCrt, instanceKey, err := common.GetAgentInstanceCerts(caCertFile, caKeyFile)
if err != nil {
return nil, fmt.Errorf("generate tls cert error: %w", err)
}
hClient, err := util.NewMTLSClient(caCertFile, instanceCrt, instanceKey)
if err != nil {
return nil, err
}
return &MTLSExecutor{
CaCertFile: caCertFile,
CAKeyFile: caKeyFile,
client: hClient,
}, nil
}
type MTLSExecutor struct {
CaCertFile string
CAKeyFile string
client *http.Client
}
func (executor *MTLSExecutor) DoRequest(req *util.Request, respObj interface{}) error {
var reader io.Reader
if len(req.Body) > 0 {
reader = bytes.NewReader(req.Body)
}
var (
hr *http.Request
err error
)
if req.Context == nil {
hr, err = http.NewRequest(req.Method, req.Url, reader)
}else{
hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader)
}
if err != nil {
return err
}
res, err := executor.client.Do(hr)
if err != nil {
return err
}
defer res.Body.Close()
buf, err := io.ReadAll(res.Body)
if err != nil {
return err
}
if res.StatusCode != 200 {
return fmt.Errorf(string(buf))
}
if respObj != nil {
err = util.FromJSONBytes(buf, respObj)
if err != nil {
return err
}
}
return nil
}

View File

@ -5,15 +5,10 @@
package common
import (
"crypto/x509"
"encoding/pem"
log "github.com/cihub/seelog"
"infini.sh/console/modules/agent/model"
"infini.sh/framework/core/env"
"infini.sh/framework/core/global"
"infini.sh/framework/core/util"
"os"
"path"
"infini.sh/framework/plugins/managed/common"
)
@ -30,37 +25,10 @@ func GetAgentConfig() *model.AgentConfig {
log.Debug("agent config not found: %v", err)
}
if agentCfg.Setup.CACertFile == "" && agentCfg.Setup.CAKeyFile == "" {
agentCfg.Setup.CACertFile, agentCfg.Setup.CAKeyFile, err = GetOrInitDefaultCaCerts()
agentCfg.Setup.CACertFile, agentCfg.Setup.CAKeyFile, err = common.GetOrInitDefaultCaCerts()
if err != nil {
log.Errorf("generate default ca certs error: %v", err)
}
}
return agentCfg
}
func GetOrInitDefaultCaCerts()(string, string, error){
dataDir := global.Env().GetDataDir()
caFile := path.Join(dataDir, "certs/ca.crt")
caKey := path.Join(dataDir, "certs/ca.key")
if !(util.FileExists(caFile) && util.FileExists(caKey) ) {
err := os.MkdirAll(path.Join(dataDir, "certs"), 0775)
if err != nil {
return "", "", err
}
log.Info("auto generating cert files")
_, rootKey, rootCertPEM := util.GetRootCert()
caKeyPEM := pem.EncodeToMemory(&pem.Block{
Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(rootKey),
})
_, err = util.FilePutContentWithByte(caKey, caKeyPEM)
if err != nil {
return "", "", err
}
_, err = util.FilePutContentWithByte(caFile, rootCertPEM)
if err != nil {
return "", "", err
}
}
return caFile, caKey, nil
}

View File

@ -401,7 +401,7 @@ func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]str
return agentIDs, nil
}
func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) {
func GetAgentIngestConfig() (string, *model.BasicAuth, error) {
agCfg := GetAgentConfig()
var (
endpoint string
@ -422,7 +422,7 @@ func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) {
}
var (
basicAuth elastic.BasicAuth
basicAuth model.BasicAuth
)
if agCfg.Setup.IngestClusterCredentialID != "" {
cred := credential.Credential{}
@ -435,7 +435,7 @@ func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) {
if err != nil {
return "", nil, fmt.Errorf("decode credential [%s] error: %w", cred.ID, err)
}
if basicAuth, ok = info.(elastic.BasicAuth); !ok {
if basicAuth, ok = info.(model.BasicAuth); !ok {
log.Debug("invalid credential: ", cred)
}
}else{

View File

@ -1,41 +0,0 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package common
import (
"fmt"
"gopkg.in/yaml.v2"
"infini.sh/framework/core/util"
"testing"
)
func TestTransformSettingsToConfig(t *testing.T) {
setting := TaskSetting{
ClusterHealth: ClusterHealthTask{
Enabled: true,
},
ClusterStats: ClusterStatsTask {
Enabled: true,
},
IndexStats: IndexStatsTask{
Enabled: true,
},
NodeStats: NodeStatsTask{
Enabled: true,
NodeIDs: []string{"ddddnnnn"},
},
}
pipelines, err := transformSettingsToConfig(&setting, "testxxx")
if err !=nil {
t.Fatal(err)
}
buf, err := yaml.Marshal(util.MapStr{
"pipeline": pipelines,
})
if err !=nil {
t.Fatal(err)
}
fmt.Println(string(buf))
}

View File

@ -1,15 +0,0 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package model
const (
StatusOnline string = "online"
StatusOffline = "offline"
)
const (
KVAgentIngestConfigChanged = "agent_ingest_config_changed"
KVSyncDynamicTaskSettings = "agent_sync_dynamic_task_settings"
)

View File

@ -1,352 +0,0 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package state
import (
"context"
"fmt"
"github.com/buger/jsonparser"
log "github.com/cihub/seelog"
"gopkg.in/yaml.v2"
"infini.sh/console/modules/agent/client"
"infini.sh/console/modules/agent/common"
model2 "infini.sh/console/modules/agent/model"
"infini.sh/framework/core/model"
"infini.sh/framework/core/host"
"infini.sh/framework/core/kv"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"infini.sh/framework/modules/elastic"
"runtime"
"runtime/debug"
"strconv"
"sync"
"time"
)
var stateManager IStateManager
func GetStateManager() IStateManager {
if stateManager == nil {
panic("agent state manager not init")
}
return stateManager
}
func RegisterStateManager(sm IStateManager) {
stateManager = sm
}
func IsEnabled() bool {
return stateManager != nil
}
type IStateManager interface {
GetAgent(ID string) (*model.Instance, error)
UpdateAgent(inst *model.Instance, syncToES bool) (*model.Instance, error)
GetTaskAgent(clusterID string) (*model.Instance, error)
DeleteAgent(agentID string) error
LoopState()
Stop()
GetAgentClient() client.ClientAPI
}
type StateManager struct {
TTL time.Duration // kv ttl
KVKey string
stopC chan struct{}
stopCompleteC chan struct{}
agentClient *client.Client
agentIds map[string]string
agentMutex sync.Mutex
workerChan chan struct{}
}
func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string, agentClient *client.Client) *StateManager {
return &StateManager{
TTL: TTL,
KVKey: kvKey,
stopC: make(chan struct{}),
stopCompleteC: make(chan struct{}),
agentClient: agentClient,
agentIds: agentIds,
workerChan: make(chan struct{}, runtime.NumCPU()),
}
}
func (sm *StateManager) checkAgentStatus() {
onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds()))
if err != nil {
log.Error(err)
return
}
//add new agent to state
sm.agentMutex.Lock()
for agentID := range onlineAgentIDs {
if _, ok := sm.agentIds[agentID]; !ok {
log.Infof("status of agent [%s] changed to online", agentID)
sm.agentIds[agentID] = model2.StatusOnline
}
}
sm.agentMutex.Unlock()
for agentID, status := range sm.agentIds {
sm.workerChan <- struct{}{}
go func(agentID string) {
defer func() {
if err := recover(); err != nil {
log.Errorf("check agent [%s] status recover form panic error: %v", agentID, err)
debug.PrintStack()
}
<-sm.workerChan
}()
sm.syncSettings(agentID)
sm.syncIngestSettings(agentID)
if _, ok := onlineAgentIDs[agentID]; ok {
host.UpdateHostAgentStatus(agentID, model2.StatusOnline)
if status == model2.StatusOnline {
return
}
// status change to online
sm.agentMutex.Lock()
sm.agentIds[agentID] = model2.StatusOnline
sm.agentMutex.Unlock()
log.Infof("status of agent [%s] changed to online", agentID)
return
}else{
// already offline
if status == model2.StatusOffline {
return
}
}
// status change to offline
sm.agentMutex.Lock()
sm.agentIds[agentID] = model2.StatusOffline
sm.agentMutex.Unlock()
ag, err := sm.GetAgent(agentID)
if err != nil {
if err != elastic.ErrNotFound {
log.Error(err)
}
return
}
ag.Status = model2.StatusOffline
log.Infof("agent [%s] is offline", ag.Endpoint)
_, err = sm.UpdateAgent(ag, true)
if err != nil {
log.Error(err)
return
}
//update host agent status
host.UpdateHostAgentStatus(ag.ID, model2.StatusOffline)
}(agentID)
}
}
func (sm *StateManager) getLastSyncSettingsTimestamp(agentID string) int64{
vbytes, err := kv.GetValue(model2.KVSyncDynamicTaskSettings, []byte(agentID))
if err != nil {
log.Error(err)
}
if vbytes == nil {
return 0
}
t, err := strconv.ParseInt(string(vbytes), 10, 64)
if err != nil {
log.Error(err)
}
return t
}
func (sm *StateManager) syncSettings(agentID string) {
ag, err := sm.GetAgent(agentID)
if err != nil {
if err != elastic.ErrNotFound {
log.Errorf("get agent error: %v", err)
}
return
}
newTimestamp := time.Now().UnixMilli()
lastSyncTimestamp := sm.getLastSyncSettingsTimestamp(agentID)
settings, err := common.GetAgentSettings(agentID, lastSyncTimestamp)
if err != nil {
log.Errorf("query agent settings error: %v", err)
return
}
if len(settings) == 0 {
log.Debugf("got no settings of agent [%s]", agentID)
return
}
parseResult, err := common.ParseAgentSettings(settings)
if err != nil {
log.Errorf("parse agent settings error: %v", err)
return
}
agClient := sm.GetAgentClient()
var clusterCfgs []util.MapStr
if len(parseResult.ClusterConfigs) > 0 {
for _, cfg := range parseResult.ClusterConfigs {
clusterCfg := util.MapStr{
"name": cfg.ID,
"enabled": true,
"endpoint": cfg.GetAnyEndpoint(),
}
if cfg.BasicAuth != nil && cfg.BasicAuth.Password != ""{
cid := cfg.ID
if cfg.ClusterUUID != "" {
cid = cfg.ClusterUUID
}
err = agClient.SetKeystoreValue(context.Background(), ag.GetEndpoint(), fmt.Sprintf("%s_password", cid), cfg.BasicAuth.Password)
if err != nil {
log.Errorf("set keystore value error: %v", err)
continue
}
clusterCfg["basic_auth"] = util.MapStr{
"username": cfg.BasicAuth.Username,
"password": fmt.Sprintf("$[[keystore.%s_password]]", cid),
}
}
clusterCfgs = append(clusterCfgs, clusterCfg)
}
}
var dynamicCfg = util.MapStr{}
if len(clusterCfgs) > 0 {
dynamicCfg["elasticsearch"] = clusterCfgs
}
if len(parseResult.Pipelines) > 0 {
dynamicCfg["pipeline"] = parseResult.Pipelines
}
cfgBytes, err := yaml.Marshal(dynamicCfg)
if err != nil {
log.Error("serialize config to yaml error: ", err)
return
}
//TODO
err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task.yml", string(cfgBytes))
newTimestampStr := strconv.FormatInt(newTimestamp, 10)
err = kv.AddValue(model2.KVSyncDynamicTaskSettings, []byte(agentID), []byte(newTimestampStr))
if err != nil {
log.Error(err)
}
}
func (sm *StateManager) syncIngestSettings(agentID string) {
v, err := kv.GetValue(model2.KVAgentIngestConfigChanged, []byte(agentID))
if err != nil {
log.Error(err)
}
if string(v) != "1" {
return
}
ag, err := sm.GetAgent(agentID)
if err != nil {
if err != elastic.ErrNotFound {
log.Errorf("get agent error: %v", err)
}
return
}
err = sm.agentClient.SaveIngestConfig(context.Background(), ag.GetEndpoint())
if err == nil {
kv.AddValue(model2.KVAgentIngestConfigChanged,[]byte(agentID), []byte("0"))
}
}
func (sm *StateManager) getAvailableAgent(clusterID string) (*model.Instance, error) {
agents, err := common.LoadAgentsFromES(clusterID)
if err != nil {
return nil, err
}
if len(agents) == 0 {
return nil, nil
}
for _, ag := range agents {
if ag.Status == "offline" {
continue
}
}
return nil, nil
}
func (sm *StateManager) LoopState() {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
MAINLOOP:
for {
select {
case <-sm.stopC:
sm.stopCompleteC <- struct{}{}
close(sm.workerChan)
break MAINLOOP
case <-t.C:
sm.checkAgentStatus()
}
}
}
func (sm *StateManager) Stop() {
sm.stopC <- struct{}{}
<-sm.stopCompleteC
}
func (sm *StateManager) GetAgent(ID string) (*model.Instance, error) {
buf, err := kv.GetValue(sm.KVKey, []byte(ID))
if err != nil {
return nil, err
}
strTime, _ := jsonparser.GetString(buf, "timestamp")
timestamp, _ := time.Parse(time.RFC3339, strTime)
inst := &model.Instance{}
inst.ID = ID
if time.Since(timestamp) > sm.TTL {
exists, err := orm.Get(inst)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("can not found agent [%s]", ID)
}
//inst.Timestamp = time.Now()
err = kv.AddValue(sm.KVKey, []byte(ID), util.MustToJSONBytes(inst))
if err != nil {
log.Errorf("save agent [%s] to kv error: %v", ID, err)
}
return inst, nil
}
err = util.FromJSONBytes(buf, inst)
return inst, err
}
func (sm *StateManager) UpdateAgent(inst *model.Instance, syncToES bool) (*model.Instance, error) {
//inst.Timestamp = time.Now()
err := kv.AddValue(sm.KVKey, []byte(inst.ID), util.MustToJSONBytes(inst))
if syncToES {
ctx := orm.Context{
Refresh: "wait_for",
}
err = orm.Update(&ctx, inst)
if err != nil {
return nil, err
}
}
return inst, err
}
func (sm *StateManager) GetTaskAgent(clusterID string) (*model.Instance, error) {
return nil, nil
}
func (sm *StateManager) DeleteAgent(agentID string) error {
sm.agentMutex.Lock()
delete(sm.agentIds, agentID)
sm.agentMutex.Unlock()
log.Infof("delete agent [%s] from state", agentID)
return kv.DeleteKey(sm.KVKey, []byte(agentID))
}
func (sm *StateManager) GetAgentClient() client.ClientAPI {
return sm.agentClient
}

View File

@ -7,11 +7,11 @@ package common
import (
"infini.sh/console/model"
"infini.sh/framework/core/credential"
"infini.sh/framework/core/elastic"
model2 "infini.sh/framework/core/model"
"infini.sh/framework/core/orm"
)
func GetBasicAuth(srv *model.EmailServer) (basicAuth elastic.BasicAuth, err error) {
func GetBasicAuth(srv *model.EmailServer) (basicAuth model2.BasicAuth, err error) {
if srv.Auth != nil && srv.Auth.Username != "" {
basicAuth = *srv.Auth
return
@ -28,7 +28,7 @@ func GetBasicAuth(srv *model.EmailServer) (basicAuth elastic.BasicAuth, err erro
if err != nil {
return
}
if auth, ok := dv.(elastic.BasicAuth); ok {
if auth, ok := dv.(model2.BasicAuth); ok {
basicAuth = auth
}
}

View File

@ -35,7 +35,7 @@ func (handler *LicenseAPI) RequestTrialLicense(w http.ResponseWriter, req *http.
}
//TODO implement config for the api endpoint
request:=util.NewPostRequest("https://api.infini.sh/_license/request_trial", util.MustToJSONBytes(v))
request:=util.NewPostRequest("https://api.infini.cloud/_license/request_trial", util.MustToJSONBytes(v))
response,err:=util.ExecuteRequest(request)
if err!=nil{
handler.WriteError(w,err.Error(),response.StatusCode)

View File

@ -6,6 +6,7 @@ import (
"encoding/hex"
"fmt"
"infini.sh/framework/core/kv"
"infini.sh/framework/core/model"
"io"
"net/http"
uri2 "net/url"
@ -88,7 +89,7 @@ func (module *Module) Start() error {
log.Error(err)
return
}
if basicAuth, ok := bv.(elastic.BasicAuth); ok {
if basicAuth, ok := bv.(model.BasicAuth); ok {
err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(basicAuth.Password))
if err != nil {
log.Error(err)
@ -203,7 +204,7 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro
}
cfg1 = elastic1.ORMConfig{}
exist, err := env.ParseConfig("elastic.orm", &cfg1)
if exist && err != nil {
if exist && err != nil &&global.Env().SystemConfig.Configs.PanicOnConfigError{
panic(err)
}
@ -272,7 +273,7 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API, Setup
Enabled: true,
Reserved: true,
Endpoint: request.Cluster.Endpoint,
BasicAuth: &elastic.BasicAuth{
BasicAuth: &model.BasicAuth{
Username: request.Cluster.Username,
Password: request.Cluster.Password,
},