diff --git a/config/ingest_config.tpl b/config/ingest_config.tpl deleted file mode 100644 index d93dffa9..00000000 --- a/config/ingest_config.tpl +++ /dev/null @@ -1,90 +0,0 @@ -elasticsearch: - - name: $[[INGEST_CLUSTER_ID]] - enabled: true - endpoints: $[[INGEST_CLUSTER_ENDPOINT]] - discovery: - enabled: false - basic_auth: - username: $[[INGEST_CLUSTER_USERNAME]] - password: $[[keystore.ingest_cluster_password]] - -metrics: - enabled: true - queue: metrics - network: - enabled: true - summary: true - details: true - memory: - metrics: - - swap - - memory - disk: - metrics: - - iops - - usage - cpu: - metrics: - - idle - - system - - user - - iowait - - load - instance: - enabled: true - -elastic: - availability_check: - enabled: false - -pipeline: - - name: merge_logs - auto_start: true - keep_running: true - processor: - - indexing_merge: - index_name: ".infini_logs" - elasticsearch: "$[[INGEST_CLUSTER_ID]]" - input_queue: "logs" - idle_timeout_in_seconds: 10 - output_queue: - name: "merged_requests" - worker_size: 1 - bulk_size_in_mb: 5 - - name: merge_metrics - auto_start: true - keep_running: true - processor: - - indexing_merge: - elasticsearch: "$[[INGEST_CLUSTER_ID]]" - index_name: ".infini_metrics" - input_queue: "metrics" - output_queue: - name: "merged_requests" - worker_size: 1 - bulk_size_in_mb: 5 - - name: ingest_merged_requests - auto_start: true - keep_running: true - processor: - - bulk_indexing: - max_worker_size: 1 - bulk: - batch_size_in_mb: 5 - batch_size_in_docs: 5000 - max_retry_times: 0 - invalid_queue: "" - response_handle: - include_index_stats: false - include_action_stats: false - output_bulk_stats: false - include_error_details: false - save_error_results: false - save_success_results: false - save_busy_results: false - consumer: - fetch_max_messages: 5 - queues: - type: indexing_merge - when: - cluster_available: ["$[[INGEST_CLUSTER_ID]]"] \ No newline at end of file diff --git a/config/task_config.tpl b/config/task_config.tpl deleted file mode 100644 index c5abc715..00000000 --- a/config/task_config.tpl +++ /dev/null @@ -1,84 +0,0 @@ -elasticsearch: - - id: $[[CLUSTER_ID]] - name: $[[CLUSTER_ID]] - enabled: true - endpoint: $[[CLUSTER_ENDPOINT]] - discovery: - enabled: false - basic_auth: - username: $[[CLUSTER_USERNAME]] - password: $[[keystore.$[[CLUSTER_ID]]_password]] - -pipeline: -#clsuter level metrics -- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]] - enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]] - keep_running: true - singleton: true - name: collect_$[[CLUSTER_ID]]_es_cluster_stats - retry_delay_in_ms: 10000 - processor: - - es_cluster_stats: - elasticsearch: $[[CLUSTER_ID]] - labels: - cluster_id: $[[CLUSTER_ID]] - when: - cluster_available: ["$[[CLUSTER_ID]]"] - -- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]] - enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]] - keep_running: true - singleton: true - name: collect_$[[CLUSTER_ID]]_es_index_stats - retry_delay_in_ms: 10000 - processor: - - es_index_stats: - elasticsearch: $[[CLUSTER_ID]] - labels: - cluster_id: $[[CLUSTER_ID]] - when: - cluster_available: ["$[[CLUSTER_ID]]"] - -- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]] - enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]] - keep_running: true - singleton: true - name: collect_$[[CLUSTER_ID]]_es_cluster_health - retry_delay_in_ms: 10000 - processor: - - es_cluster_health: - elasticsearch: $[[CLUSTER_ID]] - labels: - cluster_id: $[[CLUSTER_ID]] - when: - cluster_available: ["$[[CLUSTER_ID]]"] - -#node level metrics -- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]] - enabled: $[[NODE_LEVEL_TASKS_ENABLED]] - keep_running: true - name: collect_$[[CLUSTER_ID]]_es_node_stats - retry_delay_in_ms: 10000 - processor: - - es_node_stats: - elasticsearch: $[[CLUSTER_ID]] - labels: - cluster_id: $[[CLUSTER_ID]] - when: - cluster_available: ["$[[CLUSTER_ID]]"] - -#node logs -- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]] - enabled: $[[NODE_LEVEL_TASKS_ENABLED]] - keep_running: true - name: collect_$[[CLUSTER_ID]]_es_logs - retry_delay_in_ms: 10000 - processor: - - es_logs_processor: - elasticsearch: $[[CLUSTER_ID]] - labels: - cluster_id: $[[CLUSTER_ID]] - logs_path: $[[NODE_LOGS_PATH]] - queue_name: logs - when: - cluster_available: ["$[[CLUSTER_ID]]"] diff --git a/config_repo/configs/cluster_xx_node_xx.yml b/config_repo/configs/cluster_xx_node_xx.yml deleted file mode 100644 index d67e7cc6..00000000 --- a/config_repo/configs/cluster_xx_node_xx.yml +++ /dev/null @@ -1,17 +0,0 @@ -configs.template: - - name: "cluster_a_node_b_task_config" - path: ./config/task_config.tpl - variable: - CLUSTER_ID: infini_default_ingest_cluster - CLUSTER_ENDPOINT: ["https://localhost:9200"] - CLUSTER_USERNAME: "admin" - CLUSTER_VER: "1.6.0" - CLUSTER_DISTRIBUTION: "easysearch" - INDEX_PREFIX: ".infini_" - CLUSTER_LEVEL_TASKS_ENABLED: false - NODE_LEVEL_TASKS_ENABLED: true - NODE_LOGS_PATH: "/opt/easysearch/logs/" - - -#MANAGED_CONFIG_VERSION: 19 -#MANAGED: true \ No newline at end of file diff --git a/config_repo/settings.yml b/config_repo/settings.yml index 41d42024..722d8df2 100644 --- a/config_repo/settings.yml +++ b/config_repo/settings.yml @@ -4,14 +4,13 @@ configs: #define configs group - ./templates/ingest_config.tpl - ./templates/task_config.tpl - ./configs/ingest_config.yml - - ./configs/cluster_xx_node_xx.yml instances: #define which config instance should fetch - infini_default_system_cluster: #instance group - plugins: - - ingest - instances: - - ck0mkk805f5virpsejp0 - - ckjrpdg05f5lrfp8qlng + _all: #instance group +# plugins: +# - ingest +# instances: +# - ck0mkk805f5virpsejp0 +# - ckjrpdg05f5lrfp8qlng configs: - general_ingest_template secrets: @@ -22,10 +21,4 @@ secrets: keystore: ingest_cluster_password: type: plaintext - value: "d7cc48e69a41dac719fb" - infini_default_ingest_cluster_password: - type: plaintext - value: "d7cc48e69a41dac719fb" -# infini_default_ingest_cluster_password: -# type: credential -# value: "ckghspo05f5q7pr20ct0" #credential_id \ No newline at end of file + value: "d7cc48e69a41dac719fb" \ No newline at end of file diff --git a/config_repo/templates/task_config.tpl b/config_repo/templates/task_config.tpl index 119d2fc8..7d30522e 100644 --- a/config_repo/templates/task_config.tpl +++ b/config_repo/templates/task_config.tpl @@ -1,3 +1,6 @@ +env: + CLUSTER_PASSWORD: $[[keystore.$[[CLUSTER_ID]]_password]] + elasticsearch: - id: $[[CLUSTER_ID]] name: $[[CLUSTER_ID]] @@ -7,7 +10,7 @@ elasticsearch: enabled: false basic_auth: username: $[[CLUSTER_USERNAME]] - password: $[[keystore.$[[CLUSTER_ID]]_password]] + password: $[[CLUSTER_PASSWORD]] pipeline: #clsuter level metrics diff --git a/main.go b/main.go index 095ffbba..63e76815 100644 --- a/main.go +++ b/main.go @@ -129,7 +129,6 @@ func main() { orm.RegisterSchemaWithIndexName(elastic.View{}, "view") orm.RegisterSchemaWithIndexName(elastic.CommonCommand{}, "commands") //orm.RegisterSchema(elastic.TraceTemplate{}, "trace-template") - //orm.RegisterSchema(model.Instance{}, "instance") orm.RegisterSchemaWithIndexName(alerting.Rule{}, "alert-rule") orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alert-history") orm.RegisterSchemaWithIndexName(alerting.AlertMessage{}, "alert-message") @@ -142,6 +141,7 @@ func main() { orm.RegisterSchemaWithIndexName(model.Notification{}, "notification") orm.RegisterSchemaWithIndexName(model.EmailServer{}, "email-server") orm.RegisterSchemaWithIndexName(model2.Instance{}, "instance") + orm.RegisterSchemaWithIndexName(api3.RemoteConfig{}, "configs") api.RegisterSchema() diff --git a/modules/agent/api/elasticsearch.go b/modules/agent/api/elasticsearch.go index c03986ae..2b54c739 100644 --- a/modules/agent/api/elasticsearch.go +++ b/modules/agent/api/elasticsearch.go @@ -5,6 +5,7 @@ package api import ( + "bytes" "context" "fmt" log "github.com/cihub/seelog" @@ -13,12 +14,70 @@ import ( "infini.sh/framework/core/model" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" + common2 "infini.sh/framework/modules/elastic/common" + "infini.sh/framework/plugins/managed/common" "net/http" "time" ) +//node -> binding item +func GetEnrolledNodesByAgent(instance *model.Instance) (map[string]BindingItem, error) { + + //get nodes settings where agent id = instance id + q := orm.Query{ + Size: 1000, + Conds: orm.And(orm.Eq("metadata.category", "node_settings"), + orm.Eq("metadata.name", "agent"), + orm.Eq("metadata.labels.agent_id", instance.ID), + ), + } + + err, result := orm.Search(model.Setting{}, &q) + + if err != nil { + return nil, err + } + + ids := map[string]BindingItem{} + for _, row := range result.Result { + v, ok := row.(map[string]interface{}) + if ok { + x, ok := v["payload"] + if ok { + f, ok := x.(map[string]interface{}) + if ok { + nodeID, ok := f["node_uuid"].(string) + if ok { + item := BindingItem{} + item.ClusterID = util.ToString(f["cluster_id"]) + item.ClusterName = util.ToString(f["cluster_name"]) + item.ClusterUUID = util.ToString(f["cluster_uuid"]) + item.PublishAddress = util.ToString(f["publish_address"]) + item.NodeName = util.ToString(f["node_name"]) + item.PathHome = util.ToString(f["path_home"]) + item.PathLogs = util.ToString(f["path_logs"]) + item.NodeUUID = nodeID + + t, ok := v["updated"] + if ok { + layout := "2006-01-02T15:04:05.999999-07:00" + t1, err := time.Parse(layout, util.ToString(t)) + if err == nil { + item.Updated = t1.Unix() + } + } + ids[item.NodeUUID] = item + + } + } + } + } + } + return ids, nil +} + func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { - enrolledNodesByAgent, err := getEnrolledNodesByAgent(inst) + enrolledNodesByAgent, err := GetEnrolledNodesByAgent(inst) if err != nil { return nil, fmt.Errorf("error on get binding nodes info: %w", err) } @@ -30,13 +89,14 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { //TODO return already biding nodes info ?? return nil, fmt.Errorf("error on get nodes info from agent: %w", err) } + for nodeID, node := range nodesInfo.Nodes { - v, ok := enrolledNodesByAgent[nodeID] - if ok { - node.ClusterID = v.ClusterID - node.Enrolled = true - } + v, ok := enrolledNodesByAgent[nodeID] + if ok { + node.ClusterID = v.ClusterID + node.Enrolled = true + } } ////not recognized by agent, need auth? @@ -75,6 +135,149 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) { return nodesInfo, nil } +type RemoteConfig struct { + orm.ORMObjectBase + Metadata model.Metadata `json:"metadata" elastic_mapping:"metadata: { type: object }"` + Payload common.ConfigFile `json:"payload" elastic_mapping:"payload: { type: object}"` +} + +func remoteConfigProvider(instance model.Instance) []*common.ConfigFile { + + //fetch configs from remote db + //fetch configs assigned to (instance=_all OR instance=$instance_id ) AND application.name=$application.name + + q := orm.Query{ + Size: 1000, + Conds: orm.And(orm.Eq("metadata.category", "app_settings"), + orm.Eq("metadata.name", instance.Application.Name), + orm.Eq("metadata.labels.instance", "_all"), + ), + } + + err, searchResult := orm.Search(RemoteConfig{}, &q) + if err != nil { + panic(err) + } + + result := []*common.ConfigFile{} + + for _, row := range searchResult.Result { + v, ok := row.(map[string]interface{}) + if ok { + x, ok := v["payload"] + if ok { + f, ok := x.(map[string]interface{}) + if ok { + name, ok := f["name"].(string) + if ok { + item := common.ConfigFile{} + item.Name = util.ToString(name) + item.Location = util.ToString(f["location"]) + item.Content = util.ToString(f["content"]) + item.Version,_ = util.ToInt64(util.ToString(f["version"])) + item.Size = int64(len(item.Content)) + item.Managed = true + t, ok := v["updated"] + if ok { + layout := "2006-01-02T15:04:05.999999-07:00" + t1, err := time.Parse(layout, util.ToString(t)) + if err == nil { + item.Updated = t1.Unix() + } + } + result=append(result,&item) + } + } + } + } + } + + log.Error("remoteConfigProvider", "result", result) + + return result +} + +func dynamicAgentConfigProvider(instance model.Instance) []*common.ConfigFile { + + //get config files from remote db + //get settings with this agent id + result := []*common.ConfigFile{} + ids, err := GetEnrolledNodesByAgent(&instance) + if err != nil { + panic(err) + } + + var latestTimestamp int64 + for _, v := range ids { + if v.Updated > latestTimestamp { + latestTimestamp = v.Updated + } + } + + if len(ids) > 0 { + + cfg := common.ConfigFile{} + cfg.Name = "generated_metrics_tasks.yml" + cfg.Location = "generated_metrics_tasks.yml" + cfg.Content = getConfigs(ids) + cfg.Size = int64(len(cfg.Content)) + cfg.Version = latestTimestamp + cfg.Managed = true + cfg.Updated = latestTimestamp + result = append(result, &cfg) + } + + return result +} + +func getConfigs(items map[string]BindingItem) string { + buffer := bytes.NewBuffer([]byte("configs.template:\n ")) + + for _, v := range items { + + if v.ClusterID == "" { + panic("cluster id is empty") + } + metadata := elastic.GetMetadata(v.ClusterID) + var clusterLevelEnabled = false + var nodeLevelEnabled = true + var clusterEndPoint = metadata.Config.GetAnyEndpoint() + credential, err := common2.GetCredential(metadata.Config.CredentialID) + if err != nil { + panic(err) + } + var dv interface{} + dv, err = credential.Decode() + if err != nil { + panic(err) + } + var username = "" + var password = "" + + if auth, ok := dv.(model.BasicAuth); ok { + username = auth.Username + password = auth.Password + } + + buffer.Write([]byte(fmt.Sprintf("- name: \"%v\"\n path: ./config/task_config.tpl\n "+ + "variable:\n "+ + "CLUSTER_ID: %v\n "+ + "CLUSTER_ENDPOINT: [\"%v\"]\n "+ + "CLUSTER_USERNAME: \"%v\"\n "+ + "CLUSTER_PASSWORD: \"%v\"\n "+ + "CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+ + "NODE_LEVEL_TASKS_ENABLED: %v\n "+ + "NODE_LOGS_PATH: \"%v\"\n\n\n"+ + "#MANAGED_CONFIG_VERSION: %v\n"+ + "#MANAGED: true", + v.NodeUUID, v.ClusterID, clusterEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs, v.Updated))) + } + + //password: $[[keystore.$[[CLUSTER_ID]]_password]] + + return buffer.String() +} + //get nodes info via agent func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance) (*elastic.DiscoveryResult, error) { req := &util.Request{ @@ -134,52 +337,7 @@ type BindingItem struct { //infini system assigned id ClusterID string `json:"cluster_id"` -} - -//node -> binding item -func getEnrolledNodesByAgent(instance *model.Instance) (map[string]BindingItem, error) { - - //get nodes settings where agent id = instance id - q := orm.Query{ - Size: 1000, - Conds: orm.And(orm.Eq("metadata.category", "node_settings"), - orm.Eq("metadata.name", "agent"), - orm.Eq("metadata.labels.agent_id", instance.ID), - ), - } - - err, result := orm.Search(model.Setting{}, &q) - - if err != nil { - return nil, err - } - - ids := map[string]BindingItem{} - for _, row := range result.Result { - v, ok := row.(map[string]interface{}) - if ok { - x, ok := v["payload"] - if ok { - f, ok := x.(map[string]interface{}) - if ok { - nodeID, ok := f["node_uuid"].(string) - if ok { - item := BindingItem{} - item.ClusterID = util.ToString(f["cluster_id"]) - item.ClusterName = util.ToString(f["cluster_name"]) - item.ClusterUUID = util.ToString(f["cluster_uuid"]) - item.PublishAddress = util.ToString(f["publish_address"]) - item.NodeName = util.ToString(f["node_name"]) - item.PathHome = util.ToString(f["path_home"]) - item.PathLogs = util.ToString(f["path_logs"]) - item.NodeUUID = nodeID - ids[item.NodeUUID] = item - } - } - } - } - } - return ids, nil + Updated int64 `json:"updated"` } func getUnAssociateNodes() (map[string][]model.ESNodeInfo, error) { diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go index 9dc23296..d863e023 100644 --- a/modules/agent/api/init.go +++ b/modules/agent/api/init.go @@ -7,6 +7,7 @@ package api import ( "infini.sh/framework/core/api" "infini.sh/framework/core/api/rbac/enum" + "infini.sh/framework/plugins/managed/server" ) func Init() { @@ -32,4 +33,8 @@ func Init() { //api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand)) //api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript) + + + server.RegisterConfigProvider(remoteConfigProvider) + server.RegisterConfigProvider(dynamicAgentConfigProvider) } diff --git a/modules/agent/api/tod.go b/modules/agent/api/tod.go index 0a457bbc..da6c172f 100644 --- a/modules/agent/api/tod.go +++ b/modules/agent/api/tod.go @@ -239,7 +239,7 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt func NewClusterSettings(clusterID string) *model.Setting { settings := model.Setting{ - Metadata: model.SettingsMetadata{ + Metadata: model.Metadata{ Category: Cluster, }, } @@ -255,7 +255,7 @@ func NewClusterSettings(clusterID string) *model.Setting { func NewNodeAgentSettings(instanceID string, item *BindingItem) *model.Setting { settings := model.Setting{ - Metadata: model.SettingsMetadata{ + Metadata: model.Metadata{ Category: Node, Name: "agent", }, @@ -283,7 +283,7 @@ func NewNodeAgentSettings(instanceID string, item *BindingItem) *model.Setting { func NewIndexSettings(clusterID, nodeID, agentID, indexName, indexID string) *model.Setting { settings := model.Setting{ - Metadata: model.SettingsMetadata{ + Metadata: model.Metadata{ Category: Index, }, } @@ -469,7 +469,7 @@ func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Reques taskSetting.ClusterStats = nil } setting = &model.Setting{ - Metadata: model.SettingsMetadata{ + Metadata: model.Metadata{ Category: "agent", Name: "task", Labels: util.MapStr{ @@ -623,7 +623,7 @@ func getAgentTaskSetting(agentID string, v model.ESNodeInfo) (*model.Setting, er LogsPath: v.Path.Logs, } return &model.Setting{ - Metadata: model.SettingsMetadata{ + Metadata: model.Metadata{ Category: "agent", Name: "task", Labels: util.MapStr{