diff --git a/config/ingest_config.tpl b/config/ingest_config.tpl new file mode 100644 index 00000000..d93dffa9 --- /dev/null +++ b/config/ingest_config.tpl @@ -0,0 +1,90 @@ +elasticsearch: + - name: $[[INGEST_CLUSTER_ID]] + enabled: true + endpoints: $[[INGEST_CLUSTER_ENDPOINT]] + discovery: + enabled: false + basic_auth: + username: $[[INGEST_CLUSTER_USERNAME]] + password: $[[keystore.ingest_cluster_password]] + +metrics: + enabled: true + queue: metrics + network: + enabled: true + summary: true + details: true + memory: + metrics: + - swap + - memory + disk: + metrics: + - iops + - usage + cpu: + metrics: + - idle + - system + - user + - iowait + - load + instance: + enabled: true + +elastic: + availability_check: + enabled: false + +pipeline: + - name: merge_logs + auto_start: true + keep_running: true + processor: + - indexing_merge: + index_name: ".infini_logs" + elasticsearch: "$[[INGEST_CLUSTER_ID]]" + input_queue: "logs" + idle_timeout_in_seconds: 10 + output_queue: + name: "merged_requests" + worker_size: 1 + bulk_size_in_mb: 5 + - name: merge_metrics + auto_start: true + keep_running: true + processor: + - indexing_merge: + elasticsearch: "$[[INGEST_CLUSTER_ID]]" + index_name: ".infini_metrics" + input_queue: "metrics" + output_queue: + name: "merged_requests" + worker_size: 1 + bulk_size_in_mb: 5 + - name: ingest_merged_requests + auto_start: true + keep_running: true + processor: + - bulk_indexing: + max_worker_size: 1 + bulk: + batch_size_in_mb: 5 + batch_size_in_docs: 5000 + max_retry_times: 0 + invalid_queue: "" + response_handle: + include_index_stats: false + include_action_stats: false + output_bulk_stats: false + include_error_details: false + save_error_results: false + save_success_results: false + save_busy_results: false + consumer: + fetch_max_messages: 5 + queues: + type: indexing_merge + when: + cluster_available: ["$[[INGEST_CLUSTER_ID]]"] \ No newline at end of file diff --git a/config/install_agent.tpl b/config/install_agent.tpl index d7174559..e764d685 100644 --- a/config/install_agent.tpl +++ b/config/install_agent.tpl @@ -220,6 +220,23 @@ path.data: data path.logs: log path.configs: config +resource_limit.cpu.max_num_of_cpus: 1 +resource_limit.memory.max_in_bytes: 533708800 + +stats: + include_storage_stats_in_api: false + +disk_queue: + max_msg_size: 20485760 + max_bytes_per_file: 20485760 + max_used_bytes: 524288000 + retention.max_num_of_local_files: 1 + compress: + idle_threshold: 0 + num_of_files_decompress_ahead: 0 + segment: + enabled: true + api: enabled: true tls: @@ -232,11 +249,12 @@ api: binding: \$[[env.API_BINDING]] badger: + value_threshold: 1024 + mem_table_size: 1048576 value_log_max_entries: 1000000 value_log_file_size: 104857600 - value_threshold: 1024 -agent: +node: major_ip_pattern: ".*" EOF } diff --git a/config/system_config.tpl b/config/system_config.tpl index 23c4c338..d78fdde6 100644 --- a/config/system_config.tpl +++ b/config/system_config.tpl @@ -7,7 +7,7 @@ elasticsearch: enabled: true monitored: true reserved: true - endpoint: $[[CLUSTER_ENDPINT]] + endpoint: $[[CLUSTER_ENDPOINT]] discovery: enabled: false basic_auth: @@ -17,7 +17,7 @@ elasticsearch: elastic.elasticsearch: $[[CLUSTER_ID]] pipeline: - - name: indexing_merge + - name: merge_metrics auto_start: true keep_running: true processor: @@ -31,22 +31,7 @@ pipeline: tag: "metrics" worker_size: 1 bulk_size_in_mb: 5 - - name: consume-metrics_requests - auto_start: true - keep_running: true - processor: - - bulk_indexing: - bulk: - compress: true - batch_size_in_mb: 5 - batch_size_in_docs: 5000 - consumer: - fetch_max_messages: 100 - queues: - type: indexing_merge - tag: "metrics" - when: - cluster_available: ["$[[CLUSTER_ID]]"] + - name: metadata_ingest auto_start: true keep_running: true @@ -91,7 +76,7 @@ pipeline: when: cluster_available: ["$[[CLUSTER_ID]]"] - - name: logging_indexing_merge + - name: merge_logging auto_start: true keep_running: true processor: @@ -106,19 +91,22 @@ pipeline: tag: "request_logging" worker_size: 1 bulk_size_in_kb: 1 - - name: consume-logging_requests + + - name: ingest_merged_requests auto_start: true keep_running: true + retry_delay_in_ms: 5000 + max_running_in_ms: 30000 processor: - bulk_indexing: + idle_timeout_in_seconds: 5 bulk: compress: true - batch_size_in_mb: 1 - batch_size_in_docs: 1 + batch_size_in_mb: 10 + batch_size_in_docs: 1000 consumer: fetch_max_messages: 100 queues: type: indexing_merge - tag: "request_logging" when: cluster_available: ["$[[CLUSTER_ID]]"] \ No newline at end of file diff --git a/config/task_config.tpl b/config/task_config.tpl new file mode 100644 index 00000000..c5abc715 --- /dev/null +++ b/config/task_config.tpl @@ -0,0 +1,84 @@ +elasticsearch: + - id: $[[CLUSTER_ID]] + name: $[[CLUSTER_ID]] + enabled: true + endpoint: $[[CLUSTER_ENDPOINT]] + discovery: + enabled: false + basic_auth: + username: $[[CLUSTER_USERNAME]] + password: $[[keystore.$[[CLUSTER_ID]]_password]] + +pipeline: +#clsuter level metrics +- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]] + enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]] + keep_running: true + singleton: true + name: collect_$[[CLUSTER_ID]]_es_cluster_stats + retry_delay_in_ms: 10000 + processor: + - es_cluster_stats: + elasticsearch: $[[CLUSTER_ID]] + labels: + cluster_id: $[[CLUSTER_ID]] + when: + cluster_available: ["$[[CLUSTER_ID]]"] + +- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]] + enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]] + keep_running: true + singleton: true + name: collect_$[[CLUSTER_ID]]_es_index_stats + retry_delay_in_ms: 10000 + processor: + - es_index_stats: + elasticsearch: $[[CLUSTER_ID]] + labels: + cluster_id: $[[CLUSTER_ID]] + when: + cluster_available: ["$[[CLUSTER_ID]]"] + +- auto_start: $[[CLUSTER_LEVEL_TASKS_ENABLED]] + enabled: $[[CLUSTER_LEVEL_TASKS_ENABLED]] + keep_running: true + singleton: true + name: collect_$[[CLUSTER_ID]]_es_cluster_health + retry_delay_in_ms: 10000 + processor: + - es_cluster_health: + elasticsearch: $[[CLUSTER_ID]] + labels: + cluster_id: $[[CLUSTER_ID]] + when: + cluster_available: ["$[[CLUSTER_ID]]"] + +#node level metrics +- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]] + enabled: $[[NODE_LEVEL_TASKS_ENABLED]] + keep_running: true + name: collect_$[[CLUSTER_ID]]_es_node_stats + retry_delay_in_ms: 10000 + processor: + - es_node_stats: + elasticsearch: $[[CLUSTER_ID]] + labels: + cluster_id: $[[CLUSTER_ID]] + when: + cluster_available: ["$[[CLUSTER_ID]]"] + +#node logs +- auto_start: $[[NODE_LEVEL_TASKS_ENABLED]] + enabled: $[[NODE_LEVEL_TASKS_ENABLED]] + keep_running: true + name: collect_$[[CLUSTER_ID]]_es_logs + retry_delay_in_ms: 10000 + processor: + - es_logs_processor: + elasticsearch: $[[CLUSTER_ID]] + labels: + cluster_id: $[[CLUSTER_ID]] + logs_path: $[[NODE_LOGS_PATH]] + queue_name: logs + when: + cluster_available: ["$[[CLUSTER_ID]]"] diff --git a/console.yml b/console.yml index 2faba170..b10556bd 100644 --- a/console.yml +++ b/console.yml @@ -52,7 +52,6 @@ elastic: metrics: enabled: true - major_ip_pattern: "192.*" queue: metrics elasticsearch: enabled: true diff --git a/main.go b/main.go index f7d71d81..dd01f93e 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "errors" _ "expvar" "infini.sh/console/plugin/api/email" + model2 "infini.sh/framework/core/model" _ "time/tzdata" log "github.com/cihub/seelog" @@ -12,7 +13,6 @@ import ( "infini.sh/console/model" "infini.sh/console/model/alerting" "infini.sh/console/model/insight" - "infini.sh/console/modules/agent" _ "infini.sh/console/plugin" setup1 "infini.sh/console/plugin/setup" alerting2 "infini.sh/console/service/alerting" @@ -37,6 +37,7 @@ import ( _ "infini.sh/framework/plugins" api2 "infini.sh/gateway/api" _ "infini.sh/gateway/proxy" + _ "infini.sh/framework/plugins/managed" ) var appConfig *config.AppConfig @@ -70,7 +71,6 @@ func main() { modules = append(modules, module.ModuleItem{Value: &task.TaskModule{}, Priority: 1}) modules = append(modules, module.ModuleItem{Value: &metrics.MetricsModule{}, Priority: 1}) modules = append(modules, module.ModuleItem{Value: &security.Module{}, Priority: 1}) - modules = append(modules, module.ModuleItem{Value: &agent.AgentModule{}, Priority: 100}) uiModule := &ui.UIModule{} @@ -122,11 +122,11 @@ func main() { elastic2.InitTemplate(false) - //orm.RegisterSchemaWithIndexName(model.Dict{}, "dict") + //orm.RegisterSchema(model.Dict{}, "dict") orm.RegisterSchemaWithIndexName(elastic.View{}, "view") orm.RegisterSchemaWithIndexName(elastic.CommonCommand{}, "commands") - //orm.RegisterSchemaWithIndexName(elastic.TraceTemplate{}, "trace-template") - orm.RegisterSchemaWithIndexName(model.Instance{}, "instance") + //orm.RegisterSchema(elastic.TraceTemplate{}, "trace-template") + //orm.RegisterSchema(model.Instance{}, "instance") orm.RegisterSchemaWithIndexName(alerting.Rule{}, "alert-rule") orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alert-history") orm.RegisterSchemaWithIndexName(alerting.AlertMessage{}, "alert-message") @@ -138,6 +138,8 @@ func main() { orm.RegisterSchemaWithIndexName(model.Layout{}, "layout") orm.RegisterSchemaWithIndexName(model.Notification{}, "notification") orm.RegisterSchemaWithIndexName(model.EmailServer{}, "email-server") + orm.RegisterSchemaWithIndexName(model2.Instance{}, "instance") + api.RegisterSchema() if global.Env().SetupRequired() { diff --git a/model/instance.go b/model/instance.go index 81b28946..9b7921e2 100644 --- a/model/instance.go +++ b/model/instance.go @@ -11,26 +11,15 @@ import ( "net/http" "time" - "infini.sh/framework/core/agent" - "infini.sh/framework/core/orm" + "infini.sh/framework/core/model" "infini.sh/framework/core/util" "infini.sh/framework/modules/pipeline" ) - -type Instance struct { - orm.ORMObjectBase - - //InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"` - Name string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"` - Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"` - Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"` - BasicAuth agent.BasicAuth `config:"basic_auth" json:"basic_auth,omitempty" elastic_mapping:"basic_auth:{type:object}"` - Owner string `json:"owner,omitempty" config:"owner" elastic_mapping:"owner:{type:keyword}"` - Tags []string `json:"tags,omitempty"` - Description string `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"` +type TaskWorker struct { + model.Instance } -func (inst *Instance) CreatePipeline(body []byte) error { +func (inst *TaskWorker) CreatePipeline(body []byte) error { req := &util.Request{ Method: http.MethodPost, Body: body, @@ -39,7 +28,7 @@ func (inst *Instance) CreatePipeline(body []byte) error { return inst.doRequest(req, nil) } -func (inst *Instance) StopPipeline(ctx context.Context, pipelineID string) error { +func (inst *TaskWorker) StopPipeline(ctx context.Context, pipelineID string) error { req := &util.Request{ Method: http.MethodPost, Url: fmt.Sprintf("%s/pipeline/task/%s/_stop", inst.Endpoint, pipelineID), @@ -48,13 +37,13 @@ func (inst *Instance) StopPipeline(ctx context.Context, pipelineID string) error return inst.doRequest(req, nil) } -func (inst *Instance) StopPipelineWithTimeout(pipelineID string, duration time.Duration) error { +func (inst *TaskWorker) StopPipelineWithTimeout(pipelineID string, duration time.Duration) error { ctx, cancel := context.WithTimeout(context.Background(), duration) defer cancel() return inst.StopPipeline(ctx, pipelineID) } -func (inst *Instance) StartPipeline(pipelineID string) error { +func (inst *TaskWorker) StartPipeline(pipelineID string) error { req := &util.Request{ Method: http.MethodPost, Url: fmt.Sprintf("%s/pipeline/task/%s/_start", inst.Endpoint, pipelineID), @@ -62,7 +51,7 @@ func (inst *Instance) StartPipeline(pipelineID string) error { return inst.doRequest(req, nil) } -func (inst *Instance) DeletePipeline(pipelineID string) error { +func (inst *TaskWorker) DeletePipeline(pipelineID string) error { req := &util.Request{ Method: http.MethodDelete, Url: fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID), @@ -70,7 +59,7 @@ func (inst *Instance) DeletePipeline(pipelineID string) error { return inst.doRequest(req, nil) } -func (inst *Instance) GetPipeline(pipelineID string) (*pipeline.PipelineStatus, error) { +func (inst *TaskWorker) GetPipeline(pipelineID string) (*pipeline.PipelineStatus, error) { if pipelineID == "" { return nil, errors.New("invalid pipelineID") } @@ -89,7 +78,7 @@ func (inst *Instance) GetPipeline(pipelineID string) (*pipeline.PipelineStatus, return &res, nil } -func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipelinesResponse, error) { +func (inst *TaskWorker) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipelinesResponse, error) { body := util.MustToJSONBytes(util.MapStr{ "ids": pipelineIDs, }) @@ -106,7 +95,7 @@ func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipel return res, err } -func (inst *Instance) DeleteQueueBySelector(selector util.MapStr) error { +func (inst *TaskWorker) DeleteQueueBySelector(selector util.MapStr) error { req := &util.Request{ Method: http.MethodDelete, Url: fmt.Sprintf("%s/queue/_search", inst.Endpoint), @@ -117,7 +106,7 @@ func (inst *Instance) DeleteQueueBySelector(selector util.MapStr) error { return inst.doRequest(req, nil) } -func (inst *Instance) DeleteQueueConsumersBySelector(selector util.MapStr) error { +func (inst *TaskWorker) DeleteQueueConsumersBySelector(selector util.MapStr) error { req := &util.Request{ Method: http.MethodDelete, Url: fmt.Sprintf("%s/queue/consumer/_search", inst.Endpoint), @@ -128,21 +117,22 @@ func (inst *Instance) DeleteQueueConsumersBySelector(selector util.MapStr) error return inst.doRequest(req, nil) } -func (inst *Instance) TryConnect(ctx context.Context) error { +func (inst *TaskWorker) TryConnect(ctx context.Context) error { req := &util.Request{ Method: http.MethodGet, - Url: fmt.Sprintf("%s/_framework/api/_info", inst.Endpoint), + Url: fmt.Sprintf("%s/_info", inst.Endpoint), Context: ctx, } return inst.doRequest(req, nil) } -func (inst *Instance) TryConnectWithTimeout(duration time.Duration) error { + +func (inst *TaskWorker) TryConnectWithTimeout(duration time.Duration) error { ctx, cancel := context.WithTimeout(context.Background(), duration) defer cancel() return inst.TryConnect(ctx) } -func (inst *Instance) doRequest(req *util.Request, resBody interface{}) error { +func (inst *TaskWorker) doRequest(req *util.Request, resBody interface{}) error { req.SetBasicAuth(inst.BasicAuth.Username, inst.BasicAuth.Password) result, err := util.ExecuteRequest(req) if err != nil { diff --git a/modules/agent/agent.go b/modules/agent/agent.go deleted file mode 100644 index 9b86e2d9..00000000 --- a/modules/agent/agent.go +++ /dev/null @@ -1,158 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package agent - -import ( - "fmt" - log "github.com/cihub/seelog" - "infini.sh/console/modules/agent/api" - "infini.sh/console/modules/agent/client" - "infini.sh/console/modules/agent/common" - "infini.sh/console/modules/agent/model" - "infini.sh/console/modules/agent/state" - "infini.sh/framework/core/agent" - "infini.sh/framework/core/credential" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/env" - "infini.sh/framework/core/host" - "infini.sh/framework/core/kv" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - "time" -) - -func (module *AgentModule) Name() string { - return "agent" -} - -func (module *AgentModule) Setup() { - module.AgentConfig.Enabled = true - module.AgentConfig.StateManager.Enabled = true - exists, err := env.ParseConfig("agent", &module.AgentConfig) - if exists && err != nil { - panic(err) - } - if module.AgentConfig.Enabled { - api.Init() - } -} -func (module *AgentModule) Start() error { - if !module.AgentConfig.Enabled { - return nil - } - orm.RegisterSchemaWithIndexName(agent.Instance{}, "agent") - orm.RegisterSchemaWithIndexName(agent.ESNodeInfo{}, "agent-node") - orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host") - orm.RegisterSchemaWithIndexName(agent.Setting{}, "agent-setting") - var ( - executor client.Executor - err error - caFile string - caKey string - ) - if module.AgentConfig.Setup != nil { - caFile = module.AgentConfig.Setup.CACertFile - caKey = module.AgentConfig.Setup.CAKeyFile - } - if caFile == "" && caKey == "" { - caFile, caKey, err = common.GetOrInitDefaultCaCerts() - if err != nil { - panic(err) - } - } - executor, err = client.NewMTLSExecutor(caFile, caKey) - if err != nil { - panic(err) - } - agClient := &client.Client{ - Executor: executor, - } - client.RegisterClient(agClient) - - if module.AgentConfig.StateManager.Enabled { - onlineAgentIDs, err := common.GetLatestOnlineAgentIDs(nil, 60) - if err != nil { - log.Error(err) - } - agents, err := common.LoadAgentsFromES("") - if err != nil { - log.Error(err) - } - agentIds := map[string]string{} - for _, ag := range agents { - if _, ok := onlineAgentIDs[ag.ID]; ok { - agentIds[ag.ID] = "online" - } - } - credential.RegisterChangeEvent(func(cred *credential.Credential) { - var effectsClusterIDs []string - elastic.WalkConfigs(func(key, value interface{}) bool { - if cfg, ok := value.(*elastic.ElasticsearchConfig); ok { - if cfg.CredentialID == cred.ID { - effectsClusterIDs = append(effectsClusterIDs, cfg.ID) - } - } - return true - }) - if len(effectsClusterIDs) > 0 { - queryDsl := util.MapStr{ - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "metadata.labels.cluster_id": effectsClusterIDs, - }, - }, - }, - }, - }, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['updated'] = '%s'", time.Now().Format(time.RFC3339Nano)), - }, - } - err = orm.UpdateBy(agent.Setting{}, util.MustToJSONBytes(queryDsl)) - if err != nil { - log.Error(err) - } - } - //check ingest cluster credential - if module.AgentConfig.Setup != nil && module.AgentConfig.Setup.IngestClusterCredentialID == cred.ID { - agents, err = common.LoadAgentsFromES("") - if err != nil { - log.Error(err) - return - } - for _, ag := range agents { - err = kv.AddValue(model.KVAgentIngestConfigChanged, []byte(ag.ID), []byte("1")) - if err != nil { - log.Error(err) - } - } - } - }) - - sm := state.NewStateManager(time.Second*30, "agent_state", agentIds, agClient) - state.RegisterStateManager(sm) - go sm.LoopState() - } - return nil -} - -func (module *AgentModule) Stop() error { - if !module.AgentConfig.Enabled { - return nil - } - log.Info("start to stop agent module") - if module.AgentConfig.StateManager.Enabled { - state.GetStateManager().Stop() - } - log.Info("agent module was stopped") - return nil -} - -type AgentModule struct { - model.AgentConfig -} diff --git a/modules/agent/api/host.go b/modules/agent/api/host.go index 4fb061c8..f6341ebc 100644 --- a/modules/agent/api/host.go +++ b/modules/agent/api/host.go @@ -4,236 +4,223 @@ package api -import ( - "context" - "fmt" - log "github.com/cihub/seelog" - "infini.sh/console/modules/agent/state" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/host" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - "net/http" - "strings" - "time" -) - -func (h *APIHandler) enrollHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var reqBody []struct { - AgentID string `json:"agent_id"` - HostName string `json:"host_name"` - IP string `json:"ip"` - Source string `json:"source"` - OSName string `json:"os_name"` - OSArch string `json:"os_arch"` - NodeID string `json:"node_uuid"` - } - err := h.DecodeJSON(req, &reqBody) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - errors := util.MapStr{} - for i, hi := range reqBody { - var ( - hostInfo *host.HostInfo - ) - switch hi.Source { - case "agent": - hostInfo, err = enrollHostFromAgent(hi.AgentID) - if err != nil { - errors[hi.IP] = util.MapStr{ - "error": err.Error(), - } - log.Error(err) - continue - } - hostInfo.IP = hi.IP - hostInfo.AgentID = hi.AgentID - err = orm.Create(nil, hostInfo) - if err != nil { - errors[hi.IP] = util.MapStr{ - "error": err.Error(), - } - log.Error(err) - continue - } - case "es_node": - hostInfo = &host.HostInfo{ - IP: hi.IP, - OSInfo: host.OS{ - Platform: hi.OSName, - KernelArch: hi.OSArch, - }, - NodeID: hi.NodeID, - } - default: - errors[hi.IP] = util.MapStr{ - "error": fmt.Errorf("unkonow source type"), - } - continue - } - hostInfo.Timestamp = time.Now() - var ctx *orm.Context - if i == len(reqBody) - 1 { - ctx = &orm.Context{ - Refresh: "wait_for", - } - } - hostInfo.OSInfo.Platform = strings.ToLower(hostInfo.OSInfo.Platform) - err = orm.Create(ctx, hostInfo) - if err != nil { - errors[hi.IP] = util.MapStr{ - "error": err.Error(), - } - log.Error(err) - continue - } - } - resBody := util.MapStr{ - "success": true, - } - if len(errors) > 0 { - resBody["errors"] = errors - resBody["success"] = false - } - - h.WriteJSON(w, resBody, http.StatusOK) -} - -func (h *APIHandler) deleteHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - hostID := ps.MustGetParameter("host_id") - hostInfo, err := getHost(hostID) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - ctx := orm.Context{ - Refresh: "wait_for", - } - err = orm.Delete(&ctx, hostInfo) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - h.WriteDeletedOKJSON(w, hostID) -} - -func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - hostID := ps.MustGetParameter("host_id") - hostInfo, err := getHost(hostID) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if hostInfo.AgentID == "" { - h.WriteJSON(w, util.MapStr{}, http.StatusOK) - return - } - - sm := state.GetStateManager() - ag, err := sm.GetAgent(hostInfo.AgentID) - if err != nil { - log.Error(err) - h.WriteJSON(w, util.MapStr{}, http.StatusOK) - return - } - aversion, err := ag.GetVersion() - if err == nil { - ag.Version = aversion - orm.Save(nil, ag) - } - h.WriteJSON(w, util.MapStr{ - "host_id": hostID, - "agent_id": ag.ID, - "version": ag.Version, - "status": hostInfo.AgentStatus, - "endpoint": ag.GetEndpoint(), - }, http.StatusOK) -} - -func getHost(hostID string) (*host.HostInfo, error){ - hostInfo := &host.HostInfo{} - hostInfo.ID = hostID - exists, err := orm.Get(hostInfo) - if err != nil { - return nil, fmt.Errorf("get host info error: %w", err) - } - if !exists { - return nil, fmt.Errorf("host [%s] not found", hostID) - } - return hostInfo, nil -} - -func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - hostID := ps.MustGetParameter("host_id") - hostInfo := &host.HostInfo{} - hostInfo.ID = hostID - exists, err := orm.Get(hostInfo) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if !exists { - h.WriteError(w, fmt.Sprintf("host [%s] not found", hostID), http.StatusNotFound) - return - } - if hostInfo.AgentID == "" { - h.WriteJSON(w, util.MapStr{}, http.StatusOK) - return - } - sm := state.GetStateManager() - ag, err := sm.GetAgent(hostInfo.AgentID) - if err != nil { - log.Error(err) - h.WriteJSON(w, util.MapStr{}, http.StatusOK) - return - } - ctx,cancel := context.WithTimeout(context.Background(), time.Second * 10) - defer cancel() - esNodesInfo, err := sm.GetAgentClient().GetElasticsearchNodes(ctx, ag.GetEndpoint()) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - var processes []util.MapStr - for _, node := range esNodesInfo { - processes = append(processes, util.MapStr{ - "pid": node.ProcessInfo.PID, - "pid_status": node.ProcessInfo.Status, - "cluster_name": node.ClusterName, - "cluster_uuid": node.ClusterUuid, - "cluster_id": node.ClusterID, - "node_id": node.NodeUUID, - "node_name": node.NodeName, - "uptime_in_ms": time.Now().UnixMilli() - node.ProcessInfo.CreateTime, - }) - } - h.WriteJSON(w, util.MapStr{ - "elastic_processes": processes, - }, http.StatusOK) -} - -func enrollHostFromAgent(agentID string) (*host.HostInfo, error){ - sm := state.GetStateManager() - ag, err := sm.GetAgent(agentID) - if err != nil { - return nil, err - } - if ag == nil { - return nil, fmt.Errorf("can not found agent [%s]", agentID) - } - agentClient := sm.GetAgentClient() - hostInfo, err := agentClient.GetHostInfo(nil, ag.GetEndpoint()) - if err != nil { - return nil, err - } - hostInfo.AgentStatus = ag.Status - return hostInfo, nil -} \ No newline at end of file +// +//func (h *APIHandler) enrollHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// var reqBody []struct { +// AgentID string `json:"agent_id"` +// HostName string `json:"host_name"` +// IP string `json:"ip"` +// Source string `json:"source"` +// OSName string `json:"os_name"` +// OSArch string `json:"os_arch"` +// NodeID string `json:"node_uuid"` +// } +// err := h.DecodeJSON(req, &reqBody) +// if err != nil { +// log.Error(err) +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// errors := util.MapStr{} +// for i, hi := range reqBody { +// var ( +// hostInfo *host.HostInfo +// ) +// switch hi.Source { +// case "agent": +// hostInfo, err = enrollHostFromAgent(hi.AgentID) +// if err != nil { +// errors[hi.IP] = util.MapStr{ +// "error": err.Error(), +// } +// log.Error(err) +// continue +// } +// hostInfo.IP = hi.IP +// hostInfo.AgentID = hi.AgentID +// err = orm.Create(nil, hostInfo) +// if err != nil { +// errors[hi.IP] = util.MapStr{ +// "error": err.Error(), +// } +// log.Error(err) +// continue +// } +// case "es_node": +// hostInfo = &host.HostInfo{ +// IP: hi.IP, +// OSInfo: host.OS{ +// Platform: hi.OSName, +// KernelArch: hi.OSArch, +// }, +// NodeID: hi.NodeID, +// } +// default: +// errors[hi.IP] = util.MapStr{ +// "error": fmt.Errorf("unkonow source type"), +// } +// continue +// } +// hostInfo.Timestamp = time.Now() +// var ctx *orm.Context +// if i == len(reqBody) - 1 { +// ctx = &orm.Context{ +// Refresh: "wait_for", +// } +// } +// hostInfo.OSInfo.Platform = strings.ToLower(hostInfo.OSInfo.Platform) +// err = orm.Create(ctx, hostInfo) +// if err != nil { +// errors[hi.IP] = util.MapStr{ +// "error": err.Error(), +// } +// log.Error(err) +// continue +// } +// } +// resBody := util.MapStr{ +// "success": true, +// } +// if len(errors) > 0 { +// resBody["errors"] = errors +// resBody["success"] = false +// } +// +// h.WriteJSON(w, resBody, http.StatusOK) +//} +// +//func (h *APIHandler) deleteHost(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// hostID := ps.MustGetParameter("host_id") +// hostInfo, err := getHost(hostID) +// if err != nil { +// log.Error(err) +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// ctx := orm.Context{ +// Refresh: "wait_for", +// } +// err = orm.Delete(&ctx, hostInfo) +// if err != nil { +// log.Error(err) +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// h.WriteDeletedOKJSON(w, hostID) +//} +// +//func (h *APIHandler) GetHostAgentInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// hostID := ps.MustGetParameter("host_id") +// hostInfo, err := getHost(hostID) +// if err != nil { +// log.Error(err) +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// if hostInfo.AgentID == "" { +// h.WriteJSON(w, util.MapStr{}, http.StatusOK) +// return +// } +// +// sm := state.GetStateManager() +// ag, err := sm.GetAgent(hostInfo.AgentID) +// if err != nil { +// log.Error(err) +// h.WriteJSON(w, util.MapStr{}, http.StatusOK) +// return +// } +// aversion, err := ag.GetVersion() +// if err == nil { +// ag.Version = aversion +// orm.Save(nil, ag) +// } +// h.WriteJSON(w, util.MapStr{ +// "host_id": hostID, +// "agent_id": ag.ID, +// "version": ag.Version, +// "status": hostInfo.AgentStatus, +// "endpoint": ag.GetEndpoint(), +// }, http.StatusOK) +//} +// +//func getHost(hostID string) (*host.HostInfo, error){ +// hostInfo := &host.HostInfo{} +// hostInfo.ID = hostID +// exists, err := orm.Get(hostInfo) +// if err != nil { +// return nil, fmt.Errorf("get host info error: %w", err) +// } +// if !exists { +// return nil, fmt.Errorf("host [%s] not found", hostID) +// } +// return hostInfo, nil +//} +// +//func (h *APIHandler) GetHostElasticProcess(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// hostID := ps.MustGetParameter("host_id") +// hostInfo := &host.HostInfo{} +// hostInfo.ID = hostID +// exists, err := orm.Get(hostInfo) +// if err != nil { +// log.Error(err) +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// if !exists { +// h.WriteError(w, fmt.Sprintf("host [%s] not found", hostID), http.StatusNotFound) +// return +// } +// if hostInfo.AgentID == "" { +// h.WriteJSON(w, util.MapStr{}, http.StatusOK) +// return +// } +// sm := state.GetStateManager() +// ag, err := sm.GetAgent(hostInfo.AgentID) +// if err != nil { +// log.Error(err) +// h.WriteJSON(w, util.MapStr{}, http.StatusOK) +// return +// } +// ctx,cancel := context.WithTimeout(context.Background(), time.Second * 10) +// defer cancel() +// esNodesInfo, err := sm.GetAgentClient().GetElasticsearchNodes(ctx, ag.GetEndpoint()) +// if err != nil { +// log.Error(err) +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// var processes []util.MapStr +// for _, node := range esNodesInfo { +// processes = append(processes, util.MapStr{ +// "pid": node.ProcessInfo.PID, +// "pid_status": node.ProcessInfo.Status, +// "cluster_name": node.ClusterName, +// "cluster_uuid": node.ClusterUuid, +// "cluster_id": node.ClusterID, +// "node_id": node.NodeUUID, +// "node_name": node.NodeName, +// "uptime_in_ms": time.Now().UnixMilli() - node.ProcessInfo.CreateTime, +// }) +// } +// h.WriteJSON(w, util.MapStr{ +// "elastic_processes": processes, +// }, http.StatusOK) +//} +// +//func enrollHostFromAgent(agentID string) (*host.HostInfo, error){ +// sm := state.GetStateManager() +// ag, err := sm.GetAgent(agentID) +// if err != nil { +// return nil, err +// } +// if ag == nil { +// return nil, fmt.Errorf("can not found agent [%s]", agentID) +// } +// agentClient := sm.GetAgentClient() +// hostInfo, err := agentClient.GetHostInfo(nil, ag.GetEndpoint()) +// if err != nil { +// return nil, err +// } +// hostInfo.AgentStatus = ag.Status +// return hostInfo, nil +//} \ No newline at end of file diff --git a/modules/agent/api/init.go b/modules/agent/api/init.go index bcefa402..254d25f7 100644 --- a/modules/agent/api/init.go +++ b/modules/agent/api/init.go @@ -4,35 +4,32 @@ package api -import ( - "infini.sh/framework/core/api" - "infini.sh/framework/core/api/rbac/enum" -) - func Init() { - handler := APIHandler{} - api.HandleAPIMethod(api.POST, "/agent/instance", handler.createInstance) - api.HandleAPIMethod(api.GET, "/agent/instance/_search", handler.RequirePermission(handler.searchInstance, enum.PermissionAgentInstanceRead)) - api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id", handler.getInstance) - api.HandleAPIMethod(api.PUT, "/agent/instance/:instance_id", handler.updateInstance) - api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id", handler.RequirePermission(handler.deleteInstance, enum.PermissionAgentInstanceWrite)) - api.HandleAPIMethod(api.POST, "/agent/instance/_stats", handler.RequirePermission(handler.getInstanceStats, enum.PermissionAgentInstanceRead)) - api.HandleAPIMethod(api.GET, "/agent/log/node/:node_id/files", handler.RequirePermission(handler.getLogFilesByNode, enum.PermissionAgentInstanceRead)) - api.HandleAPIMethod(api.POST, "/agent/log/node/:node_id/_scroll", handler.RequirePermission(handler.getLogFileContent, enum.PermissionAgentInstanceRead)) - api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.getESNodesInfo, enum.PermissionAgentInstanceRead)) - api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/_nodes/_refresh", handler.RequirePermission(handler.refreshESNodesInfo, enum.PermissionAgentInstanceWrite)) - api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) - api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite)) - api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) - api.HandleAPIMethod(api.POST, "/agent/instance/try_connect", handler.RequireLogin(handler.tryConnect)) - api.HandleAPIMethod(api.POST, "/agent/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite)) + //handler := APIHandler{} + //api.HandleAPIMethod(api.POST, "/instance", handler.registerInstance) //new - api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) - api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo) - api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess) - api.HandleAPIMethod(api.DELETE, "/host/:host_id",handler.deleteHost) - - - api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand)) - api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript) + //api.HandleAPIMethod(api.POST, "/agent/instance", handler.registerInstance) + //api.HandleAPIMethod(api.GET, "/agent/instance/_search", handler.RequirePermission(handler.searchInstance, enum.PermissionAgentInstanceRead)) + //api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id", handler.getInstance) + //api.HandleAPIMethod(api.PUT, "/agent/instance/:instance_id", handler.updateInstance) + //api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id", handler.RequirePermission(handler.deleteInstance, enum.PermissionAgentInstanceWrite)) + //api.HandleAPIMethod(api.POST, "/agent/instance/_stats", handler.RequirePermission(handler.getInstanceStats, enum.PermissionAgentInstanceRead)) + //api.HandleAPIMethod(api.GET, "/agent/log/node/:node_id/files", handler.RequirePermission(handler.getLogFilesByNode, enum.PermissionAgentInstanceRead)) + //api.HandleAPIMethod(api.POST, "/agent/log/node/:node_id/_scroll", handler.RequirePermission(handler.getLogFileContent, enum.PermissionAgentInstanceRead)) + //api.HandleAPIMethod(api.GET, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.getESNodesInfo, enum.PermissionAgentInstanceRead)) + //api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/_nodes/_refresh", handler.RequirePermission(handler.refreshESNodesInfo, enum.PermissionAgentInstanceWrite)) + //api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_auth", handler.RequirePermission(handler.authESNode, enum.PermissionAgentInstanceWrite)) + //api.HandleAPIMethod(api.DELETE, "/agent/instance/:instance_id/_nodes", handler.RequirePermission(handler.deleteESNode, enum.PermissionAgentInstanceWrite)) + //api.HandleAPIMethod(api.POST, "/agent/instance/:instance_id/node/_associate", handler.RequirePermission(handler.associateESNode, enum.PermissionAgentInstanceWrite)) + //api.HandleAPIMethod(api.POST, "/agent/instance/try_connect", handler.RequireLogin(handler.tryConnect)) + //api.HandleAPIMethod(api.POST, "/agent/auto_associate", handler.RequirePermission(handler.autoAssociateESNode, enum.PermissionAgentInstanceWrite)) + // + //api.HandleAPIMethod(api.POST, "/host/_enroll", handler.enrollHost) + //api.HandleAPIMethod(api.GET, "/host/:host_id/agent/info",handler.GetHostAgentInfo) + //api.HandleAPIMethod(api.GET, "/host/:host_id/processes",handler.GetHostElasticProcess) + //api.HandleAPIMethod(api.DELETE, "/host/:host_id",handler.deleteHost) + // + // + //api.HandleAPIMethod(api.POST, "/agent/install_command", handler.RequireLogin(handler.generateInstallCommand)) + //api.HandleAPIMethod(api.GET, "/agent/install.sh", handler.getInstallScript) } diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index 79bdbaf0..a6ebe0ed 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -5,1152 +5,9 @@ package api import ( - "context" - "fmt" - "net" - "net/http" - "net/url" - "strconv" - "strings" - "time" - - log "github.com/cihub/seelog" - "infini.sh/console/modules/agent/client" - common2 "infini.sh/console/modules/agent/common" - "infini.sh/console/modules/agent/model" - "infini.sh/console/modules/agent/state" - "infini.sh/framework/core/agent" "infini.sh/framework/core/api" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - elastic2 "infini.sh/framework/modules/elastic" - "infini.sh/framework/modules/elastic/common" ) type APIHandler struct { api.Handler } - -func (h *APIHandler) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var obj = &agent.Instance{} - err := h.DecodeJSON(req, obj) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - //validate token for auto register - token := h.GetParameter(req, "token") - if token != "" { - if v, ok := tokens.Load(token); !ok { - h.WriteError(w, "token is invalid", http.StatusUnauthorized) - return - } else { - if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) { - tokens.Delete(token) - h.WriteError(w, "token was expired", http.StatusUnauthorized) - return - } - } - remoteIP := util.ClientIP(req) - agCfg := common2.GetAgentConfig() - port := agCfg.Setup.Port - if port == "" { - port = "8080" - } - obj.Endpoint = fmt.Sprintf("https://%s:%s", remoteIP, port) - obj.Tags = append(obj.Tags, "mtls", "auto") - } - - //fetch more information of agent instance - res, err := client.GetClient().GetInstanceBasicInfo(context.Background(), obj.GetEndpoint()) - if err != nil { - errStr := fmt.Sprintf("get agent instance basic info error: %s", err.Error()) - h.WriteError(w, errStr, http.StatusInternalServerError) - log.Error(errStr) - return - } - if res.ID == "" { - errStr := fmt.Sprintf("got unexpected response of agent instance basic info: %s", util.MustToJSON(res)) - h.WriteError(w, errStr, http.StatusInternalServerError) - log.Error(errStr) - return - } else { - obj.ID = res.ID - obj.Version = res.Version - obj.MajorIP = res.MajorIP - obj.Host = res.Host - obj.IPS = res.IPS - if obj.Name == "" { - obj.Name = res.Name - } - } - oldInst := &agent.Instance{} - oldInst.ID = obj.ID - exists, err := orm.Get(oldInst) - - if err != nil && err != elastic2.ErrNotFound { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - if exists { - errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID) - h.WriteError(w, errMsg, http.StatusInternalServerError) - log.Error(errMsg) - return - } - if token != "" { - err, result := orm.GetBy("endpoint", obj.Endpoint, oldInst) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if len(result.Result) > 0 { - if m, ok := result.Result[0].(map[string]interface{}); ok { - if id, ok := m["id"].(string); ok { - oldInst.ID = id - err = orm.Delete(nil, oldInst) - if err != nil { - log.Error(err) - } - } - } - } - } - - obj.Status = model.StatusOnline - err = orm.Create(nil, obj) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - err = client.GetClient().SaveIngestConfig(context.Background(), obj.GetEndpoint()) - if err != nil { - log.Error(err) - } - _, err = refreshNodesInfo(obj) - if err != nil { - log.Error(err) - } - - h.WriteCreatedOKJSON(w, obj.ID) - -} - -func (h *APIHandler) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("instance_id") - - obj := agent.Instance{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - - h.WriteJSON(w, util.MapStr{ - "found": true, - "_id": id, - "_source": obj, - }, 200) -} - -func (h *APIHandler) deleteInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("instance_id") - - obj := agent.Instance{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "result": "not_found", - }, http.StatusNotFound) - return - } - - err = orm.Delete(nil, &obj) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - if sm := state.GetStateManager(); sm != nil { - sm.DeleteAgent(obj.ID) - } - queryDsl := util.MapStr{ - "query": util.MapStr{ - "term": util.MapStr{ - "agent_id": util.MapStr{ - "value": id, - }, - }, - }, - } - err = orm.DeleteBy(agent.ESNodeInfo{}, util.MustToJSONBytes(queryDsl)) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error("delete node info error: ", err) - return - } - - queryDsl = util.MapStr{ - "query": util.MapStr{ - "term": util.MapStr{ - "metadata.labels.agent_id": util.MapStr{ - "value": id, - }, - }, - }, - } - err = orm.DeleteBy(agent.Setting{}, util.MustToJSONBytes(queryDsl)) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error("delete agent settings error: ", err) - return - } - - h.WriteDeletedOKJSON(w, id) -} - -func (h *APIHandler) getInstanceStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var instanceIDs = []string{} - err := h.DecodeJSON(req, &instanceIDs) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if len(instanceIDs) == 0 { - h.WriteJSON(w, util.MapStr{}, http.StatusOK) - return - } - q := orm.Query{} - queryDSL := util.MapStr{ - "size": len(instanceIDs), - "query": util.MapStr{ - "terms": util.MapStr{ - "_id": instanceIDs, - }, - }, - } - q.RawQuery = util.MustToJSONBytes(queryDSL) - - err, res := orm.Search(&agent.Instance{}, &q) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - result := util.MapStr{} - for _, item := range res.Result { - instBytes, err := util.ToJSONBytes(item) - if err != nil { - log.Error(err) - continue - } - instance := agent.Instance{} - err = util.FromJSONBytes(instBytes, &instance) - if err != nil { - log.Error(err) - continue - } - agReq := &util.Request{ - Method: http.MethodGet, - Url: fmt.Sprintf("%s/stats", instance.GetEndpoint()), - - } - var resMap = util.MapStr{} - err = client.GetClient().DoRequest(agReq, &resMap) - - if err != nil { - log.Error(err) - result[instance.ID] = util.MapStr{} - continue - } - result[instance.ID] = resMap - } - h.WriteJSON(w, result, http.StatusOK) -} - -func (h *APIHandler) updateInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("instance_id") - oldInst := agent.Instance{} - oldInst.ID = id - _, err := orm.Get(&oldInst) - if err != nil { - if err == elastic2.ErrNotFound { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "result": "not_found", - }, http.StatusNotFound) - return - } - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - - obj := agent.Instance{} - err = h.DecodeJSON(req, &obj) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - - oldInst.Name = obj.Name - oldInst.Endpoint = obj.Endpoint - oldInst.Description = obj.Description - oldInst.Tags = obj.Tags - oldInst.BasicAuth = obj.BasicAuth - err = orm.Update(&orm.Context{ - Refresh: "wait_for", - }, &oldInst) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "updated", - }, 200) -} - -func (h *APIHandler) searchInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - - var ( - keyword = h.GetParameterOrDefault(req, "keyword", "") - //queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` - strSize = h.GetParameterOrDefault(req, "size", "20") - strFrom = h.GetParameterOrDefault(req, "from", "0") - ) - - var ( - mustQ []interface{} - ) - - if keyword != "" { - mustQ = append(mustQ, util.MapStr{ - "query_string": util.MapStr{ - "default_field": "*", - "query": keyword, - }, - }) - } - size, _ := strconv.Atoi(strSize) - if size <= 0 { - size = 20 - } - from, _ := strconv.Atoi(strFrom) - if from < 0 { - from = 0 - } - - queryDSL := util.MapStr{ - "size": size, - "from": from, - } - if len(mustQ) > 0 { - queryDSL["query"] = util.MapStr{ - "bool": util.MapStr{ - "must": mustQ, - }, - } - } - - q := orm.Query{} - q.RawQuery = util.MustToJSONBytes(queryDSL) - - err, res := orm.Search(&agent.Instance{}, &q) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - h.Write(w, res.Raw) -} - -func (h *APIHandler) getESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("instance_id") - obj := agent.Instance{} - obj.ID = id - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - nodes, err := refreshNodesInfo(&obj) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - var nodeUUIDs []string - for _, node := range nodes { - if node.NodeUUID != "" { - nodeUUIDs = append(nodeUUIDs, node.NodeUUID) - } - } - if len(nodeUUIDs) == 0 { - h.WriteJSON(w, nodes, http.StatusOK) - return - } - query := util.MapStr{ - "size": len(nodeUUIDs), - "query": util.MapStr{ - "terms": util.MapStr{ - "metadata.node_id": nodeUUIDs, - }, - }, - "collapse": util.MapStr{ - "field": "metadata.node_id", - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - err, result := orm.Search(elastic.NodeConfig{}, &q) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - idToAddresses := map[string]string{} - for _, row := range result.Result { - if rowM, ok := row.(map[string]interface{}); ok { - nodeUUID, _ := util.MapStr(rowM).GetValue("metadata.node_id") - transportAddr, _ := util.MapStr(rowM).GetValue("metadata.labels.transport_address") - if v, ok := nodeUUID.(string); ok { - idToAddresses[v] = transportAddr.(string) - } - } - } - var nNodes []tempNode - for _, node := range nodes { - nNode := tempNode{ - ESNodeInfo: node, - } - if node.NodeUUID != "" { - if addr, ok := idToAddresses[node.NodeUUID]; ok { - nNode.TransportAddress = addr - } - } - nNodes = append(nNodes, nNode) - } - - - h.WriteJSON(w, nNodes, http.StatusOK) -} -type tempNode struct { - agent.ESNodeInfo - TransportAddress string `json:"transport_address"` -} - -func (h *APIHandler) refreshESNodesInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("instance_id") - obj := agent.Instance{} - obj.ID = id - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - _, err = refreshNodesInfo(&obj) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - h.WriteAckOKJSON(w) -} - -func (h *APIHandler) authESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("instance_id") - inst := agent.Instance{} - inst.ID = id - exists, err := orm.Get(&inst) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - reqBody := struct { - NodeID string `json:"node_id"` - ESConfig *elastic.ElasticsearchConfig `json:"es_config"` - }{} - err = h.DecodeJSON(req, &reqBody) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - oldNodeInfo := &agent.ESNodeInfo{ - ID: reqBody.NodeID, - } - exists, err = orm.Get(oldNodeInfo) - if !exists || err != nil { - h.WriteJSON(w, fmt.Sprintf("node [%s] of agent [%s] was not found", oldNodeInfo.ID, inst.Name), http.StatusInternalServerError) - return - } - - cfg := reqBody.ESConfig - if cfg.Endpoint == "" { - cfg.Endpoint = fmt.Sprintf("%s://%s", cfg.Schema, cfg.Host) - } - basicAuth, err := common.GetBasicAuth(cfg) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - cfg.BasicAuth = &basicAuth - nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), *cfg) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - host, port, err := net.SplitHostPort(nodeInfo.PublishAddress) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if !util.StringInArray(inst.IPS, host) && !net.ParseIP(host).IsLoopback() { - h.WriteError(w, fmt.Sprintf("got node host %s not match any ip of %v", host, inst.IPS), http.StatusInternalServerError) - return - } - if oldNodeInfo.HttpPort != port { - h.WriteError(w, fmt.Sprintf("port mismatch, got: %s,expected: %s", port, oldNodeInfo.HttpPort), http.StatusInternalServerError) - return - } - if oldNodeInfo.ProcessInfo.PID != nodeInfo.ProcessInfo.PID { - h.WriteError(w, fmt.Sprintf("process id mismatch, got: %d,expected: %d", nodeInfo.ProcessInfo.PID, oldNodeInfo.ProcessInfo.PID), http.StatusInternalServerError) - return - } - - nodeInfo.ID = oldNodeInfo.ID - nodeInfo.AgentID = inst.ID - err = orm.Save(nil, nodeInfo) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - h.WriteJSON(w, nodeInfo, http.StatusOK) -} - -func (h *APIHandler) associateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - instID := ps.MustGetParameter("instance_id") - reqBody := struct { - ID string `json:"id"` - ClusterID string `json:"cluster_id"` - }{} - err := h.DecodeJSON(req, &reqBody) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - node := agent.ESNodeInfo{ - ID: reqBody.ID, - } - _, err = orm.Get(&node) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if node.AgentID != instID { - errStr := fmt.Sprintf("agent id not match: %s, %s", node.AgentID, instID) - log.Error(errStr) - h.WriteError(w, errStr, http.StatusInternalServerError) - return - } - node.ClusterID = reqBody.ClusterID - err = orm.Save(&orm.Context{ - Refresh: "wait_for", - }, node) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - settings, err := common2.GetAgentSettings(instID, 0) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - setting := pickAgentSettings(settings, node) - if setting == nil { - setting, err = getAgentTaskSetting(instID, node) - if err != nil { - log.Error("get agent task setting error: ", err) - } - err = orm.Create(nil, setting) - if err != nil { - log.Error("save agent task setting error: ", err) - } - } - h.WriteAckOKJSON(w) -} - -func (h *APIHandler) autoAssociateESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - reqBody := struct { - ClusterIDs []string `json:"cluster_ids"` - }{} - err := h.DecodeJSON(req, &reqBody) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - // query not associated nodes info - nodesM, err := getUnAssociateNodes() - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if len(nodesM) == 0 { - h.WriteAckOKJSON(w) - return - } - agentIds := make([]string, 0, len(nodesM)) - for agentID := range nodesM { - agentIds = append(agentIds, agentID) - } - agents, err := getAgentByIds(agentIds) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - for _, clusterID := range reqBody.ClusterIDs { - // query cluster basicauth - cfg := elastic.GetConfig(clusterID) - basicAuth, err := common.GetBasicAuth(cfg) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - taskSetting, err := getSettingsByClusterID(cfg.ID) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - for agentID, nodes := range nodesM { - var ( - inst *agent.Instance - ok bool - ) - if inst, ok = agents[agentID]; !ok { - log.Warnf("agent [%v] was not found", agentID) - continue - } - settings, err := common2.GetAgentSettings(agentID, 0) - if err != nil { - log.Error(err) - continue - } - for _, node := range nodes { - host := node.PublishAddress - var endpoint string - if strings.HasPrefix(host, "::") { - instURL, err := url.Parse(inst.Endpoint) - if err != nil { - log.Error(err) - continue - } - host = instURL.Hostname() - endpoint = fmt.Sprintf("%s://%s:%s", node.Schema, host, node.HttpPort) - } else { - endpoint = fmt.Sprintf("%s://%s", node.Schema, host) - } - escfg := elastic.ElasticsearchConfig{ - Endpoint: endpoint, - BasicAuth: &basicAuth, - } - nodeInfo, err := client.GetClient().AuthESNode(context.Background(), inst.GetEndpoint(), escfg) - if err != nil { - log.Warn(err) - continue - } - //matched - if nodeInfo.ClusterUuid == cfg.ClusterUUID { - //update node info - nodeInfo.ID = node.ID - nodeInfo.AgentID = inst.ID - nodeInfo.ClusterID = cfg.ID - err = orm.Save(nil, nodeInfo) - if err != nil { - log.Error(err) - continue - } - setting := pickAgentSettings(settings, node) - if setting == nil { - tsetting := model.TaskSetting{ - NodeStats: &model.NodeStatsTask{ - Enabled: true, - }, - Logs: &model.LogsTask{ - Enabled: true, - LogsPath: nodeInfo.Path.Logs, - }, - } - if taskSetting.IndexStats != nil { - tsetting.IndexStats = taskSetting.IndexStats - taskSetting.IndexStats = nil - } - if taskSetting.ClusterHealth != nil { - tsetting.ClusterHealth = taskSetting.ClusterHealth - taskSetting.ClusterHealth = nil - } - if taskSetting.ClusterStats != nil { - tsetting.ClusterStats = taskSetting.ClusterStats - taskSetting.ClusterStats = nil - } - setting = &agent.Setting{ - Metadata: agent.SettingsMetadata{ - Category: "agent", - Name: "task", - Labels: util.MapStr{ - "agent_id": agentID, - "cluster_uuid": nodeInfo.ClusterUuid, - "cluster_id": nodeInfo.ClusterID, - "node_uuid": nodeInfo.NodeUUID, - "endpoint": fmt.Sprintf("%s://%s", nodeInfo.Schema, nodeInfo.PublishAddress), - }, - }, - Payload: util.MapStr{ - "task": tsetting, - }, - } - err = orm.Create(nil, setting) - if err != nil { - log.Error("save agent task setting error: ", err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - } - } - } - - } - } - h.WriteAckOKJSON(w) -} - -func getAgentByIds(agentIDs []string)(map[string]*agent.Instance, error){ - query := util.MapStr{ - "size": len(agentIDs), - "query": util.MapStr{ - "terms": util.MapStr{ - "id": agentIDs, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - err, result := orm.Search(agent.Instance{}, &q) - if err != nil { - return nil, err - } - agents := map[string]*agent.Instance{} - for _, row := range result.Result { - inst := agent.Instance{} - buf := util.MustToJSONBytes(row) - util.MustFromJSONBytes(buf, &inst) - agents[inst.ID] = &inst - } - return agents, nil -} - -func (h *APIHandler) deleteESNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("instance_id") - nodeIDs := []string{} - err := h.DecodeJSON(req, &nodeIDs) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if len(nodeIDs) > 0 { - q := util.MapStr{ - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "id": nodeIDs, - }, - }, - { - "term": util.MapStr{ - "agent_id": util.MapStr{ - "value": id, - }, - }, - }, - }, - }, - }, - } - err = orm.DeleteBy(agent.ESNodeInfo{}, util.MustToJSONBytes(q)) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - q = util.MapStr{ - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "metadata.labels.node_uuid": nodeIDs, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.agent_id": util.MapStr{ - "value": id, - }, - }, - }, - }, - }, - }, - } - } - h.WriteAckOKJSON(w) -} - -func (h *APIHandler) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var reqBody = struct { - Endpoint string `json:"endpoint"` - BasicAuth agent.BasicAuth - }{} - err := h.DecodeJSON(req, &reqBody) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - connectRes, err := client.GetClient().GetInstanceBasicInfo(context.Background(), reqBody.Endpoint) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - h.WriteJSON(w, connectRes, http.StatusOK) -} - -func refreshNodesInfo(inst *agent.Instance) ([]agent.ESNodeInfo, error) { - oldNodesInfo, err := getNodesInfoFromES(inst.ID) - if err != nil { - return nil, fmt.Errorf("get elasticsearch nodes info from es error: %w", err) - } - nodesInfo, err := client.GetClient().GetElasticsearchNodes(context.Background(), inst.GetEndpoint()) - if err != nil { - log.Errorf("get elasticsearch nodes error: %v", err) - //return nodes info from es after failed to get nodes info from agent - var nodes = []agent.ESNodeInfo{} - for _, nodeInfo := range oldNodesInfo { - nodes = append(nodes, *nodeInfo) - } - return nodes, nil - } - - oldPids := map[int]struct{}{} - var resultNodes []agent.ESNodeInfo - if err != nil { - return nil, err - } - for _, node := range nodesInfo { - oldNode := getNodeByPidOrUUID(oldNodesInfo, node.ProcessInfo.PID, node.NodeUUID, node.HttpPort) - node.AgentID = inst.ID - if oldNode != nil { - node.ID = oldNode.ID - //keep old validate info - if node.ClusterUuid == "" && oldNode.ClusterUuid != "" { - node = *oldNode - } - oldPids[oldNode.ProcessInfo.PID] = struct{}{} - } else { - node.ID = util.GetUUID() - } - if node.ClusterUuid != "" { - if oldNode != nil && oldNode.ClusterID != "" { - node.ClusterID = oldNode.ClusterID - } - } - - node.Status = "online" - err = orm.Save(nil, node) - if err != nil { - log.Error("save node info error: ", err) - } - resultNodes = append(resultNodes, node) - } - for k, node := range oldNodesInfo { - if _, ok := oldPids[k]; !ok { - //auto delete not associated cluster - if node.ClusterID == "" { - log.Info("delete node with pid: ", node.ProcessInfo.PID) - err = orm.Delete(nil, node) - if err != nil { - log.Error("delete node info error: ", err) - } - continue - } - node.Status = "offline" - err = orm.Save(nil, node) - if err != nil { - log.Error("save node info error: ", err) - } - resultNodes = append(resultNodes, *node) - } - } - return resultNodes, nil -} - -func getNodeByPidOrUUID(nodes map[int]*agent.ESNodeInfo, pid int, uuid string, port string) *agent.ESNodeInfo { - if nodes[pid] != nil { - return nodes[pid] - } - for _, node := range nodes { - if node.NodeUUID != "" && node.NodeUUID == uuid { - return node - } - } - return nil -} - -func getNodesInfoFromES(agentID string) (map[int]*agent.ESNodeInfo, error) { - query := util.MapStr{ - "size": 1000, - "query": util.MapStr{ - "term": util.MapStr{ - "agent_id": util.MapStr{ - "value": agentID, - }, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - - err, result := orm.Search(agent.ESNodeInfo{}, &q) - if err != nil { - return nil, err - } - nodesInfo := map[int]*agent.ESNodeInfo{} - for _, row := range result.Result { - node := agent.ESNodeInfo{} - buf := util.MustToJSONBytes(row) - util.MustFromJSONBytes(buf, &node) - nodesInfo[node.ProcessInfo.PID] = &node - } - return nodesInfo, nil -} - -func getUnAssociateNodes() (map[string][]agent.ESNodeInfo, error){ - query := util.MapStr{ - "size": 3000, - "query": util.MapStr{ - "bool": util.MapStr{ - "must_not": []util.MapStr{ - { - "exists": util.MapStr{ - "field": "cluster_id", - }, - }, - }, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - - err, result := orm.Search(agent.ESNodeInfo{}, &q) - if err != nil { - return nil, err - } - nodesInfo := map[string][]agent.ESNodeInfo{} - for _, row := range result.Result { - node := agent.ESNodeInfo{} - buf := util.MustToJSONBytes(row) - util.MustFromJSONBytes(buf, &node) - nodesInfo[node.AgentID] = append(nodesInfo[node.AgentID], node) - } - return nodesInfo, nil -} - -func pickAgentSettings(settings []agent.Setting, nodeInfo agent.ESNodeInfo) *agent.Setting { - for _, setting := range settings { - if setting.Metadata.Labels["node_uuid"] == nodeInfo.NodeUUID { - return &setting - } - } - return nil -} - -func getAgentTaskSetting(agentID string, node agent.ESNodeInfo) (*agent.Setting, error) { - taskSetting, err := getSettingsByClusterID(node.ClusterID) - if err != nil { - return nil, err - } - taskSetting.Logs = &model.LogsTask{ - Enabled: true, - LogsPath: node.Path.Logs, - } - return &agent.Setting{ - Metadata: agent.SettingsMetadata{ - Category: "agent", - Name: "task", - Labels: util.MapStr{ - "agent_id": agentID, - "cluster_uuid": node.ClusterUuid, - "cluster_id": node.ClusterID, - "node_uuid": node.NodeUUID, - "endpoint": fmt.Sprintf("%s://%s", node.Schema, node.PublishAddress), - }, - }, - Payload: util.MapStr{ - "task": taskSetting, - }, - }, nil -} - -// getSettingsByClusterID query agent task settings with cluster id -func getSettingsByClusterID(clusterID string) (*model.TaskSetting, error) { - err, result := querySettingsByClusterID(clusterID) - if err != nil { - return nil, err - } - - setting := &model.TaskSetting{ - NodeStats: &model.NodeStatsTask{ - Enabled: true, - }, - } - var ( - clusterStats = true - indexStats = true - clusterHealth = true - ) - keys := []string{"payload.task.cluster_stats.enabled", "payload.task.cluster_health.enabled", "payload.task.index_stats.enabled"} - for _, row := range result.Result { - if v, ok := row.(map[string]interface{}); ok { - vm := util.MapStr(v) - for _, key := range keys { - tv, _ := vm.GetValue(key) - if tv == true { - switch key { - case "payload.task.cluster_stats.enabled": - clusterStats = false - case "payload.task.index_stats.enabled": - indexStats = false - case "payload.task.cluster_health.enabled": - clusterHealth = false - } - } - } - } - } - if clusterStats { - setting.ClusterStats = &model.ClusterStatsTask{ - Enabled: true, - } - } - if indexStats { - setting.IndexStats = &model.IndexStatsTask{ - Enabled: true, - } - } - if clusterHealth { - setting.ClusterHealth = &model.ClusterHealthTask{ - Enabled: true, - } - } - return setting, nil -} - -func querySettingsByClusterID(clusterID string)(error, orm.Result){ - queryDsl := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.cluster_id": util.MapStr{ - "value": clusterID, - }, - }, - }, - }, - "minimum_should_match": 1, - "should": []util.MapStr{ - { - "term": util.MapStr{ - "payload.task.cluster_health.enabled": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "payload.task.cluster_stats.enabled": util.MapStr{ - "value": true, - }, - }, - }, - { - "term": util.MapStr{ - "payload.task.index_stats.enabled": util.MapStr{ - "value": true, - }, - }, - }, - }, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(queryDsl), - } - return orm.Search(agent.Setting{}, &q) -} \ No newline at end of file diff --git a/modules/agent/api/log.go b/modules/agent/api/log.go index a50d3e8f..e4bb6341 100644 --- a/modules/agent/api/log.go +++ b/modules/agent/api/log.go @@ -9,8 +9,8 @@ import ( log "github.com/cihub/seelog" "infini.sh/console/modules/agent/client" "infini.sh/console/modules/agent/state" - "infini.sh/framework/core/agent" httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/model" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "net/http" @@ -80,7 +80,7 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, h.WriteJSON(w, res, http.StatusOK) } -func getAgentByNodeID(nodeID string) (*agent.Instance, *agent.ESNodeInfo, error){ +func getAgentByNodeID(nodeID string) (*model.Instance, *model.ESNodeInfo, error){ queryDsl := util.MapStr{ "size":1, "query": util.MapStr{ @@ -101,24 +101,24 @@ func getAgentByNodeID(nodeID string) (*agent.Instance, *agent.ESNodeInfo, error) q := &orm.Query{ RawQuery: util.MustToJSONBytes(queryDsl), } - err, result := orm.Search(agent.ESNodeInfo{}, q) + err, result := orm.Search(model.ESNodeInfo{}, q) if err != nil { return nil,nil, err } if len(result.Result) > 0 { buf := util.MustToJSONBytes(result.Result[0]) - node := &agent.ESNodeInfo{} - err = util.FromJSONBytes(buf, node) - inst := &agent.Instance{} - inst.ID = node.AgentID + v := &model.ESNodeInfo{} + err = util.FromJSONBytes(buf, v) + inst := &model.Instance{} + inst.ID = v.AgentID _, err = orm.Get(inst) if err != nil { - return nil, node, err + return nil, v, err } if inst.Name == "" { - return nil, node, nil + return nil, v, nil } - return inst, node, nil + return inst, v, nil } return nil, nil, nil } diff --git a/modules/agent/api/setup.go b/modules/agent/api/setup.go deleted file mode 100644 index 5c1c5253..00000000 --- a/modules/agent/api/setup.go +++ /dev/null @@ -1,155 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package api - -import ( - "fmt" - log "github.com/cihub/seelog" - "github.com/valyala/fasttemplate" - "infini.sh/console/modules/agent/common" - "infini.sh/framework/core/api/rbac" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/global" - "infini.sh/framework/core/util" - "os" - - "net/http" - "path" - "strings" - "sync" - "time" -) - -var tokens = sync.Map{} -type Token struct { - CreatedAt time.Time - UserID string -} - -const ExpiredIn = time.Millisecond * 1000 * 60 * 60 -func (h *APIHandler) generateInstallCommand(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - claims, ok := req.Context().Value("user").(*rbac.UserClaims) - if !ok { - h.WriteError(w, "user not found", http.StatusInternalServerError) - return - } - agCfg := common.GetAgentConfig() - if agCfg == nil || agCfg.Setup == nil { - h.WriteError(w, "agent setup config was not found, please configure in the configuration file first", http.StatusInternalServerError) - return - } - var ( - t *Token - tokenStr string - ) - tokens.Range(func(key, value any) bool { - if v, ok := value.(*Token); ok && claims.UserId == v.UserID { - t = v - tokenStr = key.(string) - return false - } - return true - }) - - if t == nil { - tokenStr = util.GetUUID() - t = &Token{ - CreatedAt: time.Now(), - UserID: claims.UserId, - } - }else{ - if t.CreatedAt.Add(ExpiredIn).Before(time.Now()){ - tokens.Delete(tokenStr) - tokenStr = util.GetUUID() - t = &Token{ - CreatedAt: time.Now(), - UserID: claims.UserId, - } - }else{ - t.CreatedAt = time.Now() - } - } - tokens.Store(tokenStr, t) - consoleEndpoint := agCfg.Setup.ConsoleEndpoint - if consoleEndpoint == "" { - consoleEndpoint = getDefaultConsoleEndpoint(req) - } - - - h.WriteJSON(w, util.MapStr{ - "script": fmt.Sprintf(`curl -sSL %s/agent/install.sh?token=%s |sudo bash -s -- -u %s -v %s -t /opt/agent`, consoleEndpoint, tokenStr, agCfg.Setup.DownloadURL, agCfg.Setup.Version), - //"script": fmt.Sprintf(`sudo BASE_URL="%s" AGENT_VER="%s" INSTALL_PATH="/opt" bash -c "$(curl -L '%s/agent/install.sh?token=%s')"`, agCfg.Setup.DownloadURL, agCfg.Setup.Version, consoleEndpoint, tokenStr), - "token": tokenStr, - "expired_at": t.CreatedAt.Add(ExpiredIn), - }, http.StatusOK) -} - -func getDefaultConsoleEndpoint(req *http.Request) string{ - scheme := "http" - if req.TLS != nil { - scheme = "https" - } - return fmt.Sprintf("%s://%s", scheme, req.Host) -} - -func (h *APIHandler) getInstallScript(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - tokenStr := h.GetParameter(req, "token") - if strings.TrimSpace(tokenStr) == "" { - h.WriteError(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) - return - } - if v, ok := tokens.Load(tokenStr); !ok { - h.WriteError(w, "token is invalid", http.StatusUnauthorized) - return - }else{ - if t, ok := v.(*Token); !ok || t.CreatedAt.Add(ExpiredIn).Before(time.Now()) { - tokens.Delete(tokenStr) - h.WriteError(w, "token was expired", http.StatusUnauthorized) - return - } - } - agCfg := common.GetAgentConfig() - caCert, clientCertPEM, clientKeyPEM, err := common.GenerateServerCert(agCfg.Setup.CACertFile, agCfg.Setup.CAKeyFile) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - scriptTplPath := path.Join(global.Env().GetConfigDir(), "install_agent.tpl") - buf, err := os.ReadFile(scriptTplPath) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - tpl := fasttemplate.New(string(buf), "{{", "}}") - downloadURL := agCfg.Setup.DownloadURL - if downloadURL == "" { - downloadURL = "https://release.infinilabs.com/agent/stable/" - } - port := agCfg.Setup.Port - if port == "" { - port = "8080" - } - consoleEndpoint := agCfg.Setup.ConsoleEndpoint - if consoleEndpoint == "" { - consoleEndpoint = getDefaultConsoleEndpoint(req) - } - _, err = tpl.Execute(w, map[string]interface{}{ - "base_url": agCfg.Setup.DownloadURL, - "agent_version": agCfg.Setup.Version, - "console_endpoint": consoleEndpoint, - "client_crt": clientCertPEM, - "client_key": clientKeyPEM, - "ca_crt": caCert, - "port": port, - "token": tokenStr, - }) - if err != nil { - log.Error(err) - } -} - diff --git a/modules/agent/client/client.go b/modules/agent/client/client.go index 586dc67f..7f26092d 100644 --- a/modules/agent/client/client.go +++ b/modules/agent/client/client.go @@ -8,7 +8,7 @@ import ( "context" "fmt" "infini.sh/console/modules/agent/common" - "infini.sh/framework/core/agent" + "infini.sh/framework/core/model" "infini.sh/framework/core/elastic" "infini.sh/framework/core/host" "infini.sh/framework/core/util" @@ -32,10 +32,10 @@ type ClientAPI interface { GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) GetElasticLogFileContent(ctx context.Context, agentBaseURL string, body interface{})(interface{}, error) - GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error) + GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*model.Instance, error) RegisterElasticsearch(ctx context.Context, agentBaseURL string, cfgs []elastic.ElasticsearchConfig) error - GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error) - AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) + GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]model.ESNodeInfo, error) + AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*model.ESNodeInfo, error) CreatePipeline(ctx context.Context, agentBaseURL string, body []byte) error DeletePipeline(ctx context.Context, agentBaseURL, pipelineID string) error SetKeystoreValue(ctx context.Context, agentBaseURL string, key, value string) error @@ -49,28 +49,30 @@ type Client struct { } -func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) { - req := &util.Request{ - Method: http.MethodGet, - Url: fmt.Sprintf("%s/agent/host/_basic", agentBaseURL), - Context: ctx, - } - resBody := struct { - Success bool `json:"success"` - Error string `json:"error"` - HostInfo *host.HostInfo `json:"result"` - }{} - err := client.DoRequest(req, &resBody) - if err != nil { - return nil, err - } - if resBody.Success != true { - return nil, fmt.Errorf("enroll error from client: %v", resBody.Error) - } - return resBody.HostInfo, nil -} +//func (client *Client) GetHostInfo(ctx context.Context, agentBaseURL string) (*host.HostInfo, error) { +// req := &util.Request{ +// Method: http.MethodGet, +// Url: fmt.Sprintf("%s/agent/host/_basic", agentBaseURL), +// Context: ctx, +// } +// resBody := struct { +// Success bool `json:"success"` +// Error string `json:"error"` +// HostInfo *host.HostInfo `json:"result"` +// }{} +// err := client.DoRequest(req, &resBody) +// if err != nil { +// return nil, err +// } +// if resBody.Success != true { +// return nil, fmt.Errorf("enroll error from client: %v", resBody.Error) +// } +// return resBody.HostInfo, nil +//} +//TODO func (client *Client) GetElasticProcess(ctx context.Context, agentBaseURL string, agentID string)(interface{}, error) { + panic("implement me") req := &util.Request{ Method: http.MethodGet, Url: fmt.Sprintf("%s/elasticsearch/%s/process/_elastic", agentBaseURL, agentID), @@ -88,6 +90,9 @@ func (client *Client) GetElasticProcess(ctx context.Context, agentBaseURL string } func (client *Client) GetElasticLogFiles(ctx context.Context, agentBaseURL string, logsPath string)(interface{}, error) { + panic("implement me") + + reqBody := util.MustToJSONBytes(util.MapStr{ "logs_path": logsPath, }) @@ -133,13 +138,13 @@ func (client *Client) GetElasticLogFileContent(ctx context.Context, agentBaseURL } , nil } -func (client *Client) GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*agent.Instance, error){ +func (client *Client) GetInstanceBasicInfo(ctx context.Context, agentBaseURL string) (*model.Instance, error){ req := &util.Request{ Method: http.MethodGet, - Url: fmt.Sprintf("%s/agent/_info", agentBaseURL ), + Url: fmt.Sprintf("%s/_info", agentBaseURL ), Context: ctx, } - resBody := &agent.Instance{} + resBody := &model.Instance{} err := client.DoRequest(req, &resBody) return resBody, err } @@ -166,13 +171,13 @@ func (client *Client) RegisterElasticsearch(ctx context.Context, agentBaseURL st return nil } -func (client *Client) GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]agent.ESNodeInfo, error) { +func (client *Client) GetElasticsearchNodes(ctx context.Context, agentBaseURL string) ([]model.ESNodeInfo, error) { req := &util.Request{ Method: http.MethodGet, - Url: fmt.Sprintf("%s/elasticsearch/_nodes", agentBaseURL ), + Url: fmt.Sprintf("%s/elasticsearch/nodes/_discovery", agentBaseURL ), Context: ctx, } - resBody := []agent.ESNodeInfo{} + resBody := []model.ESNodeInfo{} err := client.DoRequest(req, &resBody) if err != nil { return nil, err @@ -181,7 +186,7 @@ func (client *Client) GetElasticsearchNodes(ctx context.Context, agentBaseURL st return resBody, nil } -func (client *Client) AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*agent.ESNodeInfo, error) { +func (client *Client) AuthESNode(ctx context.Context, agentBaseURL string, cfg elastic.ElasticsearchConfig) (*model.ESNodeInfo, error) { reqBody, err := util.ToJSONBytes(cfg) if err != nil { return nil, err @@ -192,7 +197,7 @@ func (client *Client) AuthESNode(ctx context.Context, agentBaseURL string, cfg e Context: ctx, Body: reqBody, } - resBody := &agent.ESNodeInfo{} + resBody := &model.ESNodeInfo{} err = client.DoRequest(req, resBody) if err != nil { return nil, err @@ -227,22 +232,22 @@ func (client *Client) SetKeystoreValue(ctx context.Context, agentBaseURL string, } req := &util.Request{ Method: http.MethodPost, - Url: fmt.Sprintf("%s/_framework/keystore", agentBaseURL), + Url: fmt.Sprintf("%s/keystore", agentBaseURL), Context: ctx, Body: util.MustToJSONBytes(body), } return client.DoRequest(req, nil) } -func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, name, content string) error{ +func (client *Client) SaveDynamicConfig(ctx context.Context, agentBaseURL string, filename, content string) error{ body := util.MapStr{ "configs": util.MapStr{ - name: content, + filename: content, }, } req := &util.Request{ Method: http.MethodPost, - Url: fmt.Sprintf("%s/agent/config", agentBaseURL), + Url: fmt.Sprintf("%s/config/_update", agentBaseURL), Context: ctx, Body: util.MustToJSONBytes(body), } @@ -260,7 +265,7 @@ func (client *Client) SaveIngestConfig(ctx context.Context, agentBaseURL string) return fmt.Errorf("set keystore value to agent error: %w", err) } } - err = client.SaveDynamicConfig(context.Background(), agentBaseURL, "ingest", ingestCfg ) + err = client.SaveDynamicConfig(context.Background(), agentBaseURL, "ingest_variables.yml", ingestCfg ) if err != nil { return fmt.Errorf("save dynamic config to agent error: %w", err) } diff --git a/modules/agent/common/helper.go b/modules/agent/common/helper.go index ff47c752..5f75db2a 100644 --- a/modules/agent/common/helper.go +++ b/modules/agent/common/helper.go @@ -6,19 +6,19 @@ package common import ( "fmt" - "infini.sh/console/modules/agent/model" - "infini.sh/framework/core/agent" + log "github.com/cihub/seelog" + model2 "infini.sh/console/modules/agent/model" "infini.sh/framework/core/credential" "infini.sh/framework/core/elastic" "infini.sh/framework/core/event" "infini.sh/framework/core/global" + "infini.sh/framework/core/model" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" - log "src/github.com/cihub/seelog" "strings" ) -func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResult, error){ +func ParseAgentSettings(settings []model.Setting)(*model2.ParseAgentSettingsResult, error){ var clusterCfgs []elastic.ElasticsearchConfig var ( pipelines []util.MapStr @@ -58,7 +58,7 @@ func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResul if err != nil { return nil, err } - taskSetting := model.TaskSetting{} + taskSetting := model2.TaskSetting{} err = util.FromJSONBytes(vBytes, &taskSetting) if err != nil { return nil, err @@ -70,7 +70,7 @@ func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResul pipelines = append(pipelines, partPipelines...) toDeletePipelineNames = append(toDeletePipelineNames, partDeletePipelineNames...) } - return &model.ParseAgentSettingsResult{ + return &model2.ParseAgentSettingsResult{ ClusterConfigs: clusterCfgs, Pipelines: pipelines, ToDeletePipelineNames: toDeletePipelineNames, @@ -80,7 +80,7 @@ func ParseAgentSettings(settings []agent.Setting)(*model.ParseAgentSettingsResul // GetAgentSettings query agent setting by agent id and updated timestamp, // if there has any setting was updated, then return setting list includes settings not changed, // otherwise return empty setting list -func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) { +func GetAgentSettings(agentID string, timestamp int64) ([]model.Setting, error) { query := util.MapStr{ "bool": util.MapStr{ "must": []util.MapStr{ @@ -116,13 +116,13 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) }, } queryDsl := util.MapStr{ - "size": 500, + "size": 1000, "query": query, } q := orm.Query{ RawQuery: util.MustToJSONBytes(queryDsl), } - err, result := orm.Search(agent.Setting{}, &q) + err, result := orm.Search(model.Setting{}, &q) if err != nil { return nil, fmt.Errorf("search settings error: %w", err) } @@ -130,11 +130,11 @@ func GetAgentSettings(agentID string, timestamp int64) ([]agent.Setting, error) return nil, nil } var ( - settings []agent.Setting + settings []model.Setting hasUpdated bool ) for _, row := range result.Result { - setting := agent.Setting{} + setting := model.Setting{} buf, err := util.ToJSONBytes(row) if err != nil { return nil, err @@ -158,7 +158,7 @@ func getClusterConfigReferenceName(clusterID, nodeUUID string) string { return fmt.Sprintf("%s_%s", clusterID, nodeUUID) } -func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { +func TransformSettingsToConfig(setting *model2.TaskSetting, clusterID, nodeUUID string) ([]util.MapStr, []string, error) { if setting == nil { return nil, nil, fmt.Errorf("empty setting") } @@ -279,6 +279,7 @@ func newClusterMetricPipeline(processorName string, clusterID string, nodeUUID s "name": getMetricPipelineName(clusterID, processorName), "auto_start": true, "keep_running": true, + "singleton": true, "retry_delay_in_ms": 10000, "processor": []util.MapStr{cfg}, } @@ -290,22 +291,22 @@ func getMetricPipelineName(clusterID, processorName string) string{ } -func LoadAgentsFromES(clusterID string) ([]agent.Instance, error) { +func LoadAgentsFromES(clusterID string) ([]model.Instance, error) { q := orm.Query{ Size: 1000, } if clusterID != "" { q.Conds = orm.And(orm.Eq("id", clusterID)) } - err, result := orm.Search(agent.Instance{}, &q) + err, result := orm.Search(model.Instance{}, &q) if err != nil { return nil, fmt.Errorf("query agent error: %w", err) } if len(result.Result) > 0 { - var agents = make([]agent.Instance, 0, len(result.Result)) + var agents = make([]model.Instance, 0, len(result.Result)) for _, row := range result.Result { - ag := agent.Instance{} + ag := model.Instance{} bytes := util.MustToJSONBytes(row) err = util.FromJSONBytes(bytes, &ag) if err != nil { @@ -417,7 +418,7 @@ func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) { } if emptyIngestClusterEndpoint { cfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) - endpoint = cfg.Endpoint + endpoint = cfg.GetAnyEndpoint() } var ( @@ -441,101 +442,14 @@ func GetAgentIngestConfig() (string, *elastic.BasicAuth, error) { cfg := elastic.GetConfig(global.MustLookupString(elastic.GlobalSystemElasticsearchID)) basicAuth = *cfg.BasicAuth } - tpl := `elasticsearch: - - name: default - enabled: true - endpoint: %s - discovery: - enabled: true - basic_auth: - username: %s - password: $[[keystore.ingest_cluster_password]] -metrics: - enabled: true - queue: metrics - network: - enabled: true - summary: true - details: true - memory: - metrics: - - swap - - memory - disk: - metrics: - - iops - - usage - cpu: - metrics: - - idle - - system - - user - - iowait - - load - instance: - enabled: true -pipeline: - - name: logs_indexing_merge - auto_start: true - keep_running: true - processor: - - indexing_merge: - index_name: ".infini_logs" - elasticsearch: "default" - input_queue: "logs" - idle_timeout_in_seconds: 10 - output_queue: - name: "logs_requests" - label: - tag: "logs" - worker_size: 1 - bulk_size_in_mb: 10 - - name: ingest_logs - auto_start: true - keep_running: true - processor: - - bulk_indexing: - bulk: - compress: true - batch_size_in_mb: 5 - batch_size_in_docs: 5000 - consumer: - fetch_max_messages: 100 - queues: - type: indexing_merge - tag: "logs" - when: - cluster_available: ["default"] - - name: metrics_indexing_merge - auto_start: true - keep_running: true - processor: - - indexing_merge: - elasticsearch: "default" - index_name: ".infini_metrics" - input_queue: "metrics" - output_queue: - name: "metrics_requests" - label: - tag: "metrics" - worker_size: 1 - bulk_size_in_mb: 5 - - name: ingest_metrics - auto_start: true - keep_running: true - processor: - - bulk_indexing: - bulk: - compress: true - batch_size_in_mb: 5 - batch_size_in_docs: 5000 - consumer: - fetch_max_messages: 100 - queues: - type: indexing_merge - tag: "metrics" - when: - cluster_available: ["default"]` + tpl := `configs.template: + - name: "ingest" + path: ./config/ingest_config.tpl + variable: + INGEST_CLUSTER_ID: "default_ingest_cluster" + INGEST_CLUSTER_ENDPOINT: ["%s"] + INGEST_CLUSTER_USERNAME: "%s" +` tpl = fmt.Sprintf(tpl, endpoint, basicAuth.Username) return tpl, &basicAuth, nil } \ No newline at end of file diff --git a/modules/agent/model/config.go b/modules/agent/model/config.go index f4bb4a2d..d731a3f8 100644 --- a/modules/agent/model/config.go +++ b/modules/agent/model/config.go @@ -5,20 +5,20 @@ package model type AgentConfig struct { - Enabled bool `config:"enabled"` - StateManager struct{ + Enabled bool `config:"enabled"` + StateManager struct { Enabled bool `config:"enabled"` } `config:"state_manager"` Setup *SetupConfig `config:"setup"` } type SetupConfig struct { - DownloadURL string `config:"download_url"` - Version string `config:"version"` - CACertFile string `config:"ca_cert"` - CAKeyFile string `config:"ca_key"` - ConsoleEndpoint string `config:"console_endpoint"` - IngestClusterEndpoint interface{} `config:"ingest_cluster_endpoint"` - IngestClusterCredentialID string `config:"ingest_cluster_credential_id"` - Port string `config:"port"` + DownloadURL string `config:"download_url"` + Version string `config:"version"` + CACertFile string `config:"ca_cert"` + CAKeyFile string `config:"ca_key"` + ConsoleEndpoint string `config:"console_endpoint"` + IngestClusterEndpoint interface{} `config:"ingest_cluster_endpoint"` + IngestClusterCredentialID string `config:"ingest_cluster_credential_id"` + Port string `config:"port"` } diff --git a/modules/agent/state/state.go b/modules/agent/state/state.go index b833f956..a4ff8342 100644 --- a/modules/agent/state/state.go +++ b/modules/agent/state/state.go @@ -12,8 +12,8 @@ import ( "gopkg.in/yaml.v2" "infini.sh/console/modules/agent/client" "infini.sh/console/modules/agent/common" - "infini.sh/console/modules/agent/model" - "infini.sh/framework/core/agent" + model2 "infini.sh/console/modules/agent/model" + "infini.sh/framework/core/model" "infini.sh/framework/core/host" "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" @@ -44,9 +44,9 @@ func IsEnabled() bool { } type IStateManager interface { - GetAgent(ID string) (*agent.Instance, error) - UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error) - GetTaskAgent(clusterID string) (*agent.Instance, error) + GetAgent(ID string) (*model.Instance, error) + UpdateAgent(inst *model.Instance, syncToES bool) (*model.Instance, error) + GetTaskAgent(clusterID string) (*model.Instance, error) DeleteAgent(agentID string) error LoopState() Stop() @@ -87,7 +87,7 @@ func (sm *StateManager) checkAgentStatus() { for agentID := range onlineAgentIDs { if _, ok := sm.agentIds[agentID]; !ok { log.Infof("status of agent [%s] changed to online", agentID) - sm.agentIds[agentID] = model.StatusOnline + sm.agentIds[agentID] = model2.StatusOnline } } sm.agentMutex.Unlock() @@ -104,25 +104,25 @@ func (sm *StateManager) checkAgentStatus() { sm.syncSettings(agentID) sm.syncIngestSettings(agentID) if _, ok := onlineAgentIDs[agentID]; ok { - host.UpdateHostAgentStatus(agentID, model.StatusOnline) - if status == model.StatusOnline { + host.UpdateHostAgentStatus(agentID, model2.StatusOnline) + if status == model2.StatusOnline { return } // status change to online sm.agentMutex.Lock() - sm.agentIds[agentID] = model.StatusOnline + sm.agentIds[agentID] = model2.StatusOnline sm.agentMutex.Unlock() log.Infof("status of agent [%s] changed to online", agentID) return }else{ // already offline - if status == model.StatusOffline { + if status == model2.StatusOffline { return } } // status change to offline sm.agentMutex.Lock() - sm.agentIds[agentID] = model.StatusOffline + sm.agentIds[agentID] = model2.StatusOffline sm.agentMutex.Unlock() ag, err := sm.GetAgent(agentID) if err != nil { @@ -131,7 +131,7 @@ func (sm *StateManager) checkAgentStatus() { } return } - ag.Status = model.StatusOffline + ag.Status = model2.StatusOffline log.Infof("agent [%s] is offline", ag.Endpoint) _, err = sm.UpdateAgent(ag, true) if err != nil { @@ -139,13 +139,13 @@ func (sm *StateManager) checkAgentStatus() { return } //update host agent status - host.UpdateHostAgentStatus(ag.ID, model.StatusOffline) + host.UpdateHostAgentStatus(ag.ID, model2.StatusOffline) }(agentID) } } func (sm *StateManager) getLastSyncSettingsTimestamp(agentID string) int64{ - vbytes, err := kv.GetValue(model.KVSyncDynamicTaskSettings, []byte(agentID)) + vbytes, err := kv.GetValue(model2.KVSyncDynamicTaskSettings, []byte(agentID)) if err != nil { log.Error(err) } @@ -191,7 +191,7 @@ func (sm *StateManager) syncSettings(agentID string) { clusterCfg := util.MapStr{ "name": cfg.ID, "enabled": true, - "endpoint": cfg.Endpoint, + "endpoint": cfg.GetAnyEndpoint(), } if cfg.BasicAuth != nil && cfg.BasicAuth.Password != ""{ cid := cfg.ID @@ -223,16 +223,17 @@ func (sm *StateManager) syncSettings(agentID string) { log.Error("serialize config to yaml error: ", err) return } - err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task", string(cfgBytes)) + //TODO + err = agClient.SaveDynamicConfig(context.Background(), ag.GetEndpoint(), "dynamic_task.yml", string(cfgBytes)) newTimestampStr := strconv.FormatInt(newTimestamp, 10) - err = kv.AddValue(model.KVSyncDynamicTaskSettings, []byte(agentID), []byte(newTimestampStr)) + err = kv.AddValue(model2.KVSyncDynamicTaskSettings, []byte(agentID), []byte(newTimestampStr)) if err != nil { log.Error(err) } } func (sm *StateManager) syncIngestSettings(agentID string) { - v, err := kv.GetValue(model.KVAgentIngestConfigChanged, []byte(agentID)) + v, err := kv.GetValue(model2.KVAgentIngestConfigChanged, []byte(agentID)) if err != nil { log.Error(err) } @@ -248,11 +249,11 @@ func (sm *StateManager) syncIngestSettings(agentID string) { } err = sm.agentClient.SaveIngestConfig(context.Background(), ag.GetEndpoint()) if err == nil { - kv.AddValue(model.KVAgentIngestConfigChanged,[]byte(agentID), []byte("0")) + kv.AddValue(model2.KVAgentIngestConfigChanged,[]byte(agentID), []byte("0")) } } -func (sm *StateManager) getAvailableAgent(clusterID string) (*agent.Instance, error) { +func (sm *StateManager) getAvailableAgent(clusterID string) (*model.Instance, error) { agents, err := common.LoadAgentsFromES(clusterID) if err != nil { return nil, err @@ -289,14 +290,14 @@ func (sm *StateManager) Stop() { <-sm.stopCompleteC } -func (sm *StateManager) GetAgent(ID string) (*agent.Instance, error) { +func (sm *StateManager) GetAgent(ID string) (*model.Instance, error) { buf, err := kv.GetValue(sm.KVKey, []byte(ID)) if err != nil { return nil, err } strTime, _ := jsonparser.GetString(buf, "timestamp") timestamp, _ := time.Parse(time.RFC3339, strTime) - inst := &agent.Instance{} + inst := &model.Instance{} inst.ID = ID if time.Since(timestamp) > sm.TTL { exists, err := orm.Get(inst) @@ -317,7 +318,7 @@ func (sm *StateManager) GetAgent(ID string) (*agent.Instance, error) { return inst, err } -func (sm *StateManager) UpdateAgent(inst *agent.Instance, syncToES bool) (*agent.Instance, error) { +func (sm *StateManager) UpdateAgent(inst *model.Instance, syncToES bool) (*model.Instance, error) { //inst.Timestamp = time.Now() err := kv.AddValue(sm.KVKey, []byte(inst.ID), util.MustToJSONBytes(inst)) if syncToES { @@ -332,7 +333,7 @@ func (sm *StateManager) UpdateAgent(inst *agent.Instance, syncToES bool) (*agent return inst, err } -func (sm *StateManager) GetTaskAgent(clusterID string) (*agent.Instance, error) { +func (sm *StateManager) GetTaskAgent(clusterID string) (*model.Instance, error) { return nil, nil } diff --git a/plugin/api/gateway/api.go b/plugin/api/gateway/api.go index 6ef82a9e..ad6d5b09 100644 --- a/plugin/api/gateway/api.go +++ b/plugin/api/gateway/api.go @@ -5,14 +5,7 @@ package gateway import ( - "crypto/tls" "infini.sh/framework/core/api" - "infini.sh/framework/core/api/rbac/enum" - "net" - "net/http" - "net/url" - log "github.com/cihub/seelog" - "time" ) type GatewayAPI struct { @@ -20,47 +13,48 @@ type GatewayAPI struct { } func InitAPI() { - gateway:=GatewayAPI{} - api.HandleAPIMethod(api.POST, "/gateway/instance/try_connect", gateway.RequireLogin(gateway.tryConnect)) - api.HandleAPIMethod(api.GET, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.getInstance, enum.PermissionGatewayInstanceRead)) - api.HandleAPIMethod(api.POST, "/gateway/instance", gateway.RequirePermission(gateway.createInstance, enum.PermissionGatewayInstanceWrite)) - api.HandleAPIMethod(api.PUT, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.updateInstance, enum.PermissionGatewayInstanceWrite)) - api.HandleAPIMethod(api.DELETE, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.deleteInstance, enum.PermissionGatewayInstanceWrite)) - api.HandleAPIMethod(api.GET, "/gateway/instance/_search", gateway.RequirePermission(gateway.searchInstance, enum.PermissionGatewayInstanceRead)) - api.HandleAPIMethod(api.POST, "/gateway/instance/status", gateway.RequirePermission(gateway.getInstanceStatus, enum.PermissionGatewayInstanceRead)) + //gateway:=GatewayAPI{} + //api.HandleAPIMethod(api.POST, "/gateway/instance/try_connect", gateway.RequireLogin(gateway.tryConnect)) + //api.HandleAPIMethod(api.GET, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.getInstance, enum.PermissionGatewayInstanceRead)) + //api.HandleAPIMethod(api.POST, "/gateway/instance", gateway.RequirePermission(gateway.createInstance, enum.PermissionGatewayInstanceWrite)) + //api.HandleAPIMethod(api.PUT, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.updateInstance, enum.PermissionGatewayInstanceWrite)) + //api.HandleAPIMethod(api.DELETE, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.deleteInstance, enum.PermissionGatewayInstanceWrite)) + //api.HandleAPIMethod(api.GET, "/gateway/instance/_search", gateway.RequirePermission(gateway.searchInstance, enum.PermissionGatewayInstanceRead)) + //api.HandleAPIMethod(api.POST, "/gateway/instance/status", gateway.RequirePermission(gateway.getInstanceStatus, enum.PermissionGatewayInstanceRead)) + // + //api.HandleAPIMethod(api.POST, "/gateway/instance/:instance_id/_proxy", gateway.RequirePermission(gateway.proxy, enum.PermissionGatewayInstanceRead)) - api.HandleAPIMethod(api.POST, "/gateway/instance/:instance_id/_proxy", gateway.RequirePermission(gateway.proxy, enum.PermissionGatewayInstanceRead)) - - api.HandleAPIMethod(api.GET, "/_platform/nodes", gateway.getExecutionNodes) - api.HandleAPIFunc("/ws_proxy", func(w http.ResponseWriter, req *http.Request) { - log.Debug(req.RequestURI) - endpoint := req.URL.Query().Get("endpoint") - path := req.URL.Query().Get("path") - var tlsConfig = &tls.Config{ - InsecureSkipVerify: true, - } - target, err := url.Parse(endpoint) - if err != nil { - log.Error(err) - return - } - newURL, err := url.Parse(path) - if err != nil { - log.Error(err) - return - } - req.URL.Path = newURL.Path - req.URL.RawPath = newURL.RawPath - req.URL.RawQuery = "" - req.RequestURI = req.URL.RequestURI() - req.Header.Set("HOST", target.Host) - req.Host = target.Host - wsProxy := NewSingleHostReverseProxy(target) - wsProxy.Dial = (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).Dial - wsProxy.TLSClientConfig = tlsConfig - wsProxy.ServeHTTP(w, req) - }) + //api.HandleAPIMethod(api.GET, "/_platform/nodes", gateway.getExecutionNodes) + // + //api.HandleAPIFunc("/ws_proxy", func(w http.ResponseWriter, req *http.Request) { + // log.Debug(req.RequestURI) + // endpoint := req.URL.Query().Get("endpoint") + // path := req.URL.Query().Get("path") + // var tlsConfig = &tls.Config{ + // InsecureSkipVerify: true, + // } + // target, err := url.Parse(endpoint) + // if err != nil { + // log.Error(err) + // return + // } + // newURL, err := url.Parse(path) + // if err != nil { + // log.Error(err) + // return + // } + // req.URL.Path = newURL.Path + // req.URL.RawPath = newURL.RawPath + // req.URL.RawQuery = "" + // req.RequestURI = req.URL.RequestURI() + // req.Header.Set("HOST", target.Host) + // req.Host = target.Host + // wsProxy := NewSingleHostReverseProxy(target) + // wsProxy.Dial = (&net.Dialer{ + // Timeout: 30 * time.Second, + // KeepAlive: 30 * time.Second, + // }).Dial + // wsProxy.TLSClientConfig = tlsConfig + // wsProxy.ServeHTTP(w, req) + //}) } \ No newline at end of file diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index e07ccca2..3ffe59df 100644 --- a/plugin/api/gateway/instance.go +++ b/plugin/api/gateway/instance.go @@ -1,495 +1,455 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * web: https://infinilabs.com - * mail: hello#infini.ltd */ - +///* Copyright © INFINI Ltd. All rights reserved. +// * web: https://infinilabs.com +// * mail: hello#infini.ltd */ +// package gateway +// +//import ( +// "fmt" +// log "github.com/cihub/seelog" +// "github.com/segmentio/encoding/json" +// "infini.sh/console/model" +// httprouter "infini.sh/framework/core/api/router" +// elastic2 "infini.sh/framework/core/elastic" +// "infini.sh/framework/core/model" +// "infini.sh/framework/core/orm" +// "infini.sh/framework/core/proxy" +// "infini.sh/framework/core/task" +// "infini.sh/framework/core/util" +// "infini.sh/framework/modules/elastic" +// "net/http" +// "net/url" +// "strconv" +// "strings" +// "time" +//) +// -import ( - "fmt" - log "github.com/cihub/seelog" - "github.com/segmentio/encoding/json" - "infini.sh/console/model" - "infini.sh/framework/core/agent" - httprouter "infini.sh/framework/core/api/router" - elastic2 "infini.sh/framework/core/elastic" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/proxy" - "infini.sh/framework/core/task" - "infini.sh/framework/core/util" - "infini.sh/framework/modules/elastic" - "net/http" - "net/url" - "strconv" - "strings" - "time" -) - -func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var obj = &model.Instance{} - err := h.DecodeJSON(req, obj) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - - res, err := h.doConnect(obj.Endpoint, obj.BasicAuth) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - obj.ID = res.ID - - exists, err := orm.Get(obj) - if err != nil && err != elastic.ErrNotFound { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - if exists { - h.WriteError(w, "gateway instance already registered", http.StatusInternalServerError) - return - } - err = orm.Create(nil, obj) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "created", - }, 200) - -} - -func (h *GatewayAPI) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("instance_id") - - obj := model.Instance{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - - h.WriteJSON(w, util.MapStr{ - "found": true, - "_id": id, - "_source": obj, - }, 200) -} - -func (h *GatewayAPI) updateInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("instance_id") - obj := model.Instance{} - - obj.ID = id - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "result": "not_found", - }, http.StatusNotFound) - return - } - - id = obj.ID - create := obj.Created - obj = model.Instance{} - err = h.DecodeJSON(req, &obj) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - - //protect - obj.ID = id - obj.Created = create - err = orm.Update(nil, &obj) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "updated", - }, 200) -} - -func (h *GatewayAPI) deleteInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("instance_id") - - obj := model.Instance{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "result": "not_found", - }, http.StatusNotFound) - return - } - - //check reference - query := util.MapStr{ - "size": 1, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.permit_nodes.id": util.MapStr{ - "value": id, - }, - }, - }, - { - "terms": util.MapStr{ - "metadata.type": []string{"cluster_migration", "cluster_comparison"}, - }, - }, - }, - "must_not": []util.MapStr{ - { - "terms": util.MapStr{ - "status": []string{task.StatusError, task.StatusComplete}, - }, - }, - }, - }, - }, - } - q := &orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - err, result := orm.Search(task.Task{}, q) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - if len(result.Result) > 0 { - var taskId interface{} - if m, ok := result.Result[0].(map[string]interface{}); ok { - taskId = m["id"] - } - h.WriteError(w, fmt.Sprintf("failed to delete gateway instance [%s] since it is used by task [%v]", id, taskId), http.StatusInternalServerError) - return - } - - err = orm.Delete(nil, &obj) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "deleted", - }, 200) -} - -func (h *GatewayAPI) searchInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - - var ( - keyword = h.GetParameterOrDefault(req, "keyword", "") - queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` - strSize = h.GetParameterOrDefault(req, "size", "20") - strFrom = h.GetParameterOrDefault(req, "from", "0") - mustBuilder = &strings.Builder{} - ) - if keyword != "" { - mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, keyword)) - } - size, _ := strconv.Atoi(strSize) - if size <= 0 { - size = 20 - } - from, _ := strconv.Atoi(strFrom) - if from < 0 { - from = 0 - } - - q := orm.Query{} - queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from) - q.RawQuery = []byte(queryDSL) - - err, res := orm.Search(&model.Instance{}, &q) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - h.Write(w, res.Raw) -} - -func (h *GatewayAPI) getInstanceStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var instanceIDs = []string{} - err := h.DecodeJSON(req, &instanceIDs) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if len(instanceIDs) == 0 { - h.WriteJSON(w, util.MapStr{}, http.StatusOK) - return - } - q := orm.Query{} - queryDSL := util.MapStr{ - "query": util.MapStr{ - "terms": util.MapStr{ - "_id": instanceIDs, - }, - }, - } - q.RawQuery = util.MustToJSONBytes(queryDSL) - - err, res := orm.Search(&model.Instance{}, &q) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - result := util.MapStr{} - for _, item := range res.Result { - instance := util.MapStr(item.(map[string]interface{})) - if err != nil { - log.Error(err) - continue - } - endpoint, _ := instance.GetValue("endpoint") - username, _ := instance.GetValue("basic_auth.username") - if username == nil { - username = "" - } - password, _ := instance.GetValue("basic_auth.password") - if password == nil { - password = "" - } - gid, _ := instance.GetValue("id") - res, err := proxy.DoProxyRequest(&proxy.Request{ - Endpoint: endpoint.(string), - Method: http.MethodGet, - Path: "/stats", - BasicAuth: agent.BasicAuth{ - Username: username.(string), - Password: password.(string), - }, - }) - if err != nil { - log.Error(err) - result[gid.(string)] = util.MapStr{} - continue - } - var resMap = util.MapStr{} - err = util.FromJSONBytes(res.Body, &resMap) - if err != nil { - result[gid.(string)] = util.MapStr{} - log.Errorf("get stats of %v error: %v", endpoint, err) - continue - } - - result[gid.(string)] = resMap - } - h.WriteJSON(w, result, http.StatusOK) -} -func (h *GatewayAPI) proxy(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var ( - method = h.Get(req, "method", "GET") - path = h.Get(req, "path", "") - ) - instanceID := ps.MustGetParameter("instance_id") - - obj := model.Instance{} - obj.ID = instanceID - - exists, err := orm.Get(&obj) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - if !exists { - h.WriteJSON(w, util.MapStr{ - "error": "gateway instance not found", - }, http.StatusNotFound) - return - } - res, err := proxy.DoProxyRequest(&proxy.Request{ - Method: method, - Endpoint: obj.Endpoint, - Path: path, - Body: req.Body, - BasicAuth: obj.BasicAuth, - ContentLength: int(req.ContentLength), - }) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - h.WriteHeader(w, res.StatusCode) - h.Write(w, res.Body) -} - -type GatewayConnectResponse struct { - ID string `json:"id"` - Name string `json:"name"` - Tagline string `json:"tagline"` - Version struct { - BuildDate string `json:"build_date"` - BuildHash string `json:"build_hash"` - EOLDate string `json:"eol_date"` - Number string `json:"number"` - } `json:"version"` - -} -func (h *GatewayAPI) doConnect(endpoint string, basicAuth agent.BasicAuth) (*GatewayConnectResponse, error) { - res, err := proxy.DoProxyRequest(&proxy.Request{ - Method: http.MethodGet, - Endpoint: endpoint, - Path: "/_framework/api/_info", - BasicAuth: basicAuth, - }) - if err != nil { - return nil, err - } - if res.StatusCode == http.StatusNotFound { - return nil, fmt.Errorf("unknow gateway version") - } - b := res.Body - gres := &GatewayConnectResponse{} - err = json.Unmarshal(b, gres) - return gres, err - -} - -func (h *GatewayAPI) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var reqBody = struct { - Endpoint string `json:"endpoint"` - BasicAuth agent.BasicAuth - }{} - err := h.DecodeJSON(req, &reqBody) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - connectRes, err := h.doConnect(reqBody.Endpoint, reqBody.BasicAuth) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - h.WriteJSON(w, connectRes, http.StatusOK) -} - -func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ - var ( - keyword = h.GetParameterOrDefault(req, "keyword", "") - strSize = h.GetParameterOrDefault(req, "size", "10") - strFrom = h.GetParameterOrDefault(req, "from", "0") - ) - size, _ := strconv.Atoi(strSize) - if size <= 0 { - size = 10 - } - from, _ := strconv.Atoi(strFrom) - if from < 0 { - from = 0 - } - gatewayIndexName := orm.GetIndexName(model.Instance{}) - - query := util.MapStr{ - "size": size, - "from": from, - "sort": []util.MapStr{ - { - "created": util.MapStr{ - "order": "desc", - }, - }, - }, - } - if keyword != "" { - query["query"] = util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "prefix": util.MapStr{ - "name": util.MapStr{ - "value": keyword, - }, - }, - }, - }, - }, - } - } - q := orm.Query{ - IndexName: gatewayIndexName, - RawQuery: util.MustToJSONBytes(query), - } - err, result := orm.Search(nil, &q) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - searchRes := elastic2.SearchResponse{} - err = util.FromJSONBytes(result.Raw, &searchRes) - if err != nil||searchRes.ESError!=nil { - msg:=fmt.Sprintf("%v,%v",err,searchRes.ESError) - h.WriteError(w, msg, http.StatusInternalServerError) - return - } - var nodes = []util.MapStr{} - - for _, hit := range searchRes.Hits.Hits { - buf := util.MustToJSONBytes(hit.Source) - inst := model.Instance{} - err = util.FromJSONBytes(buf, &inst) - if err != nil { - log.Error(err) - continue - } - node := util.MapStr{ - "id": inst.ID, - "name": inst.Name, - "available": false, - "type": "gateway", - } - ul, err := url.Parse(inst.Endpoint) - if err != nil { - log.Error(err) - continue - } - node["host"] = ul.Host - err = inst.TryConnectWithTimeout(time.Second) - if err != nil { - log.Error(err) - }else{ - node["available"] = true - } - - nodes = append(nodes, node) - } - h.WriteJSON(w, nodes, http.StatusOK) -} +//func (h *GatewayAPI) getInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// id := ps.MustGetParameter("instance_id") +// +// obj := model.Instance{} +// obj.ID = id +// +// exists, err := orm.Get(&obj) +// if !exists || err != nil { +// h.WriteJSON(w, util.MapStr{ +// "_id": id, +// "found": false, +// }, http.StatusNotFound) +// return +// } +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// log.Error(err) +// return +// } +// +// h.WriteJSON(w, util.MapStr{ +// "found": true, +// "_id": id, +// "_source": obj, +// }, 200) +//} +// +//func (h *GatewayAPI) updateInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// id := ps.MustGetParameter("instance_id") +// obj := model.Instance{} +// +// obj.ID = id +// exists, err := orm.Get(&obj) +// if !exists || err != nil { +// h.WriteJSON(w, util.MapStr{ +// "_id": id, +// "result": "not_found", +// }, http.StatusNotFound) +// return +// } +// +// id = obj.ID +// create := obj.Created +// obj = model.Instance{} +// err = h.DecodeJSON(req, &obj) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// log.Error(err) +// return +// } +// +// //protect +// obj.ID = id +// obj.Created = create +// err = orm.Update(nil, &obj) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// log.Error(err) +// return +// } +// +// h.WriteJSON(w, util.MapStr{ +// "_id": obj.ID, +// "result": "updated", +// }, 200) +//} +// +//func (h *GatewayAPI) deleteInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// id := ps.MustGetParameter("instance_id") +// +// obj := model.Instance{} +// obj.ID = id +// +// exists, err := orm.Get(&obj) +// if !exists || err != nil { +// h.WriteJSON(w, util.MapStr{ +// "_id": id, +// "result": "not_found", +// }, http.StatusNotFound) +// return +// } +// +// //check reference +// query := util.MapStr{ +// "size": 1, +// "query": util.MapStr{ +// "bool": util.MapStr{ +// "must": []util.MapStr{ +// { +// "term": util.MapStr{ +// "metadata.labels.permit_nodes.id": util.MapStr{ +// "value": id, +// }, +// }, +// }, +// { +// "terms": util.MapStr{ +// "metadata.type": []string{"cluster_migration", "cluster_comparison"}, +// }, +// }, +// }, +// "must_not": []util.MapStr{ +// { +// "terms": util.MapStr{ +// "status": []string{task.StatusError, task.StatusComplete}, +// }, +// }, +// }, +// }, +// }, +// } +// q := &orm.Query{ +// RawQuery: util.MustToJSONBytes(query), +// } +// err, result := orm.Search(task.Task{}, q) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// log.Error(err) +// return +// } +// if len(result.Result) > 0 { +// var taskId interface{} +// if m, ok := result.Result[0].(map[string]interface{}); ok { +// taskId = m["id"] +// } +// h.WriteError(w, fmt.Sprintf("failed to delete gateway instance [%s] since it is used by task [%v]", id, taskId), http.StatusInternalServerError) +// return +// } +// +// err = orm.Delete(nil, &obj) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// log.Error(err) +// return +// } +// +// h.WriteJSON(w, util.MapStr{ +// "_id": obj.ID, +// "result": "deleted", +// }, 200) +//} +// +//func (h *GatewayAPI) searchInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// +// var ( +// keyword = h.GetParameterOrDefault(req, "keyword", "") +// queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` +// strSize = h.GetParameterOrDefault(req, "size", "20") +// strFrom = h.GetParameterOrDefault(req, "from", "0") +// mustBuilder = &strings.Builder{} +// ) +// if keyword != "" { +// mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, keyword)) +// } +// size, _ := strconv.Atoi(strSize) +// if size <= 0 { +// size = 20 +// } +// from, _ := strconv.Atoi(strFrom) +// if from < 0 { +// from = 0 +// } +// +// q := orm.Query{} +// queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from) +// q.RawQuery = []byte(queryDSL) +// +// err, res := orm.Search(&model.Instance{}, &q) +// if err != nil { +// log.Error(err) +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// h.Write(w, res.Raw) +//} +// +//func (h *GatewayAPI) getInstanceStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// var instanceIDs = []string{} +// err := h.DecodeJSON(req, &instanceIDs) +// if err != nil { +// log.Error(err) +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// if len(instanceIDs) == 0 { +// h.WriteJSON(w, util.MapStr{}, http.StatusOK) +// return +// } +// q := orm.Query{} +// queryDSL := util.MapStr{ +// "query": util.MapStr{ +// "terms": util.MapStr{ +// "_id": instanceIDs, +// }, +// }, +// } +// q.RawQuery = util.MustToJSONBytes(queryDSL) +// +// err, res := orm.Search(&model.Instance{}, &q) +// if err != nil { +// log.Error(err) +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// result := util.MapStr{} +// for _, item := range res.Result { +// instance := util.MapStr(item.(map[string]interface{})) +// if err != nil { +// log.Error(err) +// continue +// } +// endpoint, _ := instance.GetValue("endpoint") +// username, _ := instance.GetValue("basic_auth.username") +// if username == nil { +// username = "" +// } +// password, _ := instance.GetValue("basic_auth.password") +// if password == nil { +// password = "" +// } +// gid, _ := instance.GetValue("id") +// res, err := proxy.DoProxyRequest(&proxy.Request{ +// Endpoint: endpoint.(string), +// Method: http.MethodGet, +// Path: "/stats", +// BasicAuth: instance.BasicAuth{ +// Username: username.(string), +// Password: password.(string), +// }, +// }) +// if err != nil { +// log.Error(err) +// result[gid.(string)] = util.MapStr{} +// continue +// } +// var resMap = util.MapStr{} +// err = util.FromJSONBytes(res.Body, &resMap) +// if err != nil { +// result[gid.(string)] = util.MapStr{} +// log.Errorf("get stats of %v error: %v", endpoint, err) +// continue +// } +// +// result[gid.(string)] = resMap +// } +// h.WriteJSON(w, result, http.StatusOK) +//} +//func (h *GatewayAPI) proxy(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// var ( +// method = h.Get(req, "method", "GET") +// path = h.Get(req, "path", "") +// ) +// instanceID := ps.MustGetParameter("instance_id") +// +// obj := model.Instance{} +// obj.ID = instanceID +// +// exists, err := orm.Get(&obj) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// log.Error(err) +// return +// } +// if !exists { +// h.WriteJSON(w, util.MapStr{ +// "error": "gateway instance not found", +// }, http.StatusNotFound) +// return +// } +// res, err := proxy.DoProxyRequest(&proxy.Request{ +// Method: method, +// Endpoint: obj.Endpoint, +// Path: path, +// Body: req.Body, +// BasicAuth: obj.BasicAuth, +// ContentLength: int(req.ContentLength), +// }) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// log.Error(err) +// return +// } +// h.WriteHeader(w, res.StatusCode) +// h.Write(w, res.Body) +//} +// +//type GatewayConnectResponse struct { +// ID string `json:"id"` +// Name string `json:"name"` +// Tagline string `json:"tagline"` +// Version struct { +// BuildDate string `json:"build_date"` +// BuildHash string `json:"build_hash"` +// EOLDate string `json:"eol_date"` +// Number string `json:"number"` +// } `json:"version"` +// +//} +//func (h *GatewayAPI) doConnect(endpoint string, basicAuth model.BasicAuth) (*GatewayConnectResponse, error) { +// res, err := proxy.DoProxyRequest(&proxy.Request{ +// Method: http.MethodGet, +// Endpoint: endpoint, +// Path: "/_info", +// BasicAuth: basicAuth, +// }) +// if err != nil { +// return nil, err +// } +// if res.StatusCode == http.StatusNotFound { +// return nil, fmt.Errorf("unknow gateway version") +// } +// b := res.Body +// gres := &GatewayConnectResponse{} +// err = json.Unmarshal(b, gres) +// return gres, err +// +//} +// +//func (h *GatewayAPI) tryConnect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// var reqBody = struct { +// Endpoint string `json:"endpoint"` +// BasicAuth model.BasicAuth +// }{} +// err := h.DecodeJSON(req, &reqBody) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// connectRes, err := h.doConnect(reqBody.Endpoint, reqBody.BasicAuth) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// h.WriteJSON(w, connectRes, http.StatusOK) +//} +// +//func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ +// var ( +// keyword = h.GetParameterOrDefault(req, "keyword", "") +// strSize = h.GetParameterOrDefault(req, "size", "10") +// strFrom = h.GetParameterOrDefault(req, "from", "0") +// ) +// size, _ := strconv.Atoi(strSize) +// if size <= 0 { +// size = 10 +// } +// from, _ := strconv.Atoi(strFrom) +// if from < 0 { +// from = 0 +// } +// gatewayIndexName := orm.GetIndexName(model.Instance{}) +// +// query := util.MapStr{ +// "size": size, +// "from": from, +// "sort": []util.MapStr{ +// { +// "created": util.MapStr{ +// "order": "desc", +// }, +// }, +// }, +// } +// if keyword != "" { +// query["query"] = util.MapStr{ +// "bool": util.MapStr{ +// "must": []util.MapStr{ +// { +// "prefix": util.MapStr{ +// "name": util.MapStr{ +// "value": keyword, +// }, +// }, +// }, +// }, +// }, +// } +// } +// q := orm.Query{ +// IndexName: gatewayIndexName, +// RawQuery: util.MustToJSONBytes(query), +// } +// err, result := orm.Search(nil, &q) +// if err != nil { +// h.WriteError(w, err.Error(), http.StatusInternalServerError) +// return +// } +// +// searchRes := elastic2.SearchResponse{} +// err = util.FromJSONBytes(result.Raw, &searchRes) +// if err != nil||searchRes.ESError!=nil { +// msg:=fmt.Sprintf("%v,%v",err,searchRes.ESError) +// h.WriteError(w, msg, http.StatusInternalServerError) +// return +// } +// var nodes = []util.MapStr{} +// +// for _, hit := range searchRes.Hits.Hits { +// buf := util.MustToJSONBytes(hit.Source) +// inst := model.Instance{} +// err = util.FromJSONBytes(buf, &inst) +// if err != nil { +// log.Error(err) +// continue +// } +// node := util.MapStr{ +// "id": inst.ID, +// "name": inst.Name, +// "available": false, +// "type": "gateway", +// } +// ul, err := url.Parse(inst.Endpoint) +// if err != nil { +// log.Error(err) +// continue +// } +// node["host"] = ul.Host +// err = inst.TryConnectWithTimeout(time.Second) +// if err != nil { +// log.Error(err) +// }else{ +// node["available"] = true +// } +// +// nodes = append(nodes, node) +// } +// h.WriteJSON(w, nodes, http.StatusOK) +//} diff --git a/plugin/api/gateway/websocket_proxy.go b/plugin/api/gateway/websocket_proxy.go deleted file mode 100644 index 99fe929e..00000000 --- a/plugin/api/gateway/websocket_proxy.go +++ /dev/null @@ -1,186 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package gateway - -import ( - "crypto/tls" - "io" - log "github.com/cihub/seelog" - "net" - "net/http" - "net/url" - "strings" -) - -// ReverseProxy is a WebSocket reverse proxy. It will not work with a regular -// HTTP request, so it is the caller's responsiblity to ensure the incoming -// request is a WebSocket request. -type ReverseProxy struct { - // Director must be a function which modifies - // the request into a new request to be sent - // using Transport. Its response is then copied - // back to the original client unmodified. - Director func(*http.Request) - - // Dial specifies the dial function for dialing the proxied - // server over tcp. - // If Dial is nil, net.Dial is used. - Dial func(network, addr string) (net.Conn, error) - - // TLSClientConfig specifies the TLS configuration to use for 'wss'. - // If nil, the default configuration is used. - TLSClientConfig *tls.Config - - // ErrorLog specifies an optional logger for errors - // that occur when attempting to proxy the request. - // If nil, logging goes to os.Stderr via the log package's - // standard logger. - ErrorLog *log.LoggerInterface -} - -// stolen from net/http/httputil. singleJoiningSlash ensures that the route -// '/a/' joined with '/b' becomes '/a/b'. -func singleJoiningSlash(a, b string) string { - aslash := strings.HasSuffix(a, "/") - bslash := strings.HasPrefix(b, "/") - switch { - case aslash && bslash: - return a + b[1:] - case !aslash && !bslash: - return a + "/" + b - } - return a + b -} - -// NewSingleHostReverseProxy returns a new websocket ReverseProxy. The path -// rewrites follow the same rules as the httputil.ReverseProxy. If the target -// url has the path '/foo' and the incoming request '/bar', the request path -// will be updated to '/foo/bar' before forwarding. -// Scheme should specify if 'ws' or 'wss' should be used. -func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy { - targetQuery := target.RawQuery - director := func(req *http.Request) { - req.URL.Scheme = target.Scheme - req.URL.Host = target.Host - req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path) - if targetQuery == "" || req.URL.RawQuery == "" { - req.URL.RawQuery = targetQuery + req.URL.RawQuery - } else { - req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery - } - } - return &ReverseProxy{Director: director} -} - -// Function to implement the http.Handler interface. -func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { - logFunc := log.Errorf - - if !IsWebSocketRequest(r) { - http.Error(w, "Cannot handle non-WebSocket requests", 500) - logFunc("Received a request that was not a WebSocket request") - return - } - - outreq := new(http.Request) - // shallow copying - *outreq = *r - p.Director(outreq) - host := outreq.URL.Host - - if clientIP, _, err := net.SplitHostPort(r.RemoteAddr); err == nil { - // If we aren't the first proxy retain prior - // X-Forwarded-For information as a comma+space - // separated list and fold multiple headers into one. - if prior, ok := outreq.Header["X-Forwarded-For"]; ok { - clientIP = strings.Join(prior, ", ") + ", " + clientIP - } - outreq.Header.Set("X-Forwarded-For", clientIP) - } - - dial := p.Dial - if dial == nil { - dial = net.Dial - } - - // if host does not specify a port, use the default http port - if !strings.Contains(host, ":") { - if outreq.URL.Scheme == "wss" { - host = host + ":443" - } else { - host = host + ":80" - } - } - - if outreq.URL.Scheme == "wss" { - var tlsConfig *tls.Config - if p.TLSClientConfig == nil { - tlsConfig = &tls.Config{} - } else { - tlsConfig = p.TLSClientConfig - } - dial = func(network, address string) (net.Conn, error) { - return tls.Dial("tcp", host, tlsConfig) - } - } - - d, err := dial("tcp", host) - if err != nil { - http.Error(w, "Error forwarding request.", 500) - logFunc("Error dialing websocket backend %s: %v", outreq.URL, err) - return - } - // All request generated by the http package implement this interface. - hj, ok := w.(http.Hijacker) - if !ok { - http.Error(w, "Not a hijacker?", 500) - return - } - // Hijack() tells the http package not to do anything else with the connection. - // After, it bcomes this functions job to manage it. `nc` is of type *net.Conn. - nc, _, err := hj.Hijack() - if err != nil { - logFunc("Hijack error: %v", err) - return - } - defer nc.Close() // must close the underlying net connection after hijacking - defer d.Close() - - // write the modified incoming request to the dialed connection - err = outreq.Write(d) - if err != nil { - logFunc("Error copying request to target: %v", err) - return - } - errc := make(chan error, 2) - cp := func(dst io.Writer, src io.Reader) { - _, err := io.Copy(dst, src) - errc <- err - } - go cp(d, nc) - go cp(nc, d) - <-errc -} - -// IsWebSocketRequest returns a boolean indicating whether the request has the -// headers of a WebSocket handshake request. -func IsWebSocketRequest(r *http.Request) bool { - contains := func(key, val string) bool { - vv := strings.Split(r.Header.Get(key), ",") - for _, v := range vv { - if val == strings.ToLower(strings.TrimSpace(v)) { - return true - } - } - return false - } - if !contains("Connection", "upgrade") { - return false - } - if !contains("Upgrade", "websocket") { - return false - } - return true -} diff --git a/plugin/setup/setup.go b/plugin/setup/setup.go index 7cd0f6e0..b746a653 100644 --- a/plugin/setup/setup.go +++ b/plugin/setup/setup.go @@ -495,10 +495,6 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http } } - if err != nil { - panic(err) - } - //处理索引 elastic2.InitSchema() //init security @@ -573,7 +569,7 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http _, err = util.FilePutContent(file, fmt.Sprintf("configs.template:\n - name: \"system\"\n path: ./config/system_config.tpl\n variable:\n "+ "CLUSTER_ID: %v\n CLUSTER_ENDPINT: \"%v\"\n "+ "CLUSTER_USER: \"%v\"\n CLUSTER_VER: \"%v\"\n CLUSTER_DISTRIBUTION: \"%v\"\n INDEX_PREFIX: \"%v\"", - GlobalSystemElasticsearchID, cfg.Endpoint, cfg.BasicAuth.Username, cfg.Version, cfg.Distribution, cfg1.IndexPrefix)) + GlobalSystemElasticsearchID, cfg.GetAnyEndpoint(), cfg.BasicAuth.Username, cfg.Version, cfg.Distribution, cfg1.IndexPrefix)) if err != nil { panic(err) } diff --git a/plugin/task_manager/common_api.go b/plugin/task_manager/common_api.go index 08241dda..087f6ce3 100644 --- a/plugin/task_manager/common_api.go +++ b/plugin/task_manager/common_api.go @@ -3,6 +3,7 @@ package task_manager import ( "errors" "fmt" + model2 "infini.sh/console/model" "net/http" "strconv" "strings" @@ -10,7 +11,6 @@ import ( log "github.com/cihub/seelog" - "infini.sh/console/model" migration_util "infini.sh/console/plugin/task_manager/util" httprouter "infini.sh/framework/core/api/router" @@ -560,7 +560,7 @@ func (h *APIHandler) getChildPipelineInfosFromGateway(pipelineTaskIDs map[string var err error for instID, taskIDs := range pipelineTaskIDs { - inst := &model.Instance{} + inst := &model2.TaskWorker{} inst.ID = instID _, err = orm.Get(inst) if err != nil { diff --git a/plugin/task_manager/model/scheduler.go b/plugin/task_manager/model/scheduler.go index d1526f5f..cd595ae2 100644 --- a/plugin/task_manager/model/scheduler.go +++ b/plugin/task_manager/model/scheduler.go @@ -2,13 +2,12 @@ package model import ( "errors" - "infini.sh/console/model" ) type Scheduler interface { - GetPreferenceInstance(config ExecutionConfig) (instance *model.Instance, err error) - GetInstance(instanceID string) (instance *model.Instance, err error) + GetPreferenceInstance(config ExecutionConfig) (instance *model.TaskWorker, err error) + GetInstance(instanceID string) (instance *model.TaskWorker, err error) IncrInstanceJobs(instanceID string) DecrInstanceJobs(instanceID string) RefreshInstanceJobsFromES() error diff --git a/plugin/task_manager/pipeline_task/pipeline_task.go b/plugin/task_manager/pipeline_task/pipeline_task.go index 0c8e0133..6b41326e 100644 --- a/plugin/task_manager/pipeline_task/pipeline_task.go +++ b/plugin/task_manager/pipeline_task/pipeline_task.go @@ -3,13 +3,13 @@ package pipeline_task import ( "errors" "fmt" + "infini.sh/console/model" "strconv" "strings" "time" log "github.com/cihub/seelog" - "infini.sh/console/model" migration_model "infini.sh/console/plugin/task_manager/model" migration_util "infini.sh/console/plugin/task_manager/util" @@ -169,7 +169,7 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error { return nil } -func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance *model.Instance, err error) { +func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance *model.TaskWorker, err error) { instance, err = p.getPipelineExecutionInstance(taskItem) if err != nil { log.Errorf("failed to get execution instance for task [%s], err: %v", taskItem.ID, err) @@ -184,7 +184,7 @@ func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance *model.I return instance, nil } -func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (*model.Instance, error) { +func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (*model.TaskWorker, error) { instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) instance, err := p.scheduler.GetInstance(instanceID) if err != nil { diff --git a/plugin/task_manager/scheduler/scheduler.go b/plugin/task_manager/scheduler/scheduler.go index 1d7413c4..66ebea41 100644 --- a/plugin/task_manager/scheduler/scheduler.go +++ b/plugin/task_manager/scheduler/scheduler.go @@ -53,7 +53,7 @@ func NewScheduler(elasticsearch, indexName string, checkInstanceAvailable bool, return scheduler, nil } -func (p *scheduler) GetPreferenceInstance(config migration_model.ExecutionConfig) (*model.Instance, error) { +func (p *scheduler) GetPreferenceInstance(config migration_model.ExecutionConfig) (*model.TaskWorker, error) { var ( err error minID string @@ -64,7 +64,7 @@ func (p *scheduler) GetPreferenceInstance(config migration_model.ExecutionConfig instanceTotal := p.getInstanceState(node.ID).Total if instanceTotal < minTotal { if p.CheckInstanceAvailable { - tempInst := model.Instance{} + tempInst := model.TaskWorker{} tempInst.ID = node.ID _, err = orm.Get(&tempInst) if err != nil { @@ -95,11 +95,11 @@ func (p *scheduler) GetPreferenceInstance(config migration_model.ExecutionConfig return instance, nil } -func (p *scheduler) GetInstance(instanceID string) (*model.Instance, error) { +func (p *scheduler) GetInstance(instanceID string) (*model.TaskWorker, error) { if instanceID == "" { return nil, errors.New("invalid instanceID") } - instance := model.Instance{} + instance := model.TaskWorker{} instance.ID = instanceID _, err := orm.Get(&instance) @@ -114,7 +114,7 @@ func (p *scheduler) GetInstance(instanceID string) (*model.Instance, error) { return &instance, nil } -func (p *scheduler) initializeInstance(instance *model.Instance) error { +func (p *scheduler) initializeInstance(instance *model.TaskWorker) error { lastInitializedAt := p.getLastInitializedAt(instance.ID) if time.Now().Sub(lastInitializedAt) < initializeInterval { return nil @@ -162,7 +162,7 @@ func (p *scheduler) initializeInstance(instance *model.Instance) error { // user could change the following configurations manually: // - input_queue (metrics.logging_queue) // - elasticsearch (elasticsearch.name) -func (p *scheduler) createPipelineLoggingMerge(instance *model.Instance) error { +func (p *scheduler) createPipelineLoggingMerge(instance *model.TaskWorker) error { cfg := &migration_model.PipelineTaskConfig{ Name: "pipeline_logging_merge", AutoStart: true, @@ -194,7 +194,7 @@ func (p *scheduler) createPipelineLoggingMerge(instance *model.Instance) error { return nil } -func (p *scheduler) createIngestPipelineLogging(instance *model.Instance) error { +func (p *scheduler) createIngestPipelineLogging(instance *model.TaskWorker) error { cfg := &migration_model.PipelineTaskConfig{ Name: "ingest_pipeline_logging", AutoStart: true, diff --git a/service/alerting/env.go b/service/alerting/env.go index b1f3c340..876ad91b 100644 --- a/service/alerting/env.go +++ b/service/alerting/env.go @@ -6,8 +6,8 @@ package alerting import ( "fmt" - "infini.sh/framework/core/config" config2 "infini.sh/console/config" + "infini.sh/framework/core/config" "infini.sh/framework/core/env" "infini.sh/framework/core/global" "infini.sh/framework/core/kv" @@ -53,6 +53,6 @@ func GetInnerConsoleEndpoint() (string, error){ if !ok { return "", fmt.Errorf("web config not exists") } - endpoint := fmt.Sprintf("%s://%s", appConfig.GetSchema(), appConfig.Network.GetPublishAddr()) + endpoint := appConfig.GetEndpoint() return endpoint, nil }