support remote config store

This commit is contained in:
medcl 2023-10-19 21:14:49 +08:00
parent ddba189aec
commit 9bab385d8c
9 changed files with 232 additions and 264 deletions

View File

@ -1,90 +0,0 @@
elasticsearch:
- name: $[[INGEST_CLUSTER_ID]]
enabled: true
endpoints: $[[INGEST_CLUSTER_ENDPOINT]]
discovery:
enabled: false
basic_auth:
username: $[[INGEST_CLUSTER_USERNAME]]
password: $[[keystore.ingest_cluster_password]]
metrics:
enabled: true
queue: metrics
network:
enabled: true
summary: true
details: true
memory:
metrics:
- swap
- memory
disk:
metrics:
- iops
- usage
cpu:
metrics:
- idle
- system
- user
- iowait
- load
instance:
enabled: true
elastic:
availability_check:
enabled: false
pipeline:
- name: merge_logs
auto_start: true
keep_running: true
processor:
- indexing_merge:
index_name: ".infini_logs"
elasticsearch: "$[[INGEST_CLUSTER_ID]]"
input_queue: "logs"
idle_timeout_in_seconds: 10
output_queue:
name: "merged_requests"
worker_size: 1
bulk_size_in_mb: 5
- name: merge_metrics
auto_start: true
keep_running: true
processor:
- indexing_merge:
elasticsearch: "$[[INGEST_CLUSTER_ID]]"
index_name: ".infini_metrics"
input_queue: "metrics"
output_queue:
name: "merged_requests"
worker_size: 1
bulk_size_in_mb: 5
- name: ingest_merged_requests
auto_start: true
keep_running: true
processor:
- bulk_indexing:
max_worker_size: 1
bulk:
batch_size_in_mb: 5
batch_size_in_docs: 5000
max_retry_times: 0
invalid_queue: ""
response_handle:
include_index_stats: false
include_action_stats: false
output_bulk_stats: false
include_error_details: false
save_error_results: false
save_success_results: false
save_busy_results: false
consumer:
fetch_max_messages: 5
queues:
type: indexing_merge
when:
cluster_available: ["$[[INGEST_CLUSTER_ID]]"]

View File

@ -1,84 +0,0 @@
elasticsearch:
- id: $[[CLUSTER_ID]]
name: $[[CLUSTER_ID]]
enabled: true
endpoint: $[[CLUSTER_ENDPOINT]]
discovery:
enabled: false
basic_auth:
username: $[[CLUSTER_USERNAME]]
password: $[[keystore.$[[CLUSTER_ID]]_password]]
pipeline:
#clsuter level metrics
- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]]
enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]]
keep_running: true
singleton: true
name: collect_$[[CLUSTER_ID]]_es_cluster_stats
retry_delay_in_ms: 10000
processor:
- es_cluster_stats:
elasticsearch: $[[CLUSTER_ID]]
labels:
cluster_id: $[[CLUSTER_ID]]
when:
cluster_available: ["$[[CLUSTER_ID]]"]
- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]]
enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]]
keep_running: true
singleton: true
name: collect_$[[CLUSTER_ID]]_es_index_stats
retry_delay_in_ms: 10000
processor:
- es_index_stats:
elasticsearch: $[[CLUSTER_ID]]
labels:
cluster_id: $[[CLUSTER_ID]]
when:
cluster_available: ["$[[CLUSTER_ID]]"]
- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]]
enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]]
keep_running: true
singleton: true
name: collect_$[[CLUSTER_ID]]_es_cluster_health
retry_delay_in_ms: 10000
processor:
- es_cluster_health:
elasticsearch: $[[CLUSTER_ID]]
labels:
cluster_id: $[[CLUSTER_ID]]
when:
cluster_available: ["$[[CLUSTER_ID]]"]
#node level metrics
- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]]
enabled: $[[NODE_LEVEL_TASKS_ENABLED]]
keep_running: true
name: collect_$[[CLUSTER_ID]]_es_node_stats
retry_delay_in_ms: 10000
processor:
- es_node_stats:
elasticsearch: $[[CLUSTER_ID]]
labels:
cluster_id: $[[CLUSTER_ID]]
when:
cluster_available: ["$[[CLUSTER_ID]]"]
#node logs
- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]]
enabled: $[[NODE_LEVEL_TASKS_ENABLED]]
keep_running: true
name: collect_$[[CLUSTER_ID]]_es_logs
retry_delay_in_ms: 10000
processor:
- es_logs_processor:
elasticsearch: $[[CLUSTER_ID]]
labels:
cluster_id: $[[CLUSTER_ID]]
logs_path: $[[NODE_LOGS_PATH]]
queue_name: logs
when:
cluster_available: ["$[[CLUSTER_ID]]"]

View File

@ -1,17 +0,0 @@
configs.template:
- name: "cluster_a_node_b_task_config"
path: ./config/task_config.tpl
variable:
CLUSTER_ID: infini_default_ingest_cluster
CLUSTER_ENDPOINT: ["https://localhost:9200"]
CLUSTER_USERNAME: "admin"
CLUSTER_VER: "1.6.0"
CLUSTER_DISTRIBUTION: "easysearch"
INDEX_PREFIX: ".infini_"
CLUSTER_LEVEL_TASKS_ENABLED: false
NODE_LEVEL_TASKS_ENABLED: true
NODE_LOGS_PATH: "/opt/easysearch/logs/"
#MANAGED_CONFIG_VERSION: 19
#MANAGED: true

View File

@ -4,14 +4,13 @@ configs: #define configs group
- ./templates/ingest_config.tpl
- ./templates/task_config.tpl
- ./configs/ingest_config.yml
- ./configs/cluster_xx_node_xx.yml
instances: #define which config instance should fetch
infini_default_system_cluster: #instance group
plugins:
- ingest
instances:
- ck0mkk805f5virpsejp0
- ckjrpdg05f5lrfp8qlng
_all: #instance group
# plugins:
# - ingest
# instances:
# - ck0mkk805f5virpsejp0
# - ckjrpdg05f5lrfp8qlng
configs:
- general_ingest_template
secrets:
@ -22,10 +21,4 @@ secrets:
keystore:
ingest_cluster_password:
type: plaintext
value: "d7cc48e69a41dac719fb"
infini_default_ingest_cluster_password:
type: plaintext
value: "d7cc48e69a41dac719fb"
# infini_default_ingest_cluster_password:
# type: credential
# value: "ckghspo05f5q7pr20ct0" #credential_id
value: "d7cc48e69a41dac719fb"

View File

@ -1,3 +1,6 @@
env:
CLUSTER_PASSWORD: $[[keystore.$[[CLUSTER_ID]]_password]]
elasticsearch:
- id: $[[CLUSTER_ID]]
name: $[[CLUSTER_ID]]
@ -7,7 +10,7 @@ elasticsearch:
enabled: false
basic_auth:
username: $[[CLUSTER_USERNAME]]
password: $[[keystore.$[[CLUSTER_ID]]_password]]
password: $[[CLUSTER_PASSWORD]]
pipeline:
#clsuter level metrics

View File

@ -129,7 +129,6 @@ func main() {
orm.RegisterSchemaWithIndexName(elastic.View{}, "view")
orm.RegisterSchemaWithIndexName(elastic.CommonCommand{}, "commands")
//orm.RegisterSchema(elastic.TraceTemplate{}, "trace-template")
//orm.RegisterSchema(model.Instance{}, "instance")
orm.RegisterSchemaWithIndexName(alerting.Rule{}, "alert-rule")
orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alert-history")
orm.RegisterSchemaWithIndexName(alerting.AlertMessage{}, "alert-message")
@ -142,6 +141,7 @@ func main() {
orm.RegisterSchemaWithIndexName(model.Notification{}, "notification")
orm.RegisterSchemaWithIndexName(model.EmailServer{}, "email-server")
orm.RegisterSchemaWithIndexName(model2.Instance{}, "instance")
orm.RegisterSchemaWithIndexName(api3.RemoteConfig{}, "configs")
api.RegisterSchema()

View File

@ -5,6 +5,7 @@
package api
import (
"bytes"
"context"
"fmt"
log "github.com/cihub/seelog"
@ -13,12 +14,70 @@ import (
"infini.sh/framework/core/model"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
common2 "infini.sh/framework/modules/elastic/common"
"infini.sh/framework/plugins/managed/common"
"net/http"
"time"
)
//node -> binding item
func GetEnrolledNodesByAgent(instance *model.Instance) (map[string]BindingItem, error) {
//get nodes settings where agent id = instance id
q := orm.Query{
Size: 1000,
Conds: orm.And(orm.Eq("metadata.category", "node_settings"),
orm.Eq("metadata.name", "agent"),
orm.Eq("metadata.labels.agent_id", instance.ID),
),
}
err, result := orm.Search(model.Setting{}, &q)
if err != nil {
return nil, err
}
ids := map[string]BindingItem{}
for _, row := range result.Result {
v, ok := row.(map[string]interface{})
if ok {
x, ok := v["payload"]
if ok {
f, ok := x.(map[string]interface{})
if ok {
nodeID, ok := f["node_uuid"].(string)
if ok {
item := BindingItem{}
item.ClusterID = util.ToString(f["cluster_id"])
item.ClusterName = util.ToString(f["cluster_name"])
item.ClusterUUID = util.ToString(f["cluster_uuid"])
item.PublishAddress = util.ToString(f["publish_address"])
item.NodeName = util.ToString(f["node_name"])
item.PathHome = util.ToString(f["path_home"])
item.PathLogs = util.ToString(f["path_logs"])
item.NodeUUID = nodeID
t, ok := v["updated"]
if ok {
layout := "2006-01-02T15:04:05.999999-07:00"
t1, err := time.Parse(layout, util.ToString(t))
if err == nil {
item.Updated = t1.Unix()
}
}
ids[item.NodeUUID] = item
}
}
}
}
}
return ids, nil
}
func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) {
enrolledNodesByAgent, err := getEnrolledNodesByAgent(inst)
enrolledNodesByAgent, err := GetEnrolledNodesByAgent(inst)
if err != nil {
return nil, fmt.Errorf("error on get binding nodes info: %w", err)
}
@ -30,13 +89,14 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) {
//TODO return already biding nodes info ??
return nil, fmt.Errorf("error on get nodes info from agent: %w", err)
}
for nodeID, node := range nodesInfo.Nodes {
v, ok := enrolledNodesByAgent[nodeID]
if ok {
node.ClusterID = v.ClusterID
node.Enrolled = true
}
v, ok := enrolledNodesByAgent[nodeID]
if ok {
node.ClusterID = v.ClusterID
node.Enrolled = true
}
}
////not recognized by agent, need auth?
@ -75,6 +135,149 @@ func refreshNodesInfo(inst *model.Instance) (*elastic.DiscoveryResult, error) {
return nodesInfo, nil
}
type RemoteConfig struct {
orm.ORMObjectBase
Metadata model.Metadata `json:"metadata" elastic_mapping:"metadata: { type: object }"`
Payload common.ConfigFile `json:"payload" elastic_mapping:"payload: { type: object}"`
}
func remoteConfigProvider(instance model.Instance) []*common.ConfigFile {
//fetch configs from remote db
//fetch configs assigned to (instance=_all OR instance=$instance_id ) AND application.name=$application.name
q := orm.Query{
Size: 1000,
Conds: orm.And(orm.Eq("metadata.category", "app_settings"),
orm.Eq("metadata.name", instance.Application.Name),
orm.Eq("metadata.labels.instance", "_all"),
),
}
err, searchResult := orm.Search(RemoteConfig{}, &q)
if err != nil {
panic(err)
}
result := []*common.ConfigFile{}
for _, row := range searchResult.Result {
v, ok := row.(map[string]interface{})
if ok {
x, ok := v["payload"]
if ok {
f, ok := x.(map[string]interface{})
if ok {
name, ok := f["name"].(string)
if ok {
item := common.ConfigFile{}
item.Name = util.ToString(name)
item.Location = util.ToString(f["location"])
item.Content = util.ToString(f["content"])
item.Version,_ = util.ToInt64(util.ToString(f["version"]))
item.Size = int64(len(item.Content))
item.Managed = true
t, ok := v["updated"]
if ok {
layout := "2006-01-02T15:04:05.999999-07:00"
t1, err := time.Parse(layout, util.ToString(t))
if err == nil {
item.Updated = t1.Unix()
}
}
result=append(result,&item)
}
}
}
}
}
log.Error("remoteConfigProvider", "result", result)
return result
}
func dynamicAgentConfigProvider(instance model.Instance) []*common.ConfigFile {
//get config files from remote db
//get settings with this agent id
result := []*common.ConfigFile{}
ids, err := GetEnrolledNodesByAgent(&instance)
if err != nil {
panic(err)
}
var latestTimestamp int64
for _, v := range ids {
if v.Updated > latestTimestamp {
latestTimestamp = v.Updated
}
}
if len(ids) > 0 {
cfg := common.ConfigFile{}
cfg.Name = "generated_metrics_tasks.yml"
cfg.Location = "generated_metrics_tasks.yml"
cfg.Content = getConfigs(ids)
cfg.Size = int64(len(cfg.Content))
cfg.Version = latestTimestamp
cfg.Managed = true
cfg.Updated = latestTimestamp
result = append(result, &cfg)
}
return result
}
func getConfigs(items map[string]BindingItem) string {
buffer := bytes.NewBuffer([]byte("configs.template:\n "))
for _, v := range items {
if v.ClusterID == "" {
panic("cluster id is empty")
}
metadata := elastic.GetMetadata(v.ClusterID)
var clusterLevelEnabled = false
var nodeLevelEnabled = true
var clusterEndPoint = metadata.Config.GetAnyEndpoint()
credential, err := common2.GetCredential(metadata.Config.CredentialID)
if err != nil {
panic(err)
}
var dv interface{}
dv, err = credential.Decode()
if err != nil {
panic(err)
}
var username = ""
var password = ""
if auth, ok := dv.(model.BasicAuth); ok {
username = auth.Username
password = auth.Password
}
buffer.Write([]byte(fmt.Sprintf("- name: \"%v\"\n path: ./config/task_config.tpl\n "+
"variable:\n "+
"CLUSTER_ID: %v\n "+
"CLUSTER_ENDPOINT: [\"%v\"]\n "+
"CLUSTER_USERNAME: \"%v\"\n "+
"CLUSTER_PASSWORD: \"%v\"\n "+
"CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+
"NODE_LEVEL_TASKS_ENABLED: %v\n "+
"NODE_LOGS_PATH: \"%v\"\n\n\n"+
"#MANAGED_CONFIG_VERSION: %v\n"+
"#MANAGED: true",
v.NodeUUID, v.ClusterID, clusterEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs, v.Updated)))
}
//password: $[[keystore.$[[CLUSTER_ID]]_password]]
return buffer.String()
}
//get nodes info via agent
func GetElasticsearchNodesViaAgent(ctx context.Context, instance *model.Instance) (*elastic.DiscoveryResult, error) {
req := &util.Request{
@ -134,52 +337,7 @@ type BindingItem struct {
//infini system assigned id
ClusterID string `json:"cluster_id"`
}
//node -> binding item
func getEnrolledNodesByAgent(instance *model.Instance) (map[string]BindingItem, error) {
//get nodes settings where agent id = instance id
q := orm.Query{
Size: 1000,
Conds: orm.And(orm.Eq("metadata.category", "node_settings"),
orm.Eq("metadata.name", "agent"),
orm.Eq("metadata.labels.agent_id", instance.ID),
),
}
err, result := orm.Search(model.Setting{}, &q)
if err != nil {
return nil, err
}
ids := map[string]BindingItem{}
for _, row := range result.Result {
v, ok := row.(map[string]interface{})
if ok {
x, ok := v["payload"]
if ok {
f, ok := x.(map[string]interface{})
if ok {
nodeID, ok := f["node_uuid"].(string)
if ok {
item := BindingItem{}
item.ClusterID = util.ToString(f["cluster_id"])
item.ClusterName = util.ToString(f["cluster_name"])
item.ClusterUUID = util.ToString(f["cluster_uuid"])
item.PublishAddress = util.ToString(f["publish_address"])
item.NodeName = util.ToString(f["node_name"])
item.PathHome = util.ToString(f["path_home"])
item.PathLogs = util.ToString(f["path_logs"])
item.NodeUUID = nodeID
ids[item.NodeUUID] = item
}
}
}
}
}
return ids, nil
Updated int64 `json:"updated"`
}
func getUnAssociateNodes() (map[string][]model.ESNodeInfo, error) {

View File

@ -7,6 +7,7 @@ package api
import (
"infini.sh/framework/core/api"
"infini.sh/framework/core/api/rbac/enum"
"infini.sh/framework/plugins/managed/server"
)
func Init() {
@ -32,4 +33,8 @@ func Init() {
//api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand))
//api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript)
server.RegisterConfigProvider(remoteConfigProvider)
server.RegisterConfigProvider(dynamicAgentConfigProvider)
}

View File

@ -239,7 +239,7 @@ func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps htt
func NewClusterSettings(clusterID string) *model.Setting {
settings := model.Setting{
Metadata: model.SettingsMetadata{
Metadata: model.Metadata{
Category: Cluster,
},
}
@ -255,7 +255,7 @@ func NewClusterSettings(clusterID string) *model.Setting {
func NewNodeAgentSettings(instanceID string, item *BindingItem) *model.Setting {
settings := model.Setting{
Metadata: model.SettingsMetadata{
Metadata: model.Metadata{
Category: Node,
Name: "agent",
},
@ -283,7 +283,7 @@ func NewNodeAgentSettings(instanceID string, item *BindingItem) *model.Setting {
func NewIndexSettings(clusterID, nodeID, agentID, indexName, indexID string) *model.Setting {
settings := model.Setting{
Metadata: model.SettingsMetadata{
Metadata: model.Metadata{
Category: Index,
},
}
@ -469,7 +469,7 @@ func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Reques
taskSetting.ClusterStats = nil
}
setting = &model.Setting{
Metadata: model.SettingsMetadata{
Metadata: model.Metadata{
Category: "agent",
Name: "task",
Labels: util.MapStr{
@ -623,7 +623,7 @@ func getAgentTaskSetting(agentID string, v model.ESNodeInfo) (*model.Setting, er
LogsPath: v.Path.Logs,
}
return &model.Setting{
Metadata: model.SettingsMetadata{
Metadata: model.Metadata{
Category: "agent",
Name: "task",
Labels: util.MapStr{