diff --git a/bootstrap_check.go b/bootstrap_check.go index ac7792cc..fd0353e0 100644 --- a/bootstrap_check.go +++ b/bootstrap_check.go @@ -14,7 +14,7 @@ import ( "infini.sh/framework/core/util" ) -func bootstrapRequirementCheck() error{ +func bootstrapRequirementCheck() error { err := checkElasticsearchRequirements() if err != nil { return err @@ -22,8 +22,7 @@ func bootstrapRequirementCheck() error{ return nil } - -func checkElasticsearchRequirements() error{ +func checkElasticsearchRequirements() error { log.Trace("start to check system cluster requirement") var esConfigs = []elastic.ElasticsearchConfig{} ok, err := env.ParseConfig("elasticsearch", &esConfigs) @@ -34,17 +33,17 @@ func checkElasticsearchRequirements() error{ return fmt.Errorf("elasticsearch config section not found") } - elasticsearchID:=global.Lookup(elastic.GlobalSystemElasticsearchID) + elasticsearchID := global.Lookup(elastic.GlobalSystemElasticsearchID) - if elasticsearchID == nil||elasticsearchID=="" { + if elasticsearchID == nil || elasticsearchID == "" { return fmt.Errorf("cluster config in web section can not be empty") } - esID:=elasticsearchID.(string) + esID := elasticsearchID.(string) var targetEsConfig *elastic.ElasticsearchConfig for _, esConfig := range esConfigs { - if esConfig.ID == esID||(esConfig.ID==""&&esConfig.Name==esID) { + if esConfig.ID == esID || (esConfig.ID == "" && esConfig.Name == esID) { targetEsConfig = &esConfig } } @@ -54,7 +53,7 @@ func checkElasticsearchRequirements() error{ } var req = util.NewGetRequest(targetEsConfig.GetAnyEndpoint(), nil) if targetEsConfig.BasicAuth != nil { - req.SetBasicAuth(targetEsConfig.BasicAuth.Username, targetEsConfig.BasicAuth.Password) + req.SetBasicAuth(targetEsConfig.BasicAuth.Username, targetEsConfig.BasicAuth.Password.Get()) } result, err := util.ExecuteRequest(req) @@ -62,11 +61,10 @@ func checkElasticsearchRequirements() error{ return fmt.Errorf("check system cluster requirement error: %v", err) } - if result==nil||result.Body==nil||len(result.Body)==0{ + if result == nil || result.Body == nil || len(result.Body) == 0 { return fmt.Errorf("failed to retrive cluster version info") } - versionNumber, err := jsonparser.GetString(result.Body, "version", "number") if err != nil { return fmt.Errorf("check system cluster requirement error: %v, got response: %s", err, string(result.Body)) @@ -78,11 +76,11 @@ func checkElasticsearchRequirements() error{ return fmt.Errorf("unkonw cluster distribution: %v", distribution) } cr, err := util.VersionCompare(versionNumber, "5.3") - if err !=nil { + if err != nil { return fmt.Errorf("check system cluster requirement error: %v", err) } if cr == -1 { return fmt.Errorf("system cluster version with distribution elasticsearch required to be version 5.3 and above, but got %s", versionNumber) } return nil -} \ No newline at end of file +} diff --git a/config/generated.go b/config/generated.go index 09faae8a..f52e02d3 100644 --- a/config/generated.go +++ b/config/generated.go @@ -1,10 +1,10 @@ package config -const LastCommitLog = "48882e67badf2813406d1b9bdc65c20f22c0f8fd" -const BuildDate = "2024-03-20T02:20:55Z" +const LastCommitLog = "N/A" +const BuildDate = "N/A" -const EOLDate = "2024-12-31T10:10:10Z" +const EOLDate = "N/A" -const Version = "1.0.0_SNAPSHOT" +const Version = "0.0.1-SNAPSHOT" -const BuildNumber = "001" +const BuildNumber = "001" diff --git a/model/instance.go b/model/instance.go index 46c8c6f9..9ec9f8f4 100644 --- a/model/instance.go +++ b/model/instance.go @@ -15,6 +15,7 @@ import ( "infini.sh/framework/core/util" "infini.sh/framework/modules/pipeline" ) + type TaskWorker struct { model.Instance } @@ -134,7 +135,7 @@ func (inst *TaskWorker) TryConnectWithTimeout(duration time.Duration) error { func (inst *TaskWorker) doRequest(req *util.Request, resBody interface{}) error { if inst.BasicAuth != nil && inst.BasicAuth.Username != "" { - req.SetBasicAuth(inst.BasicAuth.Username, inst.BasicAuth.Password) + req.SetBasicAuth(inst.BasicAuth.Username, inst.BasicAuth.Password.Get()) } result, err := util.ExecuteRequest(req) if err != nil { diff --git a/modules/agent/api/elasticsearch.go b/modules/agent/api/elasticsearch.go index 01806fb0..40786123 100644 --- a/modules/agent/api/elasticsearch.go +++ b/modules/agent/api/elasticsearch.go @@ -25,7 +25,7 @@ import ( "time" ) -//node -> binding item +// node -> binding item func GetEnrolledNodesByAgent(instanceID string) (map[string]BindingItem, error) { //get nodes settings where agent id = instance id @@ -177,7 +177,7 @@ func refreshNodesInfo(instanceID, instanceEndpoint string) (*elastic.DiscoveryRe return nodesInfo, nil } -//get nodes info via agent +// get nodes info via agent func GetElasticsearchNodesViaAgent(ctx context.Context, endpoint string) (*elastic.DiscoveryResult, error) { req := &util.Request{ Method: http.MethodGet, @@ -186,7 +186,7 @@ func GetElasticsearchNodesViaAgent(ctx context.Context, endpoint string) (*elast } obj := elastic.DiscoveryResult{} - _, err := server.ProxyAgentRequest("elasticsearch",endpoint, req, &obj) + _, err := server.ProxyAgentRequest("elasticsearch", endpoint, req, &obj) if err != nil { return nil, err } @@ -218,7 +218,7 @@ func GetElasticLogFiles(ctx context.Context, instance *model.Instance, logsPath } resBody := map[string]interface{}{} - _, err := server.ProxyAgentRequest("elasticsearch",instance.GetEndpoint(), req, &resBody) + _, err := server.ProxyAgentRequest("elasticsearch", instance.GetEndpoint(), req, &resBody) if err != nil { return nil, err } @@ -237,7 +237,7 @@ func GetElasticLogFileContent(ctx context.Context, instance *model.Instance, bod Body: util.MustToJSONBytes(body), } resBody := map[string]interface{}{} - _, err := server.ProxyAgentRequest("elasticsearch",instance.GetEndpoint(), req, &resBody) + _, err := server.ProxyAgentRequest("elasticsearch", instance.GetEndpoint(), req, &resBody) if err != nil { return nil, err } @@ -319,7 +319,7 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request, h.WriteJSON(w, res, http.StatusOK) } -//instance, pathLogs +// instance, pathLogs func getAgentByNodeID(clusterID, nodeID string) (*model.Instance, string, error) { q := orm.Query{ @@ -457,10 +457,10 @@ func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request, log.Infof("instance:%v,%v, success enroll %v nodes", instanceID, instanceEndpoint, len(pids)) } - if len(nodes.Nodes)>0{ - for k,v:=range nodes.Nodes{ - log.Debug(k,v.Status,v.Enrolled) - if !v.Enrolled{ + if len(nodes.Nodes) > 0 { + for k, v := range nodes.Nodes { + log.Debug(k, v.Status, v.Enrolled) + if !v.Enrolled { pids := h.bindInstanceToCluster(clusterInfo, nodes, instanceID, instanceEndpoint) log.Infof("instance:%v,%v, success enroll %v nodes", instanceID, instanceEndpoint, len(pids)) } @@ -554,13 +554,13 @@ func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elast panic(err) } - for _,v:=range nodes.Nodes{ - if !v.Enrolled{ - if v.NodeInfo!=nil{ - pid:=v.NodeInfo.Process.Id - nodeHost:=v.NodeInfo.GetHttpPublishHost() - nodeInfo:=h.internalProcessBind(clusterID,clusterUUID,instanceID,instanceEndpoint,pid,nodeHost,auth) - if nodeInfo!=nil{ + for _, v := range nodes.Nodes { + if !v.Enrolled { + if v.NodeInfo != nil { + pid := v.NodeInfo.Process.Id + nodeHost := v.NodeInfo.GetHttpPublishHost() + nodeInfo := h.internalProcessBind(clusterID, clusterUUID, instanceID, instanceEndpoint, pid, nodeHost, auth) + if nodeInfo != nil { discoveredPIDs[pid] = nodeInfo } } @@ -570,19 +570,19 @@ func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elast //try connect for _, node := range nodes.UnknownProcess { - pid:=node.PID + pid := node.PID for _, v := range node.ListenAddresses { ip := v.IP - port:=v.Port + port := v.Port if util.ContainStr(ip, "::") { ip = fmt.Sprintf("[%s]", ip) } nodeHost := fmt.Sprintf("%s:%d", ip, port) - nodeInfo:=h.internalProcessBind(clusterID,clusterUUID,instanceID,instanceEndpoint,pid,nodeHost,auth) - if nodeInfo!=nil{ + nodeInfo := h.internalProcessBind(clusterID, clusterUUID, instanceID, instanceEndpoint, pid, nodeHost, auth) + if nodeInfo != nil { discoveredPIDs[pid] = nodeInfo } } @@ -594,14 +594,14 @@ func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elast return discoveredPIDs } -func (h *APIHandler) internalProcessBind(clusterID,clusterUUID,instanceID,instanceEndpoint string,pid int,nodeHost string,auth *model.BasicAuth) *elastic.LocalNodeInfo{ +func (h *APIHandler) internalProcessBind(clusterID, clusterUUID, instanceID, instanceEndpoint string, pid int, nodeHost string, auth *model.BasicAuth) *elastic.LocalNodeInfo { success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, instanceEndpoint) if !success && tryAgain { //try https again success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, instanceEndpoint) } - log.Debug(clusterUUID,nodeHost,instanceEndpoint,success, tryAgain, nodeInfo) + log.Debug(clusterUUID, nodeHost, instanceEndpoint, success, tryAgain, nodeInfo) if success { log.Debug("connect to es node success:", nodeHost, ", pid: ", pid) @@ -631,7 +631,6 @@ func (h *APIHandler) internalProcessBind(clusterID,clusterUUID,instanceID,instan return nil } - func (h *APIHandler) getESNodeInfoViaProxy(esHost string, esSchema string, auth *model.BasicAuth, endpoint string) (success, tryAgain bool, info *elastic.LocalNodeInfo) { esConfig := elastic.ElasticsearchConfig{Host: esHost, Schema: esSchema, BasicAuth: auth} return h.getESNodeInfoViaProxyWithConfig(&esConfig, auth, endpoint) @@ -648,11 +647,11 @@ func (h *APIHandler) getESNodeInfoViaProxyWithConfig(cfg *elastic.ElasticsearchC Body: body, } if auth != nil { - req.SetBasicAuth(auth.Username, auth.Password) + req.SetBasicAuth(auth.Username, auth.Password.Get()) } obj := elastic.LocalNodeInfo{} - res, err := server.ProxyAgentRequest("elasticsearch",endpoint, req, &obj) + res, err := server.ProxyAgentRequest("elasticsearch", endpoint, req, &obj) if err != nil { if global.Env().IsDebug { log.Error(err) diff --git a/modules/agent/api/remote_config.go b/modules/agent/api/remote_config.go index 6a21251f..3e2cdab5 100644 --- a/modules/agent/api/remote_config.go +++ b/modules/agent/api/remote_config.go @@ -165,8 +165,8 @@ func getAgentIngestConfigs(instance string, items map[string]BindingItem) (strin } metadata := elastic.GetMetadata(v.ClusterID) - if metadata == nil || metadata.Config == nil{ - log.Errorf("metadata is nil: %v",v.ClusterID) + if metadata == nil || metadata.Config == nil { + log.Errorf("metadata is nil: %v", v.ClusterID) continue } var clusterLevelEnabled = false @@ -189,32 +189,32 @@ func getAgentIngestConfigs(instance string, items map[string]BindingItem) (strin } if auth, ok := dv.(model.BasicAuth); ok { username = auth.Username - password = auth.Password + password = auth.Password.Get() } } - nodeInfo,err:=metadata2.GetNodeConfig(v.ClusterID,v.NodeUUID) - if err!=nil{ + nodeInfo, err := metadata2.GetNodeConfig(v.ClusterID, v.NodeUUID) + if err != nil { log.Error(err) continue } - publishAddress:=nodeInfo.Payload.NodeInfo.GetHttpPublishHost() + publishAddress := nodeInfo.Payload.NodeInfo.GetHttpPublishHost() - if publishAddress==""{ - log.Errorf("publish address is empty: %v",v.NodeUUID) + if publishAddress == "" { + log.Errorf("publish address is empty: %v", v.NodeUUID) continue } nodeEndPoint := metadata.PrepareEndpoint(publishAddress) - pathLogs:=nodeInfo.Payload.NodeInfo.GetPathLogs() + pathLogs := nodeInfo.Payload.NodeInfo.GetPathLogs() if v.Updated > latestVersion { latestVersion = v.Updated } - taskID:=v.ClusterID+"_"+v.NodeUUID + taskID := v.ClusterID + "_" + v.NodeUUID buffer.Write([]byte(fmt.Sprintf("\n - name: \"%v\"\n path: ./config/task_config.tpl\n "+ "variable:\n "+ @@ -228,7 +228,7 @@ func getAgentIngestConfigs(instance string, items map[string]BindingItem) (strin "CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+ "NODE_LEVEL_TASKS_ENABLED: %v\n "+ "NODE_LOGS_PATH: \"%v\"\n\n\n", taskID, taskID, - v.ClusterID,v.ClusterUUID,v.NodeUUID, nodeEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, pathLogs))) + v.ClusterID, v.ClusterUUID, v.NodeUUID, nodeEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, pathLogs))) } hash := util.MD5digest(buffer.String()) diff --git a/plugin/api/email/common/pipeline.go b/plugin/api/email/common/pipeline.go index 9a85adf3..460779be 100644 --- a/plugin/api/email/common/pipeline.go +++ b/plugin/api/email/common/pipeline.go @@ -18,19 +18,20 @@ import ( ) const emailServerConfigFile = "send_email.yml" + func RefreshEmailServer() error { q := orm.Query{ Size: 10, } q.Conds = orm.And(orm.Eq("enabled", true)) - err, result := orm.Search(model.EmailServer{}, &q ) + err, result := orm.Search(model.EmailServer{}, &q) if err != nil { return err } if len(result.Result) == 0 { return StopEmailServer() } - servers := make([]model.EmailServer,0, len(result.Result)) + servers := make([]model.EmailServer, 0, len(result.Result)) for _, row := range result.Result { emailServer := model.EmailServer{} buf := util.MustToJSONBytes(row) @@ -71,7 +72,7 @@ func CheckEmailPipelineExists() bool { return util.FilesExists(sendEmailCfgFile) } -func getEmailPasswordKey(srv model.EmailServer) string{ +func getEmailPasswordKey(srv model.EmailServer) string { return fmt.Sprintf("%s_password", srv.ID) } @@ -82,15 +83,15 @@ func GeneratePipelineConfig(servers []model.EmailServer) (string, error) { smtpServers := map[string]util.MapStr{} for _, srv := range servers { key := getEmailPasswordKey(srv) - err := keystore.SetValue(key, []byte(srv.Auth.Password)) + err := keystore.SetValue(key, []byte(srv.Auth.Password.Get())) if err != nil { return "", err } smtpServers[srv.ID] = util.MapStr{ "server": util.MapStr{ - "host": srv.Host, - "port": srv.Port, - "tls": srv.TLS, + "host": srv.Host, + "port": srv.Port, + "tls": srv.TLS, "refresh_timestamp": time.Now().UnixMilli(), }, "auth": util.MapStr{ @@ -103,9 +104,9 @@ func GeneratePipelineConfig(servers []model.EmailServer) (string, error) { pipelineCfg := util.MapStr{ "pipeline": []util.MapStr{ { - "name": "send_email_service", - "auto_start": true, - "keep_running": true, + "name": "send_email_service", + "auto_start": true, + "keep_running": true, "retry_delay_in_ms": 5000, "processor": []util.MapStr{ { @@ -113,8 +114,8 @@ func GeneratePipelineConfig(servers []model.EmailServer) (string, error) { "consumer": util.MapStr{ "fetch_max_messages": 1, }, - "max_worker_size": 200, - "num_of_slices": 1, + "max_worker_size": 200, + "num_of_slices": 1, "idle_timeout_in_seconds": 30, "queue_selector": util.MapStr{ "keys": []string{"email_messages"}, @@ -123,12 +124,12 @@ func GeneratePipelineConfig(servers []model.EmailServer) (string, error) { { "smtp": util.MapStr{ "idle_timeout_in_seconds": 1, - "servers": smtpServers, + "servers": smtpServers, "templates": util.MapStr{ "raw": util.MapStr{ "content_type": "text/plain", - "subject": "$[[subject]]", - "body": "$[[body]]", + "subject": "$[[subject]]", + "body": "$[[body]]", }, }, }, diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index 2043cdec..a2263bc7 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -8,7 +8,9 @@ import ( "bytes" "crypto/tls" "fmt" + "github.com/buger/jsonparser" log "github.com/cihub/seelog" + "github.com/gopkg.in/gomail.v2" "infini.sh/console/model" "infini.sh/console/model/alerting" "infini.sh/console/plugin/api/email/common" @@ -17,8 +19,6 @@ import ( "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "net/http" - "github.com/buger/jsonparser" - "github.com/gopkg.in/gomail.v2" "strconv" "time" ) @@ -67,7 +67,7 @@ func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, p h.WriteError(w, fmt.Sprintf("email server [%s:%d] already exists", obj.Host, obj.Port), http.StatusInternalServerError) return } - if obj.CredentialID == "" && obj.Auth != nil && obj.Auth.Username != ""{ + if obj.CredentialID == "" && obj.Auth != nil && obj.Auth.Username != "" { credentialID, err := saveBasicAuthToCredential(obj) if err != nil { log.Error(err) @@ -97,7 +97,7 @@ func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, p } -func saveBasicAuthToCredential(srv *model.EmailServer)(string, error){ +func saveBasicAuthToCredential(srv *model.EmailServer) (string, error) { if srv == nil { return "", fmt.Errorf("param email config can not be empty") } @@ -108,7 +108,7 @@ func saveBasicAuthToCredential(srv *model.EmailServer)(string, error){ Payload: map[string]interface{}{ "basic_auth": map[string]interface{}{ "username": srv.Auth.Username, - "password": srv.Auth.Password, + "password": srv.Auth.Password.Get(), }, }, } @@ -267,8 +267,8 @@ func checkEmailServerReferenced(srv *model.EmailServer) error { func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { var ( - strSize = h.GetParameterOrDefault(req, "size", "20") - strFrom = h.GetParameterOrDefault(req, "from", "0") + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") strEnabled = h.GetParameterOrDefault(req, "enabled", "true") ) size, _ := strconv.Atoi(strSize) @@ -286,7 +286,7 @@ func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, p } if strEnabled == "true" { q.Conds = orm.And(orm.Eq("enabled", true)) - }else if strEnabled == "false" { + } else if strEnabled == "false" { q.Conds = orm.And(orm.Eq("enabled", false)) } @@ -307,7 +307,7 @@ func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, p buf := hitsBuf.Bytes() if buf[len(buf)-1] == ',' { buf[len(buf)-1] = ']' - }else{ + } else { hitsBuf.Write([]byte("]")) } res.Raw, err = jsonparser.Set(res.Raw, hitsBuf.Bytes(), "hits", "hits") @@ -340,7 +340,7 @@ func (h *EmailAPI) testEmailServer(w http.ResponseWriter, req *http.Request, ps } if reqBody.CredentialID != "" { auth, err := common.GetBasicAuth(&reqBody.EmailServer) - if err != nil { + if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) return } @@ -356,7 +356,7 @@ func (h *EmailAPI) testEmailServer(w http.ResponseWriter, req *http.Request, ps message.SetHeader("Subject", "INFINI platform test email") message.SetBody("text/plain", "This is just a test email, do not reply!") - d := gomail.NewDialerWithTimeout(reqBody.Host, reqBody.Port, reqBody.Auth.Username, reqBody.Auth.Password, 3*time.Second) + d := gomail.NewDialerWithTimeout(reqBody.Host, reqBody.Port, reqBody.Auth.Username, reqBody.Auth.Password.Get(), 3*time.Second) d.TLSConfig = &tls.Config{InsecureSkipVerify: true} d.SSL = reqBody.TLS diff --git a/plugin/setup/setup.go b/plugin/setup/setup.go index 435a31cf..e20ca01e 100644 --- a/plugin/setup/setup.go +++ b/plugin/setup/setup.go @@ -6,6 +6,7 @@ import ( "crypto/md5" "encoding/hex" "fmt" + "infini.sh/framework/lib/go-ucfg" "io" "io/ioutil" "net/http" @@ -97,7 +98,7 @@ func (module *Module) Start() error { return } if basicAuth, ok := bv.(model.BasicAuth); ok { - err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(basicAuth.Password)) + err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(basicAuth.Password.Get())) if err != nil { log.Error(err) } @@ -283,7 +284,7 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API, Setup Endpoint: request.Cluster.Endpoint, BasicAuth: &model.BasicAuth{ Username: request.Cluster.Username, - Password: request.Cluster.Password, + Password: ucfg.SecretString(request.Cluster.Password), }, } @@ -458,7 +459,7 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http if oldCfg.CredentialID != "" && !secretMismatch { basicAuth, _ := elastic1.GetBasicAuth(&oldCfg) if basicAuth != nil { - if basicAuth.Username == request.Cluster.Username && basicAuth.Password == request.Cluster.Password { + if basicAuth.Username == request.Cluster.Username && basicAuth.Password.Get() == request.Cluster.Password { reuseOldCred = true } } @@ -528,7 +529,7 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http panic(err) } } - err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(cfg.BasicAuth.Password)) + err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(cfg.BasicAuth.Password.Get())) if err != nil { panic(err) }