From a315c4330d9c581bdedb211afff8d6cf2afd2b8b Mon Sep 17 00:00:00 2001 From: medcl Date: Mon, 16 Oct 2023 17:51:45 +0800 Subject: [PATCH] fix build --- config/install_agent.tpl | 26 +- config_repo/configs/cluster_xx_node_xx.yml | 17 + config_repo/configs/ingest_config.yml | 14 + config_repo/settings.yml | 31 + config_repo/templates/ingest_config.tpl | 97 +++ config_repo/templates/task_config.tpl | 87 ++ console.yml | 18 +- main.go | 9 +- model/email_server.go | 4 +- modules/agent/api/elasticsearch.go | 353 +++++++++ modules/agent/api/host.go | 436 +++++----- modules/agent/api/init.go | 47 +- modules/agent/api/instance.go | 1 + modules/agent/api/log.go | 124 --- modules/agent/api/tod.go | 881 +++++++++++++++++++++ modules/agent/client/client.go | 279 ------- modules/agent/client/executor.go | 100 --- modules/agent/common/config.go | 36 +- modules/agent/common/helper.go | 6 +- modules/agent/common/helper_test.go | 41 - modules/agent/model/const.go | 15 - modules/agent/state/state.go | 352 -------- plugin/api/email/common/auth.go | 6 +- plugin/api/license/api.go | 2 +- plugin/setup/setup.go | 7 +- 25 files changed, 1780 insertions(+), 1209 deletions(-) create mode 100644 config_repo/configs/cluster_xx_node_xx.yml create mode 100644 config_repo/configs/ingest_config.yml create mode 100644 config_repo/settings.yml create mode 100644 config_repo/templates/ingest_config.tpl create mode 100644 config_repo/templates/task_config.tpl create mode 100644 modules/agent/api/elasticsearch.go delete mode 100644 modules/agent/api/log.go create mode 100644 modules/agent/api/tod.go delete mode 100644 modules/agent/client/client.go delete mode 100644 modules/agent/client/executor.go delete mode 100644 modules/agent/common/helper_test.go delete mode 100644 modules/agent/model/const.go delete mode 100644 modules/agent/state/state.go diff --git a/config/install_agent.tpl b/config/install_agent.tpl index e764d685..1b0881d5 100644 --- a/config/install_agent.tpl +++ b/config/install_agent.tpl @@ -3,7 +3,7 @@ set -eo pipefail function print_usage() { - echo "Usage: curl -sSL http://get.infini.sh/agent.html | sudo bash -s -- [-u url_for_download_program] [-v version_for_program ] [-t taget_install_dir] [-p prot_for_program]" + echo "Usage: curl -sSL http://get.infini.cloud/ | sudo bash -s -- [-u url_for_download_program] [-v version_for_program ] [-t target_install_dir] [-p port_for_program]" echo "Options:" echo " -u, --url Download url of the program to install which default is http://localhost" echo " -v, --version Version of the program to install which default is latest from " @@ -226,6 +226,15 @@ resource_limit.memory.max_in_bytes: 533708800 stats: include_storage_stats_in_api: false +elastic: + skip_init_metadata_on_start: true + health_check: + enabled: true + interval: 60s + availability_check: + enabled: false + interval: 60s + disk_queue: max_msg_size: 20485760 max_bytes_per_file: 20485760 @@ -254,6 +263,21 @@ badger: value_log_max_entries: 1000000 value_log_file_size: 104857600 +configs: + #for managed client's setting + managed: true # managed by remote servers + panic_on_config_error: false #ignore config error + interval: "1s" + servers: # config servers + - "http://localhost:9000" + max_backup_files: 5 + tls: #for mTLS connection with config servers + enabled: true + cert_file: "config/client.crt" + key_file: "config/client.key" + ca_file: "config/ca.crt" + skip_insecure_verify: false + node: major_ip_pattern: ".*" EOF diff --git a/config_repo/configs/cluster_xx_node_xx.yml b/config_repo/configs/cluster_xx_node_xx.yml new file mode 100644 index 00000000..d67e7cc6 --- /dev/null +++ b/config_repo/configs/cluster_xx_node_xx.yml @@ -0,0 +1,17 @@ +configs.template: + - name: "cluster_a_node_b_task_config" + path: ./config/task_config.tpl + variable: + CLUSTER_ID: infini_default_ingest_cluster + CLUSTER_ENDPOINT: ["https://localhost:9200"] + CLUSTER_USERNAME: "admin" + CLUSTER_VER: "1.6.0" + CLUSTER_DISTRIBUTION: "easysearch" + INDEX_PREFIX: ".infini_" + CLUSTER_LEVEL_TASKS_ENABLED: false + NODE_LEVEL_TASKS_ENABLED: true + NODE_LOGS_PATH: "/opt/easysearch/logs/" + + +#MANAGED_CONFIG_VERSION: 19 +#MANAGED: true \ No newline at end of file diff --git a/config_repo/configs/ingest_config.yml b/config_repo/configs/ingest_config.yml new file mode 100644 index 00000000..bb2440e9 --- /dev/null +++ b/config_repo/configs/ingest_config.yml @@ -0,0 +1,14 @@ +configs.template: + - name: "default_ingest_config" + path: ./config/ingest_config.tpl + variable: + INGEST_CLUSTER_ID: infini_default_ingest_cluster + INGEST_CLUSTER_ENDPOINT: [ "https://localhost:9200" ] + INGEST_CLUSTER_USERNAME: "admin" + CLUSTER_VER: "1.6.0" + CLUSTER_DISTRIBUTION: "easysearch" + INDEX_PREFIX: ".infini_" + + +#MANAGED_CONFIG_VERSION: 2 +#MANAGED: true \ No newline at end of file diff --git a/config_repo/settings.yml b/config_repo/settings.yml new file mode 100644 index 00000000..41d42024 --- /dev/null +++ b/config_repo/settings.yml @@ -0,0 +1,31 @@ +configs: #define configs group + general_ingest_template: #group name + files: + - ./templates/ingest_config.tpl + - ./templates/task_config.tpl + - ./configs/ingest_config.yml + - ./configs/cluster_xx_node_xx.yml +instances: #define which config instance should fetch + infini_default_system_cluster: #instance group + plugins: + - ingest + instances: + - ck0mkk805f5virpsejp0 + - ckjrpdg05f5lrfp8qlng + configs: + - general_ingest_template + secrets: + - ingest_cluster_password + +secrets: + ingest_cluster_password: #group name + keystore: + ingest_cluster_password: + type: plaintext + value: "d7cc48e69a41dac719fb" + infini_default_ingest_cluster_password: + type: plaintext + value: "d7cc48e69a41dac719fb" +# infini_default_ingest_cluster_password: +# type: credential +# value: "ckghspo05f5q7pr20ct0" #credential_id \ No newline at end of file diff --git a/config_repo/templates/ingest_config.tpl b/config_repo/templates/ingest_config.tpl new file mode 100644 index 00000000..258dfe06 --- /dev/null +++ b/config_repo/templates/ingest_config.tpl @@ -0,0 +1,97 @@ +elasticsearch: + - name: $[[INGEST_CLUSTER_ID]] + enabled: true + endpoints: $[[INGEST_CLUSTER_ENDPOINT]] + discovery: + enabled: false + basic_auth: + username: $[[INGEST_CLUSTER_USERNAME]] + password: $[[keystore.ingest_cluster_password]] + +metrics: + enabled: true + queue: metrics + network: + enabled: true + summary: true + details: true + memory: + metrics: + - swap + - memory + disk: + metrics: + - iops + - usage + cpu: + metrics: + - idle + - system + - user + - iowait + - load + instance: + enabled: true + +elastic: + availability_check: + enabled: false + +pipeline: + - name: merge_logs + auto_start: true + keep_running: true + processor: + - indexing_merge: + elasticsearch: "$[[INGEST_CLUSTER_ID]]" + index_name: ".infini_logs" + type_name: "_doc" + input_queue: "logs" + idle_timeout_in_seconds: 10 + output_queue: + name: "merged_requests" + worker_size: 1 + bulk_size_in_mb: 5 + - name: merge_metrics + auto_start: true + keep_running: true + processor: + - indexing_merge: + elasticsearch: "$[[INGEST_CLUSTER_ID]]" + index_name: ".infini_metrics" + type_name: "_doc" + input_queue: "metrics" + output_queue: + name: "merged_requests" + worker_size: 1 + bulk_size_in_mb: 5 + - name: ingest_merged_requests + enabled: true + auto_start: true + keep_running: true + processor: + - bulk_indexing: + max_worker_size: 1 + verbose_bulk_result: false + bulk: + batch_size_in_mb: 5 + batch_size_in_docs: 5000 + max_retry_times: 0 + invalid_queue: "" + response_handle: + include_index_stats: false + include_action_stats: false + output_bulk_stats: false + include_error_details: true + save_error_results: true + save_success_results: false + save_busy_results: false + consumer: + fetch_max_messages: 5 + queues: + type: indexing_merge + when: + cluster_available: ["$[[INGEST_CLUSTER_ID]]"] + +#MANAGED_CONFIG_VERSION: 16 +#MANAGED: true \ No newline at end of file diff --git a/config_repo/templates/task_config.tpl b/config_repo/templates/task_config.tpl new file mode 100644 index 00000000..119d2fc8 --- /dev/null +++ b/config_repo/templates/task_config.tpl @@ -0,0 +1,87 @@ +elasticsearch: + - id: $[[CLUSTER_ID]] + name: $[[CLUSTER_ID]] + enabled: true + endpoints: $[[CLUSTER_ENDPOINT]] + discovery: + enabled: false + basic_auth: + username: $[[CLUSTER_USERNAME]] + password: $[[keystore.$[[CLUSTER_ID]]_password]] + +pipeline: +#clsuter level metrics +- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]] + enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]] + keep_running: true + singleton: true + name: collect_$[[CLUSTER_ID]]_es_cluster_stats + retry_delay_in_ms: 10000 + processor: + - es_cluster_stats: + elasticsearch: $[[CLUSTER_ID]] + labels: + cluster_id: $[[CLUSTER_ID]] + when: + cluster_available: ["$[[CLUSTER_ID]]"] + +- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]] + enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]] + keep_running: true + singleton: true + name: collect_$[[CLUSTER_ID]]_es_index_stats + retry_delay_in_ms: 10000 + processor: + - es_index_stats: + elasticsearch: $[[CLUSTER_ID]] + labels: + cluster_id: $[[CLUSTER_ID]] + when: + cluster_available: ["$[[CLUSTER_ID]]"] + +- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]] + enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]] + keep_running: true + singleton: true + name: collect_$[[CLUSTER_ID]]_es_cluster_health + retry_delay_in_ms: 10000 + processor: + - es_cluster_health: + elasticsearch: $[[CLUSTER_ID]] + labels: + cluster_id: $[[CLUSTER_ID]] + when: + cluster_available: ["$[[CLUSTER_ID]]"] + +#node level metrics +- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]] + enabled: $[[NODE_LEVEL_TASKS_ENABLED]] + keep_running: true + name: collect_$[[CLUSTER_ID]]_es_node_stats + retry_delay_in_ms: 10000 + processor: + - es_node_stats: + elasticsearch: $[[CLUSTER_ID]] + labels: + cluster_id: $[[CLUSTER_ID]] + when: + cluster_available: ["$[[CLUSTER_ID]]"] + +#node logs +- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]] + enabled: $[[NODE_LEVEL_TASKS_ENABLED]] + keep_running: true + name: collect_$[[CLUSTER_ID]]_es_logs + retry_delay_in_ms: 10000 + processor: + - es_logs_processor: + elasticsearch: $[[CLUSTER_ID]] + labels: + cluster_id: $[[CLUSTER_ID]] + logs_path: $[[NODE_LOGS_PATH]] + queue_name: logs + when: + cluster_available: ["$[[CLUSTER_ID]]"] + +#MANAGED_CONFIG_VERSION: 11 +#MANAGED: true \ No newline at end of file diff --git a/console.yml b/console.yml index b10556bd..d1aa5231 100644 --- a/console.yml +++ b/console.yml @@ -1,6 +1,3 @@ -path.configs: "config" -configs.auto_reload: true - #env: # INFINI_CONSOLE_ENDPOINT: "http://127.0.0.1:9000" # INGEST_CLUSTER_ENDPOINT: "https://127.0.0.1:9200" @@ -11,6 +8,20 @@ configs.auto_reload: true # WECOM_WEBHOOK_ENDPOINT: # FEISHU_WEBHOOK_ENDPOINT: + +# must in major config file +path.configs: "config" +configs: + managed: true + auto_reload: true + manager: + local_configs_repo_path: ./config_repo/ + tls: #for mTLS connection with config servers + enabled: true + ca_file: config/certs/ca.crt + cert_file: config/certs/ca.crt + key_file: config/certs/ca.key + skip_insecure_verify: false web: enabled: true embedding_api: true @@ -62,6 +73,7 @@ badger: value_log_max_entries: 1000000 value_log_file_size: 104857600 + security: enabled: true # authc: diff --git a/main.go b/main.go index dd01f93e..095ffbba 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "errors" _ "expvar" + api3 "infini.sh/console/modules/agent/api" "infini.sh/console/plugin/api/email" model2 "infini.sh/framework/core/model" _ "time/tzdata" @@ -35,9 +36,9 @@ import ( "infini.sh/framework/modules/task" "infini.sh/framework/modules/ui" _ "infini.sh/framework/plugins" + _ "infini.sh/framework/plugins/managed" api2 "infini.sh/gateway/api" _ "infini.sh/gateway/proxy" - _ "infini.sh/framework/plugins/managed" ) var appConfig *config.AppConfig @@ -84,7 +85,7 @@ func main() { if !global.Env().SetupRequired() { for _, v := range modules { - module.RegisterModuleWithPriority(v.Value,v.Priority) + module.RegisterModuleWithPriority(v.Value, v.Priority) } } else { for _, v := range modules { @@ -94,6 +95,8 @@ func main() { api.RegisterAPI("") + api3.Init() + appConfig = &config.AppConfig{ UI: config.UIConfig{ LocalPath: ".public", @@ -103,7 +106,7 @@ func main() { } ok, err := env.ParseConfig("web", appConfig) - if err != nil { + if err != nil && global.Env().SystemConfig.Configs.PanicOnConfigError { panic(err) } if !ok { diff --git a/model/email_server.go b/model/email_server.go index 6c15545d..cc88dda5 100644 --- a/model/email_server.go +++ b/model/email_server.go @@ -6,7 +6,7 @@ package model import ( "fmt" - "infini.sh/framework/core/elastic" + "infini.sh/framework/core/model" "infini.sh/framework/core/orm" ) @@ -16,7 +16,7 @@ type EmailServer struct { Host string `json:"host" elastic_mapping:"host:{type:keyword}"` Port int `json:"port" elastic_mapping:"port:{type:keyword}"` TLS bool `json:"tls" elastic_mapping:"tls:{type:keyword}"` - Auth *elastic.BasicAuth `json:"auth" elastic_mapping:"auth:{type:object}"` + Auth *model.BasicAuth `json:"auth" elastic_mapping:"auth:{type:object}"` Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:boolean}"` CredentialID string `json:"credential_id" elastic_mapping:"credential_id:{type:keyword}"` } diff --git a/modules/agent/api/elasticsearch.go b/modules/agent/api/elasticsearch.go new file mode 100644 index 00000000..dc5dfb4c --- /dev/null +++ b/modules/agent/api/elasticsearch.go @@ -0,0 +1,353 @@ +/* Copyright © INFINI LTD. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package api + +import ( + "context" + "fmt" + log "github.com/cihub/seelog" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/model" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + "time" +) + +//func (h *APIHandler) refreshESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// id := ps.MustGetParameter("instance_id") +// obj := model.Instance{} +// obj.ID = id +// exists, err := orm.Get(&obj) +// if !exists || err != nil { +// h.WriteJSON(w, util.MapStr{ +// "_id": id, +// "found": false, +// }, http.StatusNotFound) +// return +// } +// _, err = refreshNodesInfo(&obj) +// if err != nil { +// log.Error(err) +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// h.WriteAckOKJSON(w) +//} + +func refreshNodesInfo(inst *model.Instance) ([]*model.ESNodeInfo, error) { + oldNodesInfo, err := getNodesBindingToAgent(inst) + if err != nil { + return nil, fmt.Errorf("error on get binding nodes info: %w", err) + } + + log.Error("oldNodesInfo:",util.MustToJSON(oldNodesInfo)) + + ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + nodesInfo, err := GetElasticsearchNodesViaAgent(ctxTimeout, inst) + if err != nil { + //TODO return already biding nodes info ?? + return nil, fmt.Errorf("error on get nodes info from agent: %w", err) + } + + log.Error("nodesInfo:",util.MustToJSON(nodesInfo)) + + for _, node := range nodesInfo { + v,ok:=oldNodesInfo[node.NodeUUID] + if ok{ + node.ClusterID=v.ClusterID + } + } + + return nodesInfo, nil +} + +//get nodes info via agent +func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance) ([]*model.ESNodeInfo, error) { + req := &util.Request{ + Method: http.MethodGet, + Path: "/elasticsearch/nodes/_discovery", + Context: ctx, + } + resBody := []*model.ESNodeInfo{} + err := doRequest(instance, req, &resBody) + if err != nil { + return nil, err + } + + return resBody, nil +} + +func AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*model.ESNodeInfo, error) { + reqBody, err := util.ToJSONBytes(cfg) + if err != nil { + return nil, err + } + req := &util.Request{ + Method: http.MethodPost, + Path: "/elasticsearch/_auth", + Context: ctx, + Body: reqBody, + } + resBody := &model.ESNodeInfo{} + err = DoRequest(req, resBody) + if err != nil { + return nil, err + } + return resBody, nil +} + +func getNodeByPidOrUUID(nodes map[int]*model.ESNodeInfo, pid int, uuid string, port string) *model.ESNodeInfo { + if nodes[pid] != nil { + return nodes[pid] + } + for _, node := range nodes { + if node.NodeUUID != "" && node.NodeUUID == uuid { + return node + } + } + return nil +} + +type BindingItem struct { + ClusterID string `json:"cluster_id"` + ClusterUUID string `json:"cluster_uuid"` + NodeUUID string `json:"node_uuid"` +} + +//node -> binding item +func getNodesBindingToAgent(instance *model.Instance) (map[string]BindingItem, error) { + + //get nodes settings where agent id = instance id + q := orm.Query{ + Size: 1000, + Conds: orm.And(orm.Eq("metadata.category", "node_settings"), + orm.Eq("metadata.name", "agent"), + orm.Eq("metadata.labels.agent_id", instance.ID), + ), + } + + err, result := orm.Search(model.Setting{}, &q) + + if err != nil { + return nil, err + } + + ids := map[string]BindingItem{} + for _, row := range result.Result { + v, ok := row.(map[string]interface{}) + if ok { + x, ok := v["metadata"] + if ok { + y, ok := x.(map[string]interface{}) + if ok { + e, ok := y["labels"] + if ok { + f, ok := e.(map[string]interface{}) + if ok { + nodeID, ok := f["node_uuid"].(string) + if ok { + item := BindingItem{} + item.ClusterID = f["cluster_id"].(string) + item.ClusterUUID = f["cluster_uuid"].(string) + item.NodeUUID = nodeID + ids[item.NodeUUID] = item + } + } + } + } + } + } + } + return ids, nil +} + +func getUnAssociateNodes() (map[string][]model.ESNodeInfo, error) { + query := util.MapStr{ + "size": 3000, + "query": util.MapStr{ + "bool": util.MapStr{ + "must_not": []util.MapStr{ + { + "exists": util.MapStr{ + "field": "cluster_id", + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + + err, result := orm.Search(model.ESNodeInfo{}, &q) + if err != nil { + return nil, err + } + nodesInfo := map[string][]model.ESNodeInfo{} + for _, row := range result.Result { + node := model.ESNodeInfo{} + buf := util.MustToJSONBytes(row) + util.MustFromJSONBytes(buf, &node) + nodesInfo[node.AgentID] = append(nodesInfo[node.AgentID], node) + } + return nodesInfo, nil +} + +func GetElasticLogFiles(ctx context.Context, instance *model.Instance, logsPath string) (interface{}, error) { + + reqBody := util.MustToJSONBytes(util.MapStr{ + "logs_path": logsPath, + }) + req := &util.Request{ + Method: http.MethodPost, + Path: "/elasticsearch/logs/_list", + Context: ctx, + Body: reqBody, + } + resBody := map[string]interface{}{} + err := doRequest(instance, req, &resBody) + if err != nil { + return nil, err + } + if resBody["success"] != true { + return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody) + } + return resBody["result"], nil +} + +func GetElasticLogFileContent(ctx context.Context, instance *model.Instance, body interface{}) (interface{}, error) { + req := &util.Request{ + Method: http.MethodPost, + Path: "/elasticsearch/logs/_read", + Context: ctx, + Body: util.MustToJSONBytes(body), + } + resBody := map[string]interface{}{} + err := doRequest(instance, req, &resBody) + if err != nil { + return nil, err + } + if resBody["success"] != true { + return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody["error"]) + } + var hasMore bool + if v, ok := resBody["EOF"].(bool); ok && !v { + hasMore = true + } + return map[string]interface{}{ + "lines": resBody["result"], + "has_more": hasMore, + }, nil +} + +func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + nodeID := ps.MustGetParameter("node_id") + inst, node, err := getAgentByNodeID(nodeID) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if inst == nil { + log.Error(fmt.Sprintf("can not find agent by node [%s]", nodeID)) + h.WriteJSON(w, util.MapStr{ + "success": false, + "reason": "AGENT_NOT_FOUND", + }, http.StatusOK) + return + } + logFiles, err := GetElasticLogFiles(nil, inst, node.Path.Logs) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + h.WriteJSON(w, util.MapStr{ + "success": true, + "log_files": logFiles, + }, http.StatusOK) +} + +func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + nodeID := ps.MustGetParameter("node_id") + inst, node, err := getAgentByNodeID(nodeID) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if inst == nil { + h.WriteError(w, fmt.Sprintf("can not find agent by node [%s]", nodeID), http.StatusInternalServerError) + return + } + reqBody := struct { + FileName string `json:"file_name"` + LogsPath string `json:"logs_path"` + Offset int `json:"offset"` + Lines int `json:"lines"` + StartLineNumber int64 `json:"start_line_number"` + }{} + err = h.DecodeJSON(req, &reqBody) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + reqBody.LogsPath = node.Path.Logs + res, err := GetElasticLogFileContent(nil, inst, reqBody) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + h.WriteJSON(w, res, http.StatusOK) +} + +func getAgentByNodeID(nodeID string) (*model.Instance, *model.ESNodeInfo, error) { + queryDsl := util.MapStr{ + "size": 1, + "query": util.MapStr{ + "term": util.MapStr{ + "node_uuid": util.MapStr{ + "value": nodeID, + }, + }, + }, + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "desc", + }, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(queryDsl), + } + err, result := orm.Search(model.ESNodeInfo{}, q) + if err != nil { + return nil, nil, err + } + if len(result.Result) > 0 { + buf := util.MustToJSONBytes(result.Result[0]) + v := &model.ESNodeInfo{} + err = util.FromJSONBytes(buf, v) + inst := &model.Instance{} + inst.ID = v.AgentID + _, err = orm.Get(inst) + if err != nil { + return nil, v, err + } + if inst.Name == "" { + return nil, v, nil + } + return inst, v, nil + } + return nil, nil, nil +} diff --git a/modules/agent/api/host.go b/modules/agent/api/host.go index f6341ebc..16419022 100644 --- a/modules/agent/api/host.go +++ b/modules/agent/api/host.go @@ -4,223 +4,219 @@ package api -// -//func (h *APIHandler) enrollHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { -// var reqBody []struct { -// AgentID string `json:"agent_id"` -// HostName string `json:"host_name"` -// IP string `json:"ip"` -// Source string `json:"source"` -// OSName string `json:"os_name"` -// OSArch string `json:"os_arch"` -// NodeID string `json:"node_uuid"` -// } -// err := h.DecodeJSON(req, &reqBody) -// if err != nil { -// log.Error(err) -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// return -// } -// errors := util.MapStr{} -// for i, hi := range reqBody { -// var ( -// hostInfo *host.HostInfo -// ) -// switch hi.Source { -// case "agent": -// hostInfo, err = enrollHostFromAgent(hi.AgentID) -// if err != nil { -// errors[hi.IP] = util.MapStr{ -// "error": err.Error(), -// } -// log.Error(err) -// continue -// } -// hostInfo.IP = hi.IP -// hostInfo.AgentID = hi.AgentID -// err = orm.Create(nil, hostInfo) -// if err != nil { -// errors[hi.IP] = util.MapStr{ -// "error": err.Error(), -// } -// log.Error(err) -// continue -// } -// case "es_node": -// hostInfo = &host.HostInfo{ -// IP: hi.IP, -// OSInfo: host.OS{ -// Platform: hi.OSName, -// KernelArch: hi.OSArch, -// }, -// NodeID: hi.NodeID, -// } -// default: -// errors[hi.IP] = util.MapStr{ -// "error": fmt.Errorf("unkonow source type"), -// } -// continue -// } -// hostInfo.Timestamp = time.Now() -// var ctx *orm.Context -// if i == len(reqBody) - 1 { -// ctx = &orm.Context{ -// Refresh: "wait_for", -// } -// } -// hostInfo.OSInfo.Platform = strings.ToLower(hostInfo.OSInfo.Platform) -// err = orm.Create(ctx, hostInfo) -// if err != nil { -// errors[hi.IP] = util.MapStr{ -// "error": err.Error(), -// } -// log.Error(err) -// continue -// } -// } -// resBody := util.MapStr{ -// "success": true, -// } -// if len(errors) > 0 { -// resBody["errors"] = errors -// resBody["success"] = false -// } -// -// h.WriteJSON(w, resBody, http.StatusOK) -//} -// -//func (h *APIHandler) deleteHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { -// hostID := ps.MustGetParameter("host_id") -// hostInfo, err := getHost(hostID) -// if err != nil { -// log.Error(err) -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// return -// } -// ctx := orm.Context{ -// Refresh: "wait_for", -// } -// err = orm.Delete(&ctx, hostInfo) -// if err != nil { -// log.Error(err) -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// return -// } -// h.WriteDeletedOKJSON(w, hostID) -//} -// -//func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { -// hostID := ps.MustGetParameter("host_id") -// hostInfo, err := getHost(hostID) -// if err != nil { -// log.Error(err) -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// return -// } -// if hostInfo.AgentID == "" { -// h.WriteJSON(w, util.MapStr{}, http.StatusOK) -// return -// } -// -// sm := state.GetStateManager() -// ag, err := sm.GetAgent(hostInfo.AgentID) -// if err != nil { -// log.Error(err) -// h.WriteJSON(w, util.MapStr{}, http.StatusOK) -// return -// } -// aversion, err := ag.GetVersion() -// if err == nil { -// ag.Version = aversion -// orm.Save(nil, ag) -// } -// h.WriteJSON(w, util.MapStr{ -// "host_id": hostID, -// "agent_id": ag.ID, -// "version": ag.Version, -// "status": hostInfo.AgentStatus, -// "endpoint": ag.GetEndpoint(), -// }, http.StatusOK) -//} -// -//func getHost(hostID string) (*host.HostInfo, error){ -// hostInfo := &host.HostInfo{} -// hostInfo.ID = hostID -// exists, err := orm.Get(hostInfo) -// if err != nil { -// return nil, fmt.Errorf("get host info error: %w", err) -// } -// if !exists { -// return nil, fmt.Errorf("host [%s] not found", hostID) -// } -// return hostInfo, nil -//} -// -//func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { -// hostID := ps.MustGetParameter("host_id") -// hostInfo := &host.HostInfo{} -// hostInfo.ID = hostID -// exists, err := orm.Get(hostInfo) -// if err != nil { -// log.Error(err) -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// return -// } -// if !exists { -// h.WriteError(w, fmt.Sprintf("host [%s] not found", hostID), http.StatusNotFound) -// return -// } -// if hostInfo.AgentID == "" { -// h.WriteJSON(w, util.MapStr{}, http.StatusOK) -// return -// } -// sm := state.GetStateManager() -// ag, err := sm.GetAgent(hostInfo.AgentID) -// if err != nil { -// log.Error(err) -// h.WriteJSON(w, util.MapStr{}, http.StatusOK) -// return -// } -// ctx,cancel := context.WithTimeout(context.Background(), time.Second * 10) -// defer cancel() -// esNodesInfo, err := sm.GetAgentClient().GetElasticsearchNodes(ctx, ag.GetEndpoint()) -// if err != nil { -// log.Error(err) -// h.WriteError(w, err.Error(), http.StatusInternalServerError) -// return -// } -// var processes []util.MapStr -// for _, node := range esNodesInfo { -// processes = append(processes, util.MapStr{ -// "pid": node.ProcessInfo.PID, -// "pid_status": node.ProcessInfo.Status, -// "cluster_name": node.ClusterName, -// "cluster_uuid": node.ClusterUuid, -// "cluster_id": node.ClusterID, -// "node_id": node.NodeUUID, -// "node_name": node.NodeName, -// "uptime_in_ms": time.Now().UnixMilli() - node.ProcessInfo.CreateTime, -// }) -// } -// h.WriteJSON(w, util.MapStr{ -// "elastic_processes": processes, -// }, http.StatusOK) -//} -// -//func enrollHostFromAgent(agentID string) (*host.HostInfo, error){ -// sm := state.GetStateManager() -// ag, err := sm.GetAgent(agentID) -// if err != nil { -// return nil, err -// } -// if ag == nil { -// return nil, fmt.Errorf("can not found agent [%s]", agentID) -// } -// agentClient := sm.GetAgentClient() -// hostInfo, err := agentClient.GetHostInfo(nil, ag.GetEndpoint()) -// if err != nil { -// return nil, err -// } -// hostInfo.AgentStatus = ag.Status -// return hostInfo, nil -//} \ No newline at end of file +import ( + "context" + "fmt" + log "github.com/cihub/seelog" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/host" + "infini.sh/framework/core/model" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + "strings" + "time" +) + +func (h *APIHandler) enrollHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var reqBody []struct { + AgentID string `json:"agent_id"` + HostName string `json:"host_name"` + IP string `json:"ip"` + Source string `json:"source"` + OSName string `json:"os_name"` + OSArch string `json:"os_arch"` + NodeID string `json:"node_uuid"` + } + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + errors := util.MapStr{} + for i, hi := range reqBody { + var ( + hostInfo *host.HostInfo + ) + switch hi.Source { + case "agent": + obj := model.Instance{} + obj.ID = hi.AgentID + exists, err := orm.Get(&obj) + if !exists || err != nil { + continue + } + hostInfo = &host.HostInfo{} + hostInfo.IP = hi.IP + hostInfo.AgentID = hi.AgentID + err = orm.Create(nil, hostInfo) + if err != nil { + errors[hi.IP] = util.MapStr{ + "error": err.Error(), + } + log.Error(err) + continue + } + case "es_node": + hostInfo = &host.HostInfo{ + IP: hi.IP, + OSInfo: host.OS{ + Platform: hi.OSName, + KernelArch: hi.OSArch, + }, + NodeID: hi.NodeID, + } + default: + errors[hi.IP] = util.MapStr{ + "error": fmt.Errorf("unkonow source type"), + } + continue + } + hostInfo.Timestamp = time.Now() + var ctx *orm.Context + if i == len(reqBody) - 1 { + ctx = &orm.Context{ + Refresh: "wait_for", + } + } + hostInfo.OSInfo.Platform = strings.ToLower(hostInfo.OSInfo.Platform) + err = orm.Create(ctx, hostInfo) + if err != nil { + errors[hi.IP] = util.MapStr{ + "error": err.Error(), + } + log.Error(err) + continue + } + } + resBody := util.MapStr{ + "success": true, + } + if len(errors) > 0 { + resBody["errors"] = errors + resBody["success"] = false + } + + h.WriteJSON(w, resBody, http.StatusOK) +} + +func (h *APIHandler) deleteHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + hostID := ps.MustGetParameter("host_id") + hostInfo, err := getHost(hostID) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + ctx := orm.Context{ + Refresh: "wait_for", + } + err = orm.Delete(&ctx, hostInfo) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteDeletedOKJSON(w, hostID) +} + +func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + hostID := ps.MustGetParameter("host_id") + hostInfo, err := getHost(hostID) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if hostInfo.AgentID == "" { + h.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + + obj := model.Instance{} + obj.ID = hostInfo.AgentID + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": hostInfo.AgentID, + "found": false, + }, http.StatusNotFound) + return + } + + h.WriteJSON(w, util.MapStr{ + "host_id": hostID, + "agent_id": obj.ID, + "version": obj.Application.Version, + "status": hostInfo.AgentStatus, + "endpoint": obj.GetEndpoint(), + }, http.StatusOK) +} + +func getHost(hostID string) (*host.HostInfo, error){ + hostInfo := &host.HostInfo{} + hostInfo.ID = hostID + exists, err := orm.Get(hostInfo) + if err != nil { + return nil, fmt.Errorf("get host info error: %w", err) + } + if !exists { + return nil, fmt.Errorf("host [%s] not found", hostID) + } + return hostInfo, nil +} + +func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + hostID := ps.MustGetParameter("host_id") + hostInfo := &host.HostInfo{} + hostInfo.ID = hostID + exists, err := orm.Get(hostInfo) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if !exists { + h.WriteError(w, fmt.Sprintf("host [%s] not found", hostID), http.StatusNotFound) + return + } + if hostInfo.AgentID == "" { + h.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + + obj := model.Instance{} + obj.ID = hostInfo.AgentID + exists, err = orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": hostInfo.AgentID, + "found": false, + }, http.StatusNotFound) + return + } + + esNodesInfo, err := GetElasticsearchNodesViaAgent(context.Background(), &obj) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var processes []util.MapStr + for _, node := range esNodesInfo { + processes = append(processes, util.MapStr{ + "pid": node.ProcessInfo.PID, + "pid_status": node.ProcessInfo.Status, + "cluster_name": node.ClusterName, + "cluster_uuid": node.ClusterUuid, + "cluster_id": node.ClusterID, + "node_id": node.NodeUUID, + "node_name": node.NodeName, + "uptime_in_ms": time.Now().UnixMilli() - node.ProcessInfo.CreateTime, + }) + } + h.WriteJSON(w, util.MapStr{ + "elastic_processes": processes, + }, http.StatusOK) +} \ No newline at end of file diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go index 254d25f7..209c514a 100644 --- a/modules/agent/api/init.go +++ b/modules/agent/api/init.go @@ -4,32 +4,29 @@ package api -func Init() { - //handler := APIHandler{} - //api.HandleAPIMethod(api.POST, "/instance", handler.registerInstance) //new +import ( + "infini.sh/framework/core/api" + "infini.sh/framework/core/api/rbac/enum" +) + +func Init() { + handler := APIHandler{} + api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) + api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info", handler.GetHostAgentInfo) + api.HandleAPIMethod(api.GET, "/host/:host_id/processes", handler.GetHostElasticProcess) + api.HandleAPIMethod(api.DELETE, "/host/:host_id", handler.deleteHost) + + api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) + api.HandleAPIMethod(api.POST, "/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite)) + + //api.HandleAPIMethod(api.POST, "/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) + api.HandleAPIMethod(api.DELETE, "/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite)) + api.HandleAPIMethod(api.GET, "/instance/:instance_id/_nodes", handler.RequirePermission(handler.getESNodesInfo, enum.PermissionAgentInstanceRead)) + + //get elasticsearch node logs, direct fetch or via stored logs(TODO) + api.HandleAPIMethod(api.GET, "/elasticsearch/:id/node/:node_id/logs/_list", handler.RequirePermission(handler.getLogFilesByNode, enum.PermissionAgentInstanceRead)) + api.HandleAPIMethod(api.POST, "/elasticsearch/:id/node/:node_id/logs/_read", handler.RequirePermission(handler.getLogFileContent, enum.PermissionAgentInstanceRead)) - //api.HandleAPIMethod(api.POST, "/agent/instance", handler.registerInstance) - //api.HandleAPIMethod(api.GET, "/agent/instance/_search", handler.RequirePermission(handler.searchInstance, enum.PermissionAgentInstanceRead)) - //api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id", handler.getInstance) - //api.HandleAPIMethod(api.PUT, "/agent/instance/:instance_id", handler.updateInstance) - //api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id", handler.RequirePermission(handler.deleteInstance, enum.PermissionAgentInstanceWrite)) - //api.HandleAPIMethod(api.POST, "/agent/instance/_stats", handler.RequirePermission(handler.getInstanceStats, enum.PermissionAgentInstanceRead)) - //api.HandleAPIMethod(api.GET, "/agent/log/node/:node_id/files", handler.RequirePermission(handler.getLogFilesByNode, enum.PermissionAgentInstanceRead)) - //api.HandleAPIMethod(api.POST, "/agent/log/node/:node_id/_scroll", handler.RequirePermission(handler.getLogFileContent, enum.PermissionAgentInstanceRead)) - //api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.getESNodesInfo, enum.PermissionAgentInstanceRead)) - //api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/_nodes/_refresh", handler.RequirePermission(handler.refreshESNodesInfo, enum.PermissionAgentInstanceWrite)) - //api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) - //api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite)) - //api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) - //api.HandleAPIMethod(api.POST, "/agent/instance/try_connect", handler.RequireLogin(handler.tryConnect)) - //api.HandleAPIMethod(api.POST, "/agent/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite)) - // - //api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) - //api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo) - //api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess) - //api.HandleAPIMethod(api.DELETE, "/host/:host_id",handler.deleteHost) - // - // //api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand)) //api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript) } diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index a6ebe0ed..563ce8cf 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -11,3 +11,4 @@ import ( type APIHandler struct { api.Handler } + diff --git a/modules/agent/api/log.go b/modules/agent/api/log.go deleted file mode 100644 index e4bb6341..00000000 --- a/modules/agent/api/log.go +++ /dev/null @@ -1,124 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package api - -import ( - "fmt" - log "github.com/cihub/seelog" - "infini.sh/console/modules/agent/client" - "infini.sh/console/modules/agent/state" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/model" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - "net/http" -) - -func (h *APIHandler) getLogFilesByNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - nodeID := ps.MustGetParameter("node_id") - inst, node, err := getAgentByNodeID(nodeID) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - if inst == nil { - log.Error(fmt.Sprintf("can not find agent by node [%s]", nodeID)) - h.WriteJSON(w, util.MapStr{ - "success": false, - "reason": "AGENT_NOT_FOUND", - }, http.StatusOK) - return - } - logFiles, err := client.GetClient().GetElasticLogFiles(nil, inst.GetEndpoint(), node.Path.Logs) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - h.WriteJSON(w, util.MapStr{ - "success": true, - "log_files": logFiles, - }, http.StatusOK) -} - -func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - nodeID := ps.MustGetParameter("node_id") - inst, node, err := getAgentByNodeID(nodeID) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - if inst == nil { - h.WriteError(w, fmt.Sprintf("can not find agent by node [%s]", nodeID), http.StatusInternalServerError) - return - } - reqBody := struct { - FileName string `json:"file_name"` - LogsPath string `json:"logs_path"` - Offset int `json:"offset"` - Lines int `json:"lines"` - StartLineNumber int64 `json:"start_line_number"` - }{} - err = h.DecodeJSON(req, &reqBody) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - reqBody.LogsPath = node.Path.Logs - sm := state.GetStateManager() - res, err := sm.GetAgentClient().GetElasticLogFileContent(nil, inst.GetEndpoint(), reqBody) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - h.WriteJSON(w, res, http.StatusOK) -} - -func getAgentByNodeID(nodeID string) (*model.Instance, *model.ESNodeInfo, error){ - queryDsl := util.MapStr{ - "size":1, - "query": util.MapStr{ - "term": util.MapStr{ - "node_uuid": util.MapStr{ - "value": nodeID, - }, - }, - }, - "sort": []util.MapStr{ - { - "timestamp": util.MapStr{ - "order": "desc", - }, - }, - }, - } - q := &orm.Query{ - RawQuery: util.MustToJSONBytes(queryDsl), - } - err, result := orm.Search(model.ESNodeInfo{}, q) - if err != nil { - return nil,nil, err - } - if len(result.Result) > 0 { - buf := util.MustToJSONBytes(result.Result[0]) - v := &model.ESNodeInfo{} - err = util.FromJSONBytes(buf, v) - inst := &model.Instance{} - inst.ID = v.AgentID - _, err = orm.Get(inst) - if err != nil { - return nil, v, err - } - if inst.Name == "" { - return nil, v, nil - } - return inst, v, nil - } - return nil, nil, nil -} diff --git a/modules/agent/api/tod.go b/modules/agent/api/tod.go new file mode 100644 index 00000000..72827ef4 --- /dev/null +++ b/modules/agent/api/tod.go @@ -0,0 +1,881 @@ +/* Copyright © INFINI LTD. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package api + +import ( + "context" + "errors" + "fmt" + log "github.com/cihub/seelog" + common2 "infini.sh/console/modules/agent/common" + model3 "infini.sh/console/modules/agent/model" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" + "infini.sh/framework/core/host" + "infini.sh/framework/core/model" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "infini.sh/framework/modules/elastic/common" + "net/http" + "net/url" + "strings" + "sync" +) + +//func (h *APIHandler) updateInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// id := ps.MustGetParameter("instance_id") +// oldInst := model.Instance{} +// oldInst.ID = id +// _, err := orm.Get(&oldInst) +// if err != nil { +// if err == elastic2.ErrNotFound { +// h.WriteJSON(w, util.MapStr{ +// "_id": id, +// "result": "not_found", +// }, http.StatusNotFound) +// return +// } +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// log.Error(err) +// return +// } +// +// obj := model.Instance{} +// err = h.DecodeJSON(req, &obj) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// log.Error(err) +// return +// } +// +// oldInst.Name = obj.Name +// oldInst.Endpoint = obj.Endpoint +// oldInst.Description = obj.Description +// oldInst.Tags = obj.Tags +// oldInst.BasicAuth = obj.BasicAuth +// err = orm.Update(&orm.Context{ +// Refresh: "wait_for", +// }, &oldInst) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// log.Error(err) +// return +// } +// +// h.WriteJSON(w, util.MapStr{ +// "_id": obj.ID, +// "result": "updated", +// }, 200) +//} +// +//func (h *APIHandler) searchInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// +// var ( +// keyword = h.GetParameterOrDefault(req, "keyword", "") +// //queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` +// strSize = h.GetParameterOrDefault(req, "size", "20") +// strFrom = h.GetParameterOrDefault(req, "from", "0") +// ) +// +// var ( +// mustQ []interface{} +// ) +// +// if keyword != "" { +// mustQ = append(mustQ, util.MapStr{ +// "query_string": util.MapStr{ +// "default_field": "*", +// "query": keyword, +// }, +// }) +// } +// size, _ := strconv.Atoi(strSize) +// if size <= 0 { +// size = 20 +// } +// from, _ := strconv.Atoi(strFrom) +// if from < 0 { +// from = 0 +// } +// +// queryDSL := util.MapStr{ +// "size": size, +// "from": from, +// } +// if len(mustQ) > 0 { +// queryDSL["query"] = util.MapStr{ +// "bool": util.MapStr{ +// "must": mustQ, +// }, +// } +// } +// +// q := orm.Query{} +// q.RawQuery = util.MustToJSONBytes(queryDSL) +// +// err, res := orm.Search(&model.Instance{}, &q) +// if err != nil { +// log.Error(err) +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// +// h.Write(w, res.Raw) +//} + +func (h *APIHandler) getESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + id := ps.MustGetParameter("instance_id") + obj := model.Instance{} + obj.ID = id + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + + nodes, err := refreshNodesInfo(&obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + h.WriteJSON(w, nodes, http.StatusOK) +} + +func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("instance_id") + inst := model.Instance{} + inst.ID = id + exists, err := orm.Get(&inst) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + reqBody := struct { + NodeID string `json:"node_id"` + ESConfig *elastic.ElasticsearchConfig `json:"es_config"` + }{} + err = h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + //node id maybe is missing + if reqBody.NodeID != "" { + //verify the node id, if the node is is actually the node of the instance + oldNodeInfo := &model.ESNodeInfo{ + ID: reqBody.NodeID, + } + exists, err = orm.Get(oldNodeInfo) + if !exists || err != nil { + h.WriteJSON(w, fmt.Sprintf("node [%s] of agent [%s] was not found", oldNodeInfo.ID, inst.Name), http.StatusInternalServerError) + return + } + }else{ + //find out the node id with credentials + cfg := reqBody.ESConfig + if cfg.Endpoint == "" { + cfg.Endpoint = cfg.GetAnyEndpoint() + } + basicAuth, err := common.GetBasicAuth(cfg) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + cfg.BasicAuth = basicAuth + nodeInfo, err := AuthESNode(context.Background(), inst.GetEndpoint(), *cfg) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + //host, port, err := net.SplitHostPort(nodeInfo.PublishAddress) + //if err != nil { + // log.Error(err) + // h.WriteError(w, err.Error(), http.StatusInternalServerError) + // return + //} + //if !util.StringInArray(inst.Network.IP, host) && !net.ParseIP(host).IsLoopback() { + // h.WriteError(w, fmt.Sprintf("got node host %s not match any ip of %v", host, inst.Network.IP), http.StatusInternalServerError) + // return + //} + //if oldNodeInfo.HttpPort != port { + // h.WriteError(w, fmt.Sprintf("port mismatch, got: %s,expected: %s", port, oldNodeInfo.HttpPort), http.StatusInternalServerError) + // return + //} + //if oldNodeInfo.ProcessInfo.PID != nodeInfo.ProcessInfo.PID { + // h.WriteError(w, fmt.Sprintf("process id mismatch, got: %d,expected: %d", nodeInfo.ProcessInfo.PID, oldNodeInfo.ProcessInfo.PID), http.StatusInternalServerError) + // return + //} + + reqBody.NodeID=nodeInfo.NodeUUID + } + + + //nodeInfo:=elastic.NodeConfig{} + //nodeInfo.ID = reqBody.NodeID + //nodeInfo.AgentID = inst.ID + //err = orm.Update(nil, nodeInfo) //update node's info and agent_id + //if err != nil { + // log.Error(err) + // h.WriteError(w, err.Error(), http.StatusInternalServerError) + // return + //} + //h.WriteJSON(w, nodeInfo, http.StatusOK) +} + +func NewClusterSettings(clusterID string) *model.Setting { + settings := model.Setting{ + Metadata: model.SettingsMetadata{ + Category: Cluster, + }, + } + settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, clusterID) + + settings.Metadata.Labels = util.MapStr{ + "cluster_id": clusterID, + } + + return &settings +} + +func NewNodeAgentSettings(clusterID, clusterUUID, nodeUUID, agentID, agentCredential string) *model.Setting { + + settings := model.Setting{ + Metadata: model.SettingsMetadata{ + Category: Node, + Name: "agent", + }, + } + settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, nodeUUID) + + settings.Metadata.Labels = util.MapStr{ + "cluster_id": clusterID, + "cluster_uuid": clusterUUID, + "node_uuid": nodeUUID, + "agent_id": agentID, + "agent_credential": agentCredential, + } + + return &settings +} + +func NewIndexSettings(clusterID, nodeID, agentID, indexName, indexID string) *model.Setting { + + settings := model.Setting{ + Metadata: model.SettingsMetadata{ + Category: Index, + }, + } + settings.ID = fmt.Sprintf("%v_%v_%v", settings.Metadata.Category, settings.Metadata.Name, nodeID) + + settings.Metadata.Labels = util.MapStr{ + "cluster_id": clusterID, + "node_id": nodeID, + "agent_id": agentID, + "index_name": indexName, + "index_id": indexID, + } + + return &settings +} + +const Cluster = "cluster_settings" +const Node = "node_settings" +const Index = "index_settings" + +func (h *APIHandler) associateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + //agent id + instID := ps.MustGetParameter("instance_id") + + //node id and cluster id + reqBody := struct { + ClusterUUID string `json:"cluster_uuid"` + NodeUUID string `json:"node_uuid"` + + //infini system assigned id + ClusterID string `json:"cluster_id"` + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + //update node's setting + settings := NewNodeAgentSettings(reqBody.ClusterID, reqBody.ClusterUUID, reqBody.NodeUUID, instID, "node.AgentCredential") + err = orm.Update(&orm.Context{ + Refresh: "wait_for", + }, settings) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + //settings, err := common2.GetAgentSettings(instID, 0) + //if err != nil { + // log.Error(err) + // h.WriteError(w, err.Error(), http.StatusInternalServerError) + // return + //} + //setting := pickAgentSettings(settings, node) + //if setting == nil { + // setting, err = getAgentTaskSetting(instID, node) + // if err != nil { + // log.Error("get agent task setting error: ", err) + // } + // err = orm.Create(nil, setting) + // if err != nil { + // log.Error("save agent task setting error: ", err) + // } + //} + + h.WriteAckOKJSON(w) +} + +func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + reqBody := struct { + ClusterIDs []string `json:"cluster_ids"` + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + // query not associated nodes info + nodesM, err := getUnAssociateNodes() + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(nodesM) == 0 { + h.WriteAckOKJSON(w) + return + } + agentIds := make([]string, 0, len(nodesM)) + for agentID := range nodesM { + agentIds = append(agentIds, agentID) + } + agents, err := getAgentByIds(agentIds) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + for _, clusterID := range reqBody.ClusterIDs { + // query cluster basicauth + cfg := elastic.GetConfig(clusterID) + basicAuth, err := common.GetBasicAuth(cfg) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + taskSetting, err := getSettingsByClusterID(cfg.ID) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + for agentID, nodes := range nodesM { + var ( + inst *model.Instance + ok bool + ) + if inst, ok = agents[agentID]; !ok { + log.Warnf("agent [%v] was not found", agentID) + continue + } + settings, err := common2.GetAgentSettings(agentID, 0) + if err != nil { + log.Error(err) + continue + } + for _, v := range nodes { + host := v.PublishAddress + var endpoint string + if strings.HasPrefix(host, "::") { //for ipv6 + instURL, err := url.Parse(inst.Endpoint) + if err != nil { + log.Error(err) + continue + } + host = instURL.Hostname() + endpoint = fmt.Sprintf("%s://[%s]:%s", v.Schema, host, v.HttpPort) + } else { + endpoint = fmt.Sprintf("%s://%s", v.Schema, host) + } + escfg := elastic.ElasticsearchConfig{ + Endpoint: endpoint, + BasicAuth: basicAuth, + } + nodeInfo, err := AuthESNode(context.Background(), inst.GetEndpoint(), escfg) + if err != nil { + log.Warn(err) + continue + } + //matched + if nodeInfo.ClusterUuid == cfg.ClusterUUID { + //update node info + nodeInfo.ID = v.ID + nodeInfo.AgentID = inst.ID + nodeInfo.ClusterID = cfg.ID + err = orm.Save(nil, nodeInfo) + if err != nil { + log.Error(err) + continue + } + setting := pickAgentSettings(settings, v) + if setting == nil { + tsetting := model3.TaskSetting{ + NodeStats: &model3.NodeStatsTask{ + Enabled: true, + }, + Logs: &model3.LogsTask{ + Enabled: true, + LogsPath: nodeInfo.Path.Logs, + }, + } + if taskSetting.IndexStats != nil { + tsetting.IndexStats = taskSetting.IndexStats + taskSetting.IndexStats = nil + } + if taskSetting.ClusterHealth != nil { + tsetting.ClusterHealth = taskSetting.ClusterHealth + taskSetting.ClusterHealth = nil + } + if taskSetting.ClusterStats != nil { + tsetting.ClusterStats = taskSetting.ClusterStats + taskSetting.ClusterStats = nil + } + setting = &model.Setting{ + Metadata: model.SettingsMetadata{ + Category: "agent", + Name: "task", + Labels: util.MapStr{ + "agent_id": agentID, + "cluster_uuid": nodeInfo.ClusterUuid, + "cluster_id": nodeInfo.ClusterID, + "node_uuid": nodeInfo.NodeUUID, + "endpoint": fmt.Sprintf("%s://%s", nodeInfo.Schema, nodeInfo.PublishAddress), + }, + }, + Payload: util.MapStr{ + "task": tsetting, + }, + } + err = orm.Create(nil, setting) + if err != nil { + log.Error("save agent task setting error: ", err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + } + } + + } + } + h.WriteAckOKJSON(w) +} + +func getAgentByIds(agentIDs []string) (map[string]*model.Instance, error) { + query := util.MapStr{ + "size": len(agentIDs), + "query": util.MapStr{ + "terms": util.MapStr{ + "id": agentIDs, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(model.Instance{}, &q) + if err != nil { + return nil, err + } + agents := map[string]*model.Instance{} + for _, row := range result.Result { + inst := model.Instance{} + buf := util.MustToJSONBytes(row) + util.MustFromJSONBytes(buf, &inst) + agents[inst.ID] = &inst + } + return agents, nil +} + +func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("instance_id") + nodeIDs := []string{} + err := h.DecodeJSON(req, &nodeIDs) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(nodeIDs) > 0 { + q := util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "id": nodeIDs, + }, + }, + { + "term": util.MapStr{ + "agent_id": util.MapStr{ + "value": id, + }, + }, + }, + }, + }, + }, + } + err = orm.DeleteBy(model.ESNodeInfo{}, util.MustToJSONBytes(q)) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + q = util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "metadata.labels.node_uuid": nodeIDs, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.agent_id": util.MapStr{ + "value": id, + }, + }, + }, + }, + }, + }, + } + } + h.WriteAckOKJSON(w) +} + +// +//func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// var reqBody = struct { +// Endpoint string `json:"endpoint"` +// BasicAuth model.BasicAuth +// }{} +// err := h.DecodeJSON(req, &reqBody) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// connectRes, err := client.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// h.WriteJSON(w, connectRes, http.StatusOK) +//} + +func pickAgentSettings(settings []model.Setting, nodeInfo model.ESNodeInfo) *model.Setting { + for _, setting := range settings { + if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID { + return &setting + } + } + return nil +} + +func getAgentTaskSetting(agentID string, v model.ESNodeInfo) (*model.Setting, error) { + taskSetting, err := getSettingsByClusterID(v.ClusterID) + if err != nil { + return nil, err + } + taskSetting.Logs = &model3.LogsTask{ + Enabled: true, + LogsPath: v.Path.Logs, + } + return &model.Setting{ + Metadata: model.SettingsMetadata{ + Category: "agent", + Name: "task", + Labels: util.MapStr{ + "agent_id": agentID, + "cluster_uuid": v.ClusterUuid, + "cluster_id": v.ClusterID, + "node_uuid": v.NodeUUID, + "endpoint": fmt.Sprintf("%s://%s", v.Schema, v.PublishAddress), + }, + }, + Payload: util.MapStr{ + "task": taskSetting, + }, + }, nil +} + +// getSettingsByClusterID query agent task settings with cluster id +func getSettingsByClusterID(clusterID string) (*model3.TaskSetting, error) { + err, result := querySettingsByClusterID(clusterID) + if err != nil { + return nil, err + } + + setting := &model3.TaskSetting{ + NodeStats: &model3.NodeStatsTask{ + Enabled: true, + }, + } + var ( + clusterStats = true + indexStats = true + clusterHealth = true + ) + keys := []string{"payload.task.cluster_stats.enabled", "payload.task.cluster_health.enabled", "payload.task.index_stats.enabled"} + for _, row := range result.Result { + if v, ok := row.(map[string]interface{}); ok { + vm := util.MapStr(v) + for _, key := range keys { + tv, _ := vm.GetValue(key) + if tv == true { + switch key { + case "payload.task.cluster_stats.enabled": + clusterStats = false + case "payload.task.index_stats.enabled": + indexStats = false + case "payload.task.cluster_health.enabled": + clusterHealth = false + } + } + } + } + } + if clusterStats { + setting.ClusterStats = &model3.ClusterStatsTask{ + Enabled: true, + } + } + if indexStats { + setting.IndexStats = &model3.IndexStatsTask{ + Enabled: true, + } + } + if clusterHealth { + setting.ClusterHealth = &model3.ClusterHealthTask{ + Enabled: true, + } + } + return setting, nil +} + +func querySettingsByClusterID(clusterID string) (error, orm.Result) { + queryDsl := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.cluster_id": util.MapStr{ + "value": clusterID, + }, + }, + }, + }, + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "payload.task.cluster_health.enabled": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "payload.task.cluster_stats.enabled": util.MapStr{ + "value": true, + }, + }, + }, + { + "term": util.MapStr{ + "payload.task.index_stats.enabled": util.MapStr{ + "value": true, + }, + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(queryDsl), + } + return orm.Search(model.Setting{}, &q) +} + +func GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) { + req := &util.Request{ + Method: http.MethodGet, + Path: "/agent/host/_basic", + Context: ctx, + } + resBody := struct { + Success bool `json:"success"` + Error string `json:"error"` + HostInfo *host.HostInfo `json:"result"` + }{} + + req.Body = util.MustToJSONBytes(resBody) + + err := DoRequest(req, &resBody) + if err != nil { + return nil, err + } + + if resBody.Success != true { + return nil, fmt.Errorf("enroll error from client: %v", resBody.Error) + } + return resBody.HostInfo, nil +} + +func GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string) (interface{}, error) { + req := &util.Request{ + Method: http.MethodGet, + Path: fmt.Sprintf("/elasticsearch/%s/process/_elastic", agentID), + Context: ctx, + } + resBody := map[string]interface{}{} + err := DoRequest(req, &resBody) + if err != nil { + return nil, err + } + if resBody["success"] != true { + return nil, fmt.Errorf("discover host callback error: %v", resBody["error"]) + } + return resBody["elastic_process"], nil +} + +func GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*model.Instance, error) { + req := &util.Request{ + Method: http.MethodGet, + Path: "/_info", + Context: ctx, + } + resBody := &model.Instance{} + err := DoRequest(req, &resBody) + return resBody, err +} + +func RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error { + reqBody, err := util.ToJSONBytes(cfgs) + if err != nil { + return err + } + req := &util.Request{ + Method: http.MethodPost, + Path: "/elasticsearch/_register", + Context: ctx, + Body: reqBody, + } + resBody := util.MapStr{} + err = DoRequest(req, &resBody) + if err != nil { + return err + } + if resBody["acknowledged"] != true { + return fmt.Errorf("%v", resBody["error"]) + } + return nil +} + +func CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error { + req := &util.Request{ + Method: http.MethodPost, + Path: "/pipeline/tasks/", + Body: body, + Context: ctx, + } + resBody := util.MapStr{} + return DoRequest(req, &resBody) +} + +func DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error { + req := &util.Request{ + Method: http.MethodDelete, + Path: fmt.Sprintf("/pipeline/task/%s", pipelineID), + Context: ctx, + } + return DoRequest(req, nil) +} + +func DoRequest(req *util.Request, obj interface{}) error { + panic("implement me") +} + +var mTLSClient *http.Client //TODO get mTLSClient +var initOnce = sync.Once{} + +func doRequest(instance *model.Instance, req *util.Request, obj interface{}) error { + var err error + var res *util.Result + + initOnce.Do(func() { + if global.Env().SystemConfig.Configs.TLSConfig.TLSEnabled && global.Env().SystemConfig.Configs.TLSConfig.TLSCAFile != "" { + + //init client + hClient, err := util.NewMTLSClient( + global.Env().SystemConfig.Configs.TLSConfig.TLSCAFile, + global.Env().SystemConfig.Configs.TLSConfig.TLSCertFile, + global.Env().SystemConfig.Configs.TLSConfig.TLSKeyFile) + if err != nil { + panic(err) + } + mTLSClient = hClient + } + }) + + req.Url, err = url.JoinPath(instance.GetEndpoint(), req.Path) + res, err = util.ExecuteRequestWithCatchFlag(mTLSClient, req, true) + if err != nil || res.StatusCode != 200 { + body := "" + if res != nil { + body = string(res.Body) + } + return errors.New(fmt.Sprintf("request error: %v, %v", err, body)) + } + + if res != nil { + if res.Body != nil { + return util.FromJSONBytes(res.Body, obj) + } + } + + return nil +} diff --git a/modules/agent/client/client.go b/modules/agent/client/client.go deleted file mode 100644 index 7f26092d..00000000 --- a/modules/agent/client/client.go +++ /dev/null @@ -1,279 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package client - -import ( - "context" - "fmt" - "infini.sh/console/modules/agent/common" - "infini.sh/framework/core/model" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/host" - "infini.sh/framework/core/util" - "net/http" -) - -var defaultClient ClientAPI - -func GetClient() ClientAPI { - if defaultClient == nil { - panic("agent client not init") - } - return defaultClient -} - -func RegisterClient(client ClientAPI) { - defaultClient = client -} -type ClientAPI interface { - GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) - GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) - GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) - GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) - GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*model.Instance, error) - RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error - GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]model.ESNodeInfo, error) - AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*model.ESNodeInfo, error) - CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error - DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error - SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error - SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error - SaveIngestConfig(ctx context.Context, agentBaseURL string) error - DoRequest(req *util.Request, respObj interface{}) error -} - -type Client struct { - Executor Executor -} - - -//func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) { -// req := &util.Request{ -// Method: http.MethodGet, -// Url: fmt.Sprintf("%s/agent/host/_basic", agentBaseURL), -// Context: ctx, -// } -// resBody := struct { -// Success bool `json:"success"` -// Error string `json:"error"` -// HostInfo *host.HostInfo `json:"result"` -// }{} -// err := client.DoRequest(req, &resBody) -// if err != nil { -// return nil, err -// } -// if resBody.Success != true { -// return nil, fmt.Errorf("enroll error from client: %v", resBody.Error) -// } -// return resBody.HostInfo, nil -//} - -//TODO -func (client *Client) GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) { - panic("implement me") - req := &util.Request{ - Method: http.MethodGet, - Url: fmt.Sprintf("%s/elasticsearch/%s/process/_elastic", agentBaseURL, agentID), - Context: ctx, - } - resBody := map[string]interface{}{} - err := client.DoRequest(req, &resBody) - if err != nil { - return nil, err - } - if resBody["success"] != true { - return nil, fmt.Errorf("discover host callback error: %v", resBody["error"]) - } - return resBody["elastic_process"], nil -} - -func (client *Client) GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) { - panic("implement me") - - - reqBody := util.MustToJSONBytes(util.MapStr{ - "logs_path": logsPath, - }) - req := &util.Request{ - Method: http.MethodPost, - Url: fmt.Sprintf("%s/agent/logs/elastic/list", agentBaseURL), - Context: ctx, - Body: reqBody, - } - resBody := map[string]interface{}{} - err := client.DoRequest(req, &resBody) - if err != nil { - return nil, err - } - if resBody["success"] != true { - return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody["error"]) - } - return resBody["result"], nil -} - -func (client *Client) GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) { - req := &util.Request{ - Method: http.MethodPost, - Url: fmt.Sprintf("%s/agent/logs/elastic/_read", agentBaseURL), - Context: ctx, - Body: util.MustToJSONBytes(body), - } - resBody := map[string]interface{}{} - err := client.DoRequest(req, &resBody) - if err != nil { - return nil, err - } - if resBody["success"] != true { - return nil, fmt.Errorf("get elasticsearch log files error: %v", resBody["error"]) - } - var hasMore bool - if v, ok := resBody["EOF"].(bool); ok && !v { - hasMore = true - } - return map[string]interface{}{ - "lines": resBody["result"], - "has_more": hasMore, - } , nil -} - -func (client *Client) GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*model.Instance, error){ - req := &util.Request{ - Method: http.MethodGet, - Url: fmt.Sprintf("%s/_info", agentBaseURL ), - Context: ctx, - } - resBody := &model.Instance{} - err := client.DoRequest(req, &resBody) - return resBody, err -} - -func (client *Client) RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error { - reqBody, err := util.ToJSONBytes(cfgs) - if err != nil { - return err - } - req := &util.Request{ - Method: http.MethodPost, - Url: fmt.Sprintf("%s/elasticsearch/_register", agentBaseURL ), - Context: ctx, - Body: reqBody, - } - resBody := util.MapStr{} - err = client.DoRequest(req, &resBody) - if err != nil { - return err - } - if resBody["acknowledged"] != true { - return fmt.Errorf("%v", resBody["error"]) - } - return nil -} - -func (client *Client) GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]model.ESNodeInfo, error) { - req := &util.Request{ - Method: http.MethodGet, - Url: fmt.Sprintf("%s/elasticsearch/nodes/_discovery", agentBaseURL ), - Context: ctx, - } - resBody := []model.ESNodeInfo{} - err := client.DoRequest(req, &resBody) - if err != nil { - return nil, err - } - - return resBody, nil -} - -func (client *Client) AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*model.ESNodeInfo, error) { - reqBody, err := util.ToJSONBytes(cfg) - if err != nil { - return nil, err - } - req := &util.Request{ - Method: http.MethodPost, - Url: fmt.Sprintf("%s/elasticsearch/_auth", agentBaseURL ), - Context: ctx, - Body: reqBody, - } - resBody := &model.ESNodeInfo{} - err = client.DoRequest(req, resBody) - if err != nil { - return nil, err - } - return resBody, nil -} - -func (client *Client) CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error{ - req := &util.Request{ - Method: http.MethodPost, - Url: agentBaseURL + "/pipeline/tasks/", - Body: body, - Context: ctx, - } - resBody := util.MapStr{} - return client.DoRequest(req, &resBody) -} - -func (client *Client) DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error{ - req := &util.Request{ - Method: http.MethodDelete, - Url: fmt.Sprintf("%s/pipeline/task/%s", agentBaseURL, pipelineID), - Context: ctx, - } - return client.DoRequest(req, nil) -} - -func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error{ - body := util.MapStr{ - "key": key, - "value": value, - } - req := &util.Request{ - Method: http.MethodPost, - Url: fmt.Sprintf("%s/keystore", agentBaseURL), - Context: ctx, - Body: util.MustToJSONBytes(body), - } - return client.DoRequest(req, nil) -} - -func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, filename, content string) error{ - body := util.MapStr{ - "configs": util.MapStr{ - filename: content, - }, - } - req := &util.Request{ - Method: http.MethodPost, - Url: fmt.Sprintf("%s/config/_update", agentBaseURL), - Context: ctx, - Body: util.MustToJSONBytes(body), - } - return client.DoRequest(req, nil) -} - -func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string) error { - ingestCfg, basicAuth, err := common.GetAgentIngestConfig() - if err != nil { - return err - } - if basicAuth != nil && basicAuth.Password != "" { - err = client.SetKeystoreValue(ctx, agentBaseURL, "ingest_cluster_password", basicAuth.Password) - if err != nil { - return fmt.Errorf("set keystore value to agent error: %w", err) - } - } - err = client.SaveDynamicConfig(context.Background(), agentBaseURL, "ingest_variables.yml", ingestCfg ) - if err != nil { - return fmt.Errorf("save dynamic config to agent error: %w", err) - } - return nil -} - - -func (client *Client) DoRequest(req *util.Request, respObj interface{}) error { - return client.Executor.DoRequest(req, respObj) -} - diff --git a/modules/agent/client/executor.go b/modules/agent/client/executor.go deleted file mode 100644 index f756f3a1..00000000 --- a/modules/agent/client/executor.go +++ /dev/null @@ -1,100 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package client - -import ( - "bytes" - "fmt" - "infini.sh/console/modules/agent/common" - "infini.sh/framework/core/util" - "io" - "net/http" -) - -type Executor interface { - DoRequest(req *util.Request, respObj interface{}) error -} - -type HttpExecutor struct { -} - -func (executor *HttpExecutor) DoRequest(req *util.Request, respObj interface{}) error { - result, err := util.ExecuteRequest(req) - if err != nil { - return err - } - if result.StatusCode != 200 { - return fmt.Errorf(string(result.Body)) - } - if respObj == nil { - return nil - } - return util.FromJSONBytes(result.Body, respObj) -} - -func NewMTLSExecutor(caCertFile, caKeyFile string) (*MTLSExecutor, error){ - var ( - instanceCrt string - instanceKey string - ) - instanceCrt, instanceKey, err := common.GetAgentInstanceCerts(caCertFile, caKeyFile) - if err != nil { - return nil, fmt.Errorf("generate tls cert error: %w", err) - } - hClient, err := util.NewMTLSClient(caCertFile, instanceCrt, instanceKey) - if err != nil { - return nil, err - } - return &MTLSExecutor{ - CaCertFile: caCertFile, - CAKeyFile: caKeyFile, - client: hClient, - }, nil -} - -type MTLSExecutor struct { - CaCertFile string - CAKeyFile string - client *http.Client -} - - -func (executor *MTLSExecutor) DoRequest(req *util.Request, respObj interface{}) error { - var reader io.Reader - if len(req.Body) > 0 { - reader = bytes.NewReader(req.Body) - } - var ( - hr *http.Request - err error - ) - if req.Context == nil { - hr, err = http.NewRequest(req.Method, req.Url, reader) - }else{ - hr, err = http.NewRequestWithContext(req.Context, req.Method, req.Url, reader) - } - if err != nil { - return err - } - res, err := executor.client.Do(hr) - if err != nil { - return err - } - defer res.Body.Close() - buf, err := io.ReadAll(res.Body) - if err != nil { - return err - } - if res.StatusCode != 200 { - return fmt.Errorf(string(buf)) - } - if respObj != nil { - err = util.FromJSONBytes(buf, respObj) - if err != nil { - return err - } - } - return nil -} \ No newline at end of file diff --git a/modules/agent/common/config.go b/modules/agent/common/config.go index fb7e6ed9..3d1ccbf2 100644 --- a/modules/agent/common/config.go +++ b/modules/agent/common/config.go @@ -5,15 +5,10 @@ package common import ( - "crypto/x509" - "encoding/pem" log "github.com/cihub/seelog" "infini.sh/console/modules/agent/model" "infini.sh/framework/core/env" - "infini.sh/framework/core/global" - "infini.sh/framework/core/util" - "os" - "path" + "infini.sh/framework/plugins/managed/common" ) @@ -30,37 +25,10 @@ func GetAgentConfig() *model.AgentConfig { log.Debug("agent config not found: %v", err) } if agentCfg.Setup.CACertFile == "" && agentCfg.Setup.CAKeyFile == "" { - agentCfg.Setup.CACertFile, agentCfg.Setup.CAKeyFile, err = GetOrInitDefaultCaCerts() + agentCfg.Setup.CACertFile, agentCfg.Setup.CAKeyFile, err = common.GetOrInitDefaultCaCerts() if err != nil { log.Errorf("generate default ca certs error: %v", err) } } return agentCfg } - -func GetOrInitDefaultCaCerts()(string, string, error){ - dataDir := global.Env().GetDataDir() - caFile := path.Join(dataDir, "certs/ca.crt") - caKey := path.Join(dataDir, "certs/ca.key") - if !(util.FileExists(caFile) && util.FileExists(caKey) ) { - err := os.MkdirAll(path.Join(dataDir, "certs"), 0775) - if err != nil { - return "", "", err - } - log.Info("auto generating cert files") - _, rootKey, rootCertPEM := util.GetRootCert() - - caKeyPEM := pem.EncodeToMemory(&pem.Block{ - Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(rootKey), - }) - _, err = util.FilePutContentWithByte(caKey, caKeyPEM) - if err != nil { - return "", "", err - } - _, err = util.FilePutContentWithByte(caFile, rootCertPEM) - if err != nil { - return "", "", err - } - } - return caFile, caKey, nil -} \ No newline at end of file diff --git a/modules/agent/common/helper.go b/modules/agent/common/helper.go index 5f75db2a..7baa7952 100644 --- a/modules/agent/common/helper.go +++ b/modules/agent/common/helper.go @@ -401,7 +401,7 @@ func GetLatestOnlineAgentIDs(agentIds []string, lastSeconds int) (map[string]str return agentIDs, nil } -func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) { +func GetAgentIngestConfig() (string, *model.BasicAuth, error) { agCfg := GetAgentConfig() var ( endpoint string @@ -422,7 +422,7 @@ func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) { } var ( - basicAuth elastic.BasicAuth + basicAuth model.BasicAuth ) if agCfg.Setup.IngestClusterCredentialID != "" { cred := credential.Credential{} @@ -435,7 +435,7 @@ func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) { if err != nil { return "", nil, fmt.Errorf("decode credential [%s] error: %w", cred.ID, err) } - if basicAuth, ok = info.(elastic.BasicAuth); !ok { + if basicAuth, ok = info.(model.BasicAuth); !ok { log.Debug("invalid credential: ", cred) } }else{ diff --git a/modules/agent/common/helper_test.go b/modules/agent/common/helper_test.go deleted file mode 100644 index 285df64a..00000000 --- a/modules/agent/common/helper_test.go +++ /dev/null @@ -1,41 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package common - -import ( - "fmt" - "gopkg.in/yaml.v2" - "infini.sh/framework/core/util" - "testing" -) - -func TestTransformSettingsToConfig(t *testing.T) { - setting := TaskSetting{ - ClusterHealth: ClusterHealthTask{ - Enabled: true, - }, - ClusterStats: ClusterStatsTask { - Enabled: true, - }, - IndexStats: IndexStatsTask{ - Enabled: true, - }, - NodeStats: NodeStatsTask{ - Enabled: true, - NodeIDs: []string{"ddddnnnn"}, - }, - } - pipelines, err := transformSettingsToConfig(&setting, "testxxx") - if err !=nil { - t.Fatal(err) - } - buf, err := yaml.Marshal(util.MapStr{ - "pipeline": pipelines, - }) - if err !=nil { - t.Fatal(err) - } - fmt.Println(string(buf)) -} diff --git a/modules/agent/model/const.go b/modules/agent/model/const.go deleted file mode 100644 index 3cea10ea..00000000 --- a/modules/agent/model/const.go +++ /dev/null @@ -1,15 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package model - -const ( - StatusOnline string = "online" - StatusOffline = "offline" -) - -const ( - KVAgentIngestConfigChanged = "agent_ingest_config_changed" - KVSyncDynamicTaskSettings = "agent_sync_dynamic_task_settings" -) \ No newline at end of file diff --git a/modules/agent/state/state.go b/modules/agent/state/state.go deleted file mode 100644 index a4ff8342..00000000 --- a/modules/agent/state/state.go +++ /dev/null @@ -1,352 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package state - -import ( - "context" - "fmt" - "github.com/buger/jsonparser" - log "github.com/cihub/seelog" - "gopkg.in/yaml.v2" - "infini.sh/console/modules/agent/client" - "infini.sh/console/modules/agent/common" - model2 "infini.sh/console/modules/agent/model" - "infini.sh/framework/core/model" - "infini.sh/framework/core/host" - "infini.sh/framework/core/kv" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - "infini.sh/framework/modules/elastic" - "runtime" - "runtime/debug" - "strconv" - "sync" - "time" -) - -var stateManager IStateManager - -func GetStateManager() IStateManager { - if stateManager == nil { - panic("agent state manager not init") - } - return stateManager -} - -func RegisterStateManager(sm IStateManager) { - stateManager = sm -} - -func IsEnabled() bool { - return stateManager != nil -} - -type IStateManager interface { - GetAgent(ID string) (*model.Instance, error) - UpdateAgent(inst *model.Instance, syncToES bool) (*model.Instance, error) - GetTaskAgent(clusterID string) (*model.Instance, error) - DeleteAgent(agentID string) error - LoopState() - Stop() - GetAgentClient() client.ClientAPI -} - -type StateManager struct { - TTL time.Duration // kv ttl - KVKey string - stopC chan struct{} - stopCompleteC chan struct{} - agentClient *client.Client - agentIds map[string]string - agentMutex sync.Mutex - workerChan chan struct{} -} - -func NewStateManager(TTL time.Duration, kvKey string, agentIds map[string]string, agentClient *client.Client) *StateManager { - return &StateManager{ - TTL: TTL, - KVKey: kvKey, - stopC: make(chan struct{}), - stopCompleteC: make(chan struct{}), - agentClient: agentClient, - agentIds: agentIds, - workerChan: make(chan struct{}, runtime.NumCPU()), - } -} - -func (sm *StateManager) checkAgentStatus() { - onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, int(sm.TTL.Seconds())) - if err != nil { - log.Error(err) - return - } - //add new agent to state - sm.agentMutex.Lock() - for agentID := range onlineAgentIDs { - if _, ok := sm.agentIds[agentID]; !ok { - log.Infof("status of agent [%s] changed to online", agentID) - sm.agentIds[agentID] = model2.StatusOnline - } - } - sm.agentMutex.Unlock() - for agentID, status := range sm.agentIds { - sm.workerChan <- struct{}{} - go func(agentID string) { - defer func() { - if err := recover(); err != nil { - log.Errorf("check agent [%s] status recover form panic error: %v", agentID, err) - debug.PrintStack() - } - <-sm.workerChan - }() - sm.syncSettings(agentID) - sm.syncIngestSettings(agentID) - if _, ok := onlineAgentIDs[agentID]; ok { - host.UpdateHostAgentStatus(agentID, model2.StatusOnline) - if status == model2.StatusOnline { - return - } - // status change to online - sm.agentMutex.Lock() - sm.agentIds[agentID] = model2.StatusOnline - sm.agentMutex.Unlock() - log.Infof("status of agent [%s] changed to online", agentID) - return - }else{ - // already offline - if status == model2.StatusOffline { - return - } - } - // status change to offline - sm.agentMutex.Lock() - sm.agentIds[agentID] = model2.StatusOffline - sm.agentMutex.Unlock() - ag, err := sm.GetAgent(agentID) - if err != nil { - if err != elastic.ErrNotFound { - log.Error(err) - } - return - } - ag.Status = model2.StatusOffline - log.Infof("agent [%s] is offline", ag.Endpoint) - _, err = sm.UpdateAgent(ag, true) - if err != nil { - log.Error(err) - return - } - //update host agent status - host.UpdateHostAgentStatus(ag.ID, model2.StatusOffline) - }(agentID) - - } -} -func (sm *StateManager) getLastSyncSettingsTimestamp(agentID string) int64{ - vbytes, err := kv.GetValue(model2.KVSyncDynamicTaskSettings, []byte(agentID)) - if err != nil { - log.Error(err) - } - if vbytes == nil { - return 0 - } - t, err := strconv.ParseInt(string(vbytes), 10, 64) - if err != nil { - log.Error(err) - } - - return t -} - -func (sm *StateManager) syncSettings(agentID string) { - ag, err := sm.GetAgent(agentID) - if err != nil { - if err != elastic.ErrNotFound { - log.Errorf("get agent error: %v", err) - } - return - } - newTimestamp := time.Now().UnixMilli() - lastSyncTimestamp := sm.getLastSyncSettingsTimestamp(agentID) - settings, err := common.GetAgentSettings(agentID, lastSyncTimestamp) - if err != nil { - log.Errorf("query agent settings error: %v", err) - return - } - if len(settings) == 0 { - log.Debugf("got no settings of agent [%s]", agentID) - return - } - parseResult, err := common.ParseAgentSettings(settings) - if err != nil { - log.Errorf("parse agent settings error: %v", err) - return - } - agClient := sm.GetAgentClient() - var clusterCfgs []util.MapStr - if len(parseResult.ClusterConfigs) > 0 { - for _, cfg := range parseResult.ClusterConfigs { - clusterCfg := util.MapStr{ - "name": cfg.ID, - "enabled": true, - "endpoint": cfg.GetAnyEndpoint(), - } - if cfg.BasicAuth != nil && cfg.BasicAuth.Password != ""{ - cid := cfg.ID - if cfg.ClusterUUID != "" { - cid = cfg.ClusterUUID - } - err = agClient.SetKeystoreValue(context.Background(), ag.GetEndpoint(), fmt.Sprintf("%s_password", cid), cfg.BasicAuth.Password) - if err != nil { - log.Errorf("set keystore value error: %v", err) - continue - } - clusterCfg["basic_auth"] = util.MapStr{ - "username": cfg.BasicAuth.Username, - "password": fmt.Sprintf("$[[keystore.%s_password]]", cid), - } - } - clusterCfgs = append(clusterCfgs, clusterCfg) - } - } - var dynamicCfg = util.MapStr{} - if len(clusterCfgs) > 0 { - dynamicCfg["elasticsearch"] = clusterCfgs - } - if len(parseResult.Pipelines) > 0 { - dynamicCfg["pipeline"] = parseResult.Pipelines - } - cfgBytes, err := yaml.Marshal(dynamicCfg) - if err != nil { - log.Error("serialize config to yaml error: ", err) - return - } - //TODO - err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task.yml", string(cfgBytes)) - - newTimestampStr := strconv.FormatInt(newTimestamp, 10) - err = kv.AddValue(model2.KVSyncDynamicTaskSettings, []byte(agentID), []byte(newTimestampStr)) - if err != nil { - log.Error(err) - } -} -func (sm *StateManager) syncIngestSettings(agentID string) { - v, err := kv.GetValue(model2.KVAgentIngestConfigChanged, []byte(agentID)) - if err != nil { - log.Error(err) - } - if string(v) != "1" { - return - } - ag, err := sm.GetAgent(agentID) - if err != nil { - if err != elastic.ErrNotFound { - log.Errorf("get agent error: %v", err) - } - return - } - err = sm.agentClient.SaveIngestConfig(context.Background(), ag.GetEndpoint()) - if err == nil { - kv.AddValue(model2.KVAgentIngestConfigChanged,[]byte(agentID), []byte("0")) - } -} - -func (sm *StateManager) getAvailableAgent(clusterID string) (*model.Instance, error) { - agents, err := common.LoadAgentsFromES(clusterID) - if err != nil { - return nil, err - } - if len(agents) == 0 { - return nil, nil - } - for _, ag := range agents { - if ag.Status == "offline" { - continue - } - } - return nil, nil -} - -func (sm *StateManager) LoopState() { - t := time.NewTicker(30 * time.Second) - defer t.Stop() -MAINLOOP: - for { - select { - case <-sm.stopC: - sm.stopCompleteC <- struct{}{} - close(sm.workerChan) - break MAINLOOP - case <-t.C: - sm.checkAgentStatus() - } - } -} - -func (sm *StateManager) Stop() { - sm.stopC <- struct{}{} - <-sm.stopCompleteC -} - -func (sm *StateManager) GetAgent(ID string) (*model.Instance, error) { - buf, err := kv.GetValue(sm.KVKey, []byte(ID)) - if err != nil { - return nil, err - } - strTime, _ := jsonparser.GetString(buf, "timestamp") - timestamp, _ := time.Parse(time.RFC3339, strTime) - inst := &model.Instance{} - inst.ID = ID - if time.Since(timestamp) > sm.TTL { - exists, err := orm.Get(inst) - if err != nil { - return nil, err - } - if !exists { - return nil, fmt.Errorf("can not found agent [%s]", ID) - } - //inst.Timestamp = time.Now() - err = kv.AddValue(sm.KVKey, []byte(ID), util.MustToJSONBytes(inst)) - if err != nil { - log.Errorf("save agent [%s] to kv error: %v", ID, err) - } - return inst, nil - } - err = util.FromJSONBytes(buf, inst) - return inst, err -} - -func (sm *StateManager) UpdateAgent(inst *model.Instance, syncToES bool) (*model.Instance, error) { - //inst.Timestamp = time.Now() - err := kv.AddValue(sm.KVKey, []byte(inst.ID), util.MustToJSONBytes(inst)) - if syncToES { - ctx := orm.Context{ - Refresh: "wait_for", - } - err = orm.Update(&ctx, inst) - if err != nil { - return nil, err - } - } - return inst, err -} - -func (sm *StateManager) GetTaskAgent(clusterID string) (*model.Instance, error) { - return nil, nil -} - - -func (sm *StateManager) DeleteAgent(agentID string) error { - sm.agentMutex.Lock() - delete(sm.agentIds, agentID) - sm.agentMutex.Unlock() - log.Infof("delete agent [%s] from state", agentID) - - return kv.DeleteKey(sm.KVKey, []byte(agentID)) -} - -func (sm *StateManager) GetAgentClient() client.ClientAPI { - return sm.agentClient -} diff --git a/plugin/api/email/common/auth.go b/plugin/api/email/common/auth.go index a5c78301..7dd69dc5 100644 --- a/plugin/api/email/common/auth.go +++ b/plugin/api/email/common/auth.go @@ -7,11 +7,11 @@ package common import ( "infini.sh/console/model" "infini.sh/framework/core/credential" - "infini.sh/framework/core/elastic" + model2 "infini.sh/framework/core/model" "infini.sh/framework/core/orm" ) -func GetBasicAuth(srv *model.EmailServer) (basicAuth elastic.BasicAuth, err error) { +func GetBasicAuth(srv *model.EmailServer) (basicAuth model2.BasicAuth, err error) { if srv.Auth != nil && srv.Auth.Username != "" { basicAuth = *srv.Auth return @@ -28,7 +28,7 @@ func GetBasicAuth(srv *model.EmailServer) (basicAuth elastic.BasicAuth, err erro if err != nil { return } - if auth, ok := dv.(elastic.BasicAuth); ok { + if auth, ok := dv.(model2.BasicAuth); ok { basicAuth = auth } } diff --git a/plugin/api/license/api.go b/plugin/api/license/api.go index fc9e9eb1..e5533514 100644 --- a/plugin/api/license/api.go +++ b/plugin/api/license/api.go @@ -35,7 +35,7 @@ func (handler *LicenseAPI) RequestTrialLicense(w http.ResponseWriter, req *http. } //TODO implement config for the api endpoint - request:=util.NewPostRequest("https://api.infini.sh/_license/request_trial", util.MustToJSONBytes(v)) + request:=util.NewPostRequest("https://api.infini.cloud/_license/request_trial", util.MustToJSONBytes(v)) response,err:=util.ExecuteRequest(request) if err!=nil{ handler.WriteError(w,err.Error(),response.StatusCode) diff --git a/plugin/setup/setup.go b/plugin/setup/setup.go index b746a653..a27cad77 100644 --- a/plugin/setup/setup.go +++ b/plugin/setup/setup.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "fmt" "infini.sh/framework/core/kv" + "infini.sh/framework/core/model" "io" "net/http" uri2 "net/url" @@ -88,7 +89,7 @@ func (module *Module) Start() error { log.Error(err) return } - if basicAuth, ok := bv.(elastic.BasicAuth); ok { + if basicAuth, ok := bv.(model.BasicAuth); ok { err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(basicAuth.Password)) if err != nil { log.Error(err) @@ -203,7 +204,7 @@ func (module *Module) validate(w http.ResponseWriter, r *http.Request, ps httpro } cfg1 = elastic1.ORMConfig{} exist, err := env.ParseConfig("elastic.orm", &cfg1) - if exist && err != nil { + if exist && err != nil &&global.Env().SystemConfig.Configs.PanicOnConfigError{ panic(err) } @@ -272,7 +273,7 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API, Setup Enabled: true, Reserved: true, Endpoint: request.Cluster.Endpoint, - BasicAuth: &elastic.BasicAuth{ + BasicAuth: &model.BasicAuth{ Username: request.Cluster.Username, Password: request.Cluster.Password, },