fix: build error with basicAuth.Password
This commit is contained in:
parent
80bb9b61a7
commit
ae3b792f7f
|
@ -14,7 +14,7 @@ import (
|
||||||
"infini.sh/framework/core/util"
|
"infini.sh/framework/core/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func bootstrapRequirementCheck() error{
|
func bootstrapRequirementCheck() error {
|
||||||
err := checkElasticsearchRequirements()
|
err := checkElasticsearchRequirements()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -22,8 +22,7 @@ func bootstrapRequirementCheck() error{
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkElasticsearchRequirements() error {
|
||||||
func checkElasticsearchRequirements() error{
|
|
||||||
log.Trace("start to check system cluster requirement")
|
log.Trace("start to check system cluster requirement")
|
||||||
var esConfigs = []elastic.ElasticsearchConfig{}
|
var esConfigs = []elastic.ElasticsearchConfig{}
|
||||||
ok, err := env.ParseConfig("elasticsearch", &esConfigs)
|
ok, err := env.ParseConfig("elasticsearch", &esConfigs)
|
||||||
|
@ -34,17 +33,17 @@ func checkElasticsearchRequirements() error{
|
||||||
return fmt.Errorf("elasticsearch config section not found")
|
return fmt.Errorf("elasticsearch config section not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
elasticsearchID:=global.Lookup(elastic.GlobalSystemElasticsearchID)
|
elasticsearchID := global.Lookup(elastic.GlobalSystemElasticsearchID)
|
||||||
|
|
||||||
if elasticsearchID == nil||elasticsearchID=="" {
|
if elasticsearchID == nil || elasticsearchID == "" {
|
||||||
return fmt.Errorf("cluster config in web section can not be empty")
|
return fmt.Errorf("cluster config in web section can not be empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
esID:=elasticsearchID.(string)
|
esID := elasticsearchID.(string)
|
||||||
|
|
||||||
var targetEsConfig *elastic.ElasticsearchConfig
|
var targetEsConfig *elastic.ElasticsearchConfig
|
||||||
for _, esConfig := range esConfigs {
|
for _, esConfig := range esConfigs {
|
||||||
if esConfig.ID == esID||(esConfig.ID==""&&esConfig.Name==esID) {
|
if esConfig.ID == esID || (esConfig.ID == "" && esConfig.Name == esID) {
|
||||||
targetEsConfig = &esConfig
|
targetEsConfig = &esConfig
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -54,7 +53,7 @@ func checkElasticsearchRequirements() error{
|
||||||
}
|
}
|
||||||
var req = util.NewGetRequest(targetEsConfig.GetAnyEndpoint(), nil)
|
var req = util.NewGetRequest(targetEsConfig.GetAnyEndpoint(), nil)
|
||||||
if targetEsConfig.BasicAuth != nil {
|
if targetEsConfig.BasicAuth != nil {
|
||||||
req.SetBasicAuth(targetEsConfig.BasicAuth.Username, targetEsConfig.BasicAuth.Password)
|
req.SetBasicAuth(targetEsConfig.BasicAuth.Username, targetEsConfig.BasicAuth.Password.Get())
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := util.ExecuteRequest(req)
|
result, err := util.ExecuteRequest(req)
|
||||||
|
@ -62,11 +61,10 @@ func checkElasticsearchRequirements() error{
|
||||||
return fmt.Errorf("check system cluster requirement error: %v", err)
|
return fmt.Errorf("check system cluster requirement error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if result==nil||result.Body==nil||len(result.Body)==0{
|
if result == nil || result.Body == nil || len(result.Body) == 0 {
|
||||||
return fmt.Errorf("failed to retrive cluster version info")
|
return fmt.Errorf("failed to retrive cluster version info")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
versionNumber, err := jsonparser.GetString(result.Body, "version", "number")
|
versionNumber, err := jsonparser.GetString(result.Body, "version", "number")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("check system cluster requirement error: %v, got response: %s", err, string(result.Body))
|
return fmt.Errorf("check system cluster requirement error: %v, got response: %s", err, string(result.Body))
|
||||||
|
@ -78,7 +76,7 @@ func checkElasticsearchRequirements() error{
|
||||||
return fmt.Errorf("unkonw cluster distribution: %v", distribution)
|
return fmt.Errorf("unkonw cluster distribution: %v", distribution)
|
||||||
}
|
}
|
||||||
cr, err := util.VersionCompare(versionNumber, "5.3")
|
cr, err := util.VersionCompare(versionNumber, "5.3")
|
||||||
if err !=nil {
|
if err != nil {
|
||||||
return fmt.Errorf("check system cluster requirement error: %v", err)
|
return fmt.Errorf("check system cluster requirement error: %v", err)
|
||||||
}
|
}
|
||||||
if cr == -1 {
|
if cr == -1 {
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
package config
|
package config
|
||||||
|
|
||||||
const LastCommitLog = "48882e67badf2813406d1b9bdc65c20f22c0f8fd"
|
const LastCommitLog = "N/A"
|
||||||
const BuildDate = "2024-03-20T02:20:55Z"
|
const BuildDate = "N/A"
|
||||||
|
|
||||||
const EOLDate = "2024-12-31T10:10:10Z"
|
const EOLDate = "N/A"
|
||||||
|
|
||||||
const Version = "1.0.0_SNAPSHOT"
|
const Version = "0.0.1-SNAPSHOT"
|
||||||
|
|
||||||
const BuildNumber = "001"
|
const BuildNumber = "001"
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"infini.sh/framework/core/util"
|
"infini.sh/framework/core/util"
|
||||||
"infini.sh/framework/modules/pipeline"
|
"infini.sh/framework/modules/pipeline"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TaskWorker struct {
|
type TaskWorker struct {
|
||||||
model.Instance
|
model.Instance
|
||||||
}
|
}
|
||||||
|
@ -134,7 +135,7 @@ func (inst *TaskWorker) TryConnectWithTimeout(duration time.Duration) error {
|
||||||
|
|
||||||
func (inst *TaskWorker) doRequest(req *util.Request, resBody interface{}) error {
|
func (inst *TaskWorker) doRequest(req *util.Request, resBody interface{}) error {
|
||||||
if inst.BasicAuth != nil && inst.BasicAuth.Username != "" {
|
if inst.BasicAuth != nil && inst.BasicAuth.Username != "" {
|
||||||
req.SetBasicAuth(inst.BasicAuth.Username, inst.BasicAuth.Password)
|
req.SetBasicAuth(inst.BasicAuth.Username, inst.BasicAuth.Password.Get())
|
||||||
}
|
}
|
||||||
result, err := util.ExecuteRequest(req)
|
result, err := util.ExecuteRequest(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -25,7 +25,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
//node -> binding item
|
// node -> binding item
|
||||||
func GetEnrolledNodesByAgent(instanceID string) (map[string]BindingItem, error) {
|
func GetEnrolledNodesByAgent(instanceID string) (map[string]BindingItem, error) {
|
||||||
|
|
||||||
//get nodes settings where agent id = instance id
|
//get nodes settings where agent id = instance id
|
||||||
|
@ -177,7 +177,7 @@ func refreshNodesInfo(instanceID, instanceEndpoint string) (*elastic.DiscoveryRe
|
||||||
return nodesInfo, nil
|
return nodesInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//get nodes info via agent
|
// get nodes info via agent
|
||||||
func GetElasticsearchNodesViaAgent(ctx context.Context, endpoint string) (*elastic.DiscoveryResult, error) {
|
func GetElasticsearchNodesViaAgent(ctx context.Context, endpoint string) (*elastic.DiscoveryResult, error) {
|
||||||
req := &util.Request{
|
req := &util.Request{
|
||||||
Method: http.MethodGet,
|
Method: http.MethodGet,
|
||||||
|
@ -186,7 +186,7 @@ func GetElasticsearchNodesViaAgent(ctx context.Context, endpoint string) (*elast
|
||||||
}
|
}
|
||||||
|
|
||||||
obj := elastic.DiscoveryResult{}
|
obj := elastic.DiscoveryResult{}
|
||||||
_, err := server.ProxyAgentRequest("elasticsearch",endpoint, req, &obj)
|
_, err := server.ProxyAgentRequest("elasticsearch", endpoint, req, &obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -218,7 +218,7 @@ func GetElasticLogFiles(ctx context.Context, instance *model.Instance, logsPath
|
||||||
}
|
}
|
||||||
|
|
||||||
resBody := map[string]interface{}{}
|
resBody := map[string]interface{}{}
|
||||||
_, err := server.ProxyAgentRequest("elasticsearch",instance.GetEndpoint(), req, &resBody)
|
_, err := server.ProxyAgentRequest("elasticsearch", instance.GetEndpoint(), req, &resBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -237,7 +237,7 @@ func GetElasticLogFileContent(ctx context.Context, instance *model.Instance, bod
|
||||||
Body: util.MustToJSONBytes(body),
|
Body: util.MustToJSONBytes(body),
|
||||||
}
|
}
|
||||||
resBody := map[string]interface{}{}
|
resBody := map[string]interface{}{}
|
||||||
_, err := server.ProxyAgentRequest("elasticsearch",instance.GetEndpoint(), req, &resBody)
|
_, err := server.ProxyAgentRequest("elasticsearch", instance.GetEndpoint(), req, &resBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -319,7 +319,7 @@ func (h *APIHandler) getLogFileContent(w http.ResponseWriter, req *http.Request,
|
||||||
h.WriteJSON(w, res, http.StatusOK)
|
h.WriteJSON(w, res, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
//instance, pathLogs
|
// instance, pathLogs
|
||||||
func getAgentByNodeID(clusterID, nodeID string) (*model.Instance, string, error) {
|
func getAgentByNodeID(clusterID, nodeID string) (*model.Instance, string, error) {
|
||||||
|
|
||||||
q := orm.Query{
|
q := orm.Query{
|
||||||
|
@ -457,10 +457,10 @@ func (h *APIHandler) autoEnrollESNode(w http.ResponseWriter, req *http.Request,
|
||||||
log.Infof("instance:%v,%v, success enroll %v nodes", instanceID, instanceEndpoint, len(pids))
|
log.Infof("instance:%v,%v, success enroll %v nodes", instanceID, instanceEndpoint, len(pids))
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(nodes.Nodes)>0{
|
if len(nodes.Nodes) > 0 {
|
||||||
for k,v:=range nodes.Nodes{
|
for k, v := range nodes.Nodes {
|
||||||
log.Debug(k,v.Status,v.Enrolled)
|
log.Debug(k, v.Status, v.Enrolled)
|
||||||
if !v.Enrolled{
|
if !v.Enrolled {
|
||||||
pids := h.bindInstanceToCluster(clusterInfo, nodes, instanceID, instanceEndpoint)
|
pids := h.bindInstanceToCluster(clusterInfo, nodes, instanceID, instanceEndpoint)
|
||||||
log.Infof("instance:%v,%v, success enroll %v nodes", instanceID, instanceEndpoint, len(pids))
|
log.Infof("instance:%v,%v, success enroll %v nodes", instanceID, instanceEndpoint, len(pids))
|
||||||
}
|
}
|
||||||
|
@ -554,13 +554,13 @@ func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elast
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _,v:=range nodes.Nodes{
|
for _, v := range nodes.Nodes {
|
||||||
if !v.Enrolled{
|
if !v.Enrolled {
|
||||||
if v.NodeInfo!=nil{
|
if v.NodeInfo != nil {
|
||||||
pid:=v.NodeInfo.Process.Id
|
pid := v.NodeInfo.Process.Id
|
||||||
nodeHost:=v.NodeInfo.GetHttpPublishHost()
|
nodeHost := v.NodeInfo.GetHttpPublishHost()
|
||||||
nodeInfo:=h.internalProcessBind(clusterID,clusterUUID,instanceID,instanceEndpoint,pid,nodeHost,auth)
|
nodeInfo := h.internalProcessBind(clusterID, clusterUUID, instanceID, instanceEndpoint, pid, nodeHost, auth)
|
||||||
if nodeInfo!=nil{
|
if nodeInfo != nil {
|
||||||
discoveredPIDs[pid] = nodeInfo
|
discoveredPIDs[pid] = nodeInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -570,19 +570,19 @@ func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elast
|
||||||
//try connect
|
//try connect
|
||||||
for _, node := range nodes.UnknownProcess {
|
for _, node := range nodes.UnknownProcess {
|
||||||
|
|
||||||
pid:=node.PID
|
pid := node.PID
|
||||||
|
|
||||||
for _, v := range node.ListenAddresses {
|
for _, v := range node.ListenAddresses {
|
||||||
|
|
||||||
ip := v.IP
|
ip := v.IP
|
||||||
port:=v.Port
|
port := v.Port
|
||||||
|
|
||||||
if util.ContainStr(ip, "::") {
|
if util.ContainStr(ip, "::") {
|
||||||
ip = fmt.Sprintf("[%s]", ip)
|
ip = fmt.Sprintf("[%s]", ip)
|
||||||
}
|
}
|
||||||
nodeHost := fmt.Sprintf("%s:%d", ip, port)
|
nodeHost := fmt.Sprintf("%s:%d", ip, port)
|
||||||
nodeInfo:=h.internalProcessBind(clusterID,clusterUUID,instanceID,instanceEndpoint,pid,nodeHost,auth)
|
nodeInfo := h.internalProcessBind(clusterID, clusterUUID, instanceID, instanceEndpoint, pid, nodeHost, auth)
|
||||||
if nodeInfo!=nil{
|
if nodeInfo != nil {
|
||||||
discoveredPIDs[pid] = nodeInfo
|
discoveredPIDs[pid] = nodeInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -594,14 +594,14 @@ func (h *APIHandler) bindInstanceToCluster(clusterInfo ClusterInfo, nodes *elast
|
||||||
return discoveredPIDs
|
return discoveredPIDs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *APIHandler) internalProcessBind(clusterID,clusterUUID,instanceID,instanceEndpoint string,pid int,nodeHost string,auth *model.BasicAuth) *elastic.LocalNodeInfo{
|
func (h *APIHandler) internalProcessBind(clusterID, clusterUUID, instanceID, instanceEndpoint string, pid int, nodeHost string, auth *model.BasicAuth) *elastic.LocalNodeInfo {
|
||||||
success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, instanceEndpoint)
|
success, tryAgain, nodeInfo := h.getESNodeInfoViaProxy(nodeHost, "http", auth, instanceEndpoint)
|
||||||
if !success && tryAgain {
|
if !success && tryAgain {
|
||||||
//try https again
|
//try https again
|
||||||
success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, instanceEndpoint)
|
success, tryAgain, nodeInfo = h.getESNodeInfoViaProxy(nodeHost, "https", auth, instanceEndpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug(clusterUUID,nodeHost,instanceEndpoint,success, tryAgain, nodeInfo)
|
log.Debug(clusterUUID, nodeHost, instanceEndpoint, success, tryAgain, nodeInfo)
|
||||||
|
|
||||||
if success {
|
if success {
|
||||||
log.Debug("connect to es node success:", nodeHost, ", pid: ", pid)
|
log.Debug("connect to es node success:", nodeHost, ", pid: ", pid)
|
||||||
|
@ -631,7 +631,6 @@ func (h *APIHandler) internalProcessBind(clusterID,clusterUUID,instanceID,instan
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (h *APIHandler) getESNodeInfoViaProxy(esHost string, esSchema string, auth *model.BasicAuth, endpoint string) (success, tryAgain bool, info *elastic.LocalNodeInfo) {
|
func (h *APIHandler) getESNodeInfoViaProxy(esHost string, esSchema string, auth *model.BasicAuth, endpoint string) (success, tryAgain bool, info *elastic.LocalNodeInfo) {
|
||||||
esConfig := elastic.ElasticsearchConfig{Host: esHost, Schema: esSchema, BasicAuth: auth}
|
esConfig := elastic.ElasticsearchConfig{Host: esHost, Schema: esSchema, BasicAuth: auth}
|
||||||
return h.getESNodeInfoViaProxyWithConfig(&esConfig, auth, endpoint)
|
return h.getESNodeInfoViaProxyWithConfig(&esConfig, auth, endpoint)
|
||||||
|
@ -648,11 +647,11 @@ func (h *APIHandler) getESNodeInfoViaProxyWithConfig(cfg *elastic.ElasticsearchC
|
||||||
Body: body,
|
Body: body,
|
||||||
}
|
}
|
||||||
if auth != nil {
|
if auth != nil {
|
||||||
req.SetBasicAuth(auth.Username, auth.Password)
|
req.SetBasicAuth(auth.Username, auth.Password.Get())
|
||||||
}
|
}
|
||||||
|
|
||||||
obj := elastic.LocalNodeInfo{}
|
obj := elastic.LocalNodeInfo{}
|
||||||
res, err := server.ProxyAgentRequest("elasticsearch",endpoint, req, &obj)
|
res, err := server.ProxyAgentRequest("elasticsearch", endpoint, req, &obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if global.Env().IsDebug {
|
if global.Env().IsDebug {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
|
|
|
@ -165,8 +165,8 @@ func getAgentIngestConfigs(instance string, items map[string]BindingItem) (strin
|
||||||
}
|
}
|
||||||
|
|
||||||
metadata := elastic.GetMetadata(v.ClusterID)
|
metadata := elastic.GetMetadata(v.ClusterID)
|
||||||
if metadata == nil || metadata.Config == nil{
|
if metadata == nil || metadata.Config == nil {
|
||||||
log.Errorf("metadata is nil: %v",v.ClusterID)
|
log.Errorf("metadata is nil: %v", v.ClusterID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var clusterLevelEnabled = false
|
var clusterLevelEnabled = false
|
||||||
|
@ -189,32 +189,32 @@ func getAgentIngestConfigs(instance string, items map[string]BindingItem) (strin
|
||||||
}
|
}
|
||||||
if auth, ok := dv.(model.BasicAuth); ok {
|
if auth, ok := dv.(model.BasicAuth); ok {
|
||||||
username = auth.Username
|
username = auth.Username
|
||||||
password = auth.Password
|
password = auth.Password.Get()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeInfo,err:=metadata2.GetNodeConfig(v.ClusterID,v.NodeUUID)
|
nodeInfo, err := metadata2.GetNodeConfig(v.ClusterID, v.NodeUUID)
|
||||||
if err!=nil{
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
publishAddress:=nodeInfo.Payload.NodeInfo.GetHttpPublishHost()
|
publishAddress := nodeInfo.Payload.NodeInfo.GetHttpPublishHost()
|
||||||
|
|
||||||
if publishAddress==""{
|
if publishAddress == "" {
|
||||||
log.Errorf("publish address is empty: %v",v.NodeUUID)
|
log.Errorf("publish address is empty: %v", v.NodeUUID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeEndPoint := metadata.PrepareEndpoint(publishAddress)
|
nodeEndPoint := metadata.PrepareEndpoint(publishAddress)
|
||||||
|
|
||||||
pathLogs:=nodeInfo.Payload.NodeInfo.GetPathLogs()
|
pathLogs := nodeInfo.Payload.NodeInfo.GetPathLogs()
|
||||||
|
|
||||||
if v.Updated > latestVersion {
|
if v.Updated > latestVersion {
|
||||||
latestVersion = v.Updated
|
latestVersion = v.Updated
|
||||||
}
|
}
|
||||||
|
|
||||||
taskID:=v.ClusterID+"_"+v.NodeUUID
|
taskID := v.ClusterID + "_" + v.NodeUUID
|
||||||
|
|
||||||
buffer.Write([]byte(fmt.Sprintf("\n - name: \"%v\"\n path: ./config/task_config.tpl\n "+
|
buffer.Write([]byte(fmt.Sprintf("\n - name: \"%v\"\n path: ./config/task_config.tpl\n "+
|
||||||
"variable:\n "+
|
"variable:\n "+
|
||||||
|
@ -228,7 +228,7 @@ func getAgentIngestConfigs(instance string, items map[string]BindingItem) (strin
|
||||||
"CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+
|
"CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+
|
||||||
"NODE_LEVEL_TASKS_ENABLED: %v\n "+
|
"NODE_LEVEL_TASKS_ENABLED: %v\n "+
|
||||||
"NODE_LOGS_PATH: \"%v\"\n\n\n", taskID, taskID,
|
"NODE_LOGS_PATH: \"%v\"\n\n\n", taskID, taskID,
|
||||||
v.ClusterID,v.ClusterUUID,v.NodeUUID, nodeEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, pathLogs)))
|
v.ClusterID, v.ClusterUUID, v.NodeUUID, nodeEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, pathLogs)))
|
||||||
}
|
}
|
||||||
|
|
||||||
hash := util.MD5digest(buffer.String())
|
hash := util.MD5digest(buffer.String())
|
||||||
|
|
|
@ -18,19 +18,20 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const emailServerConfigFile = "send_email.yml"
|
const emailServerConfigFile = "send_email.yml"
|
||||||
|
|
||||||
func RefreshEmailServer() error {
|
func RefreshEmailServer() error {
|
||||||
q := orm.Query{
|
q := orm.Query{
|
||||||
Size: 10,
|
Size: 10,
|
||||||
}
|
}
|
||||||
q.Conds = orm.And(orm.Eq("enabled", true))
|
q.Conds = orm.And(orm.Eq("enabled", true))
|
||||||
err, result := orm.Search(model.EmailServer{}, &q )
|
err, result := orm.Search(model.EmailServer{}, &q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(result.Result) == 0 {
|
if len(result.Result) == 0 {
|
||||||
return StopEmailServer()
|
return StopEmailServer()
|
||||||
}
|
}
|
||||||
servers := make([]model.EmailServer,0, len(result.Result))
|
servers := make([]model.EmailServer, 0, len(result.Result))
|
||||||
for _, row := range result.Result {
|
for _, row := range result.Result {
|
||||||
emailServer := model.EmailServer{}
|
emailServer := model.EmailServer{}
|
||||||
buf := util.MustToJSONBytes(row)
|
buf := util.MustToJSONBytes(row)
|
||||||
|
@ -71,7 +72,7 @@ func CheckEmailPipelineExists() bool {
|
||||||
return util.FilesExists(sendEmailCfgFile)
|
return util.FilesExists(sendEmailCfgFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getEmailPasswordKey(srv model.EmailServer) string{
|
func getEmailPasswordKey(srv model.EmailServer) string {
|
||||||
return fmt.Sprintf("%s_password", srv.ID)
|
return fmt.Sprintf("%s_password", srv.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,15 +83,15 @@ func GeneratePipelineConfig(servers []model.EmailServer) (string, error) {
|
||||||
smtpServers := map[string]util.MapStr{}
|
smtpServers := map[string]util.MapStr{}
|
||||||
for _, srv := range servers {
|
for _, srv := range servers {
|
||||||
key := getEmailPasswordKey(srv)
|
key := getEmailPasswordKey(srv)
|
||||||
err := keystore.SetValue(key, []byte(srv.Auth.Password))
|
err := keystore.SetValue(key, []byte(srv.Auth.Password.Get()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
smtpServers[srv.ID] = util.MapStr{
|
smtpServers[srv.ID] = util.MapStr{
|
||||||
"server": util.MapStr{
|
"server": util.MapStr{
|
||||||
"host": srv.Host,
|
"host": srv.Host,
|
||||||
"port": srv.Port,
|
"port": srv.Port,
|
||||||
"tls": srv.TLS,
|
"tls": srv.TLS,
|
||||||
"refresh_timestamp": time.Now().UnixMilli(),
|
"refresh_timestamp": time.Now().UnixMilli(),
|
||||||
},
|
},
|
||||||
"auth": util.MapStr{
|
"auth": util.MapStr{
|
||||||
|
@ -103,9 +104,9 @@ func GeneratePipelineConfig(servers []model.EmailServer) (string, error) {
|
||||||
pipelineCfg := util.MapStr{
|
pipelineCfg := util.MapStr{
|
||||||
"pipeline": []util.MapStr{
|
"pipeline": []util.MapStr{
|
||||||
{
|
{
|
||||||
"name": "send_email_service",
|
"name": "send_email_service",
|
||||||
"auto_start": true,
|
"auto_start": true,
|
||||||
"keep_running": true,
|
"keep_running": true,
|
||||||
"retry_delay_in_ms": 5000,
|
"retry_delay_in_ms": 5000,
|
||||||
"processor": []util.MapStr{
|
"processor": []util.MapStr{
|
||||||
{
|
{
|
||||||
|
@ -113,8 +114,8 @@ func GeneratePipelineConfig(servers []model.EmailServer) (string, error) {
|
||||||
"consumer": util.MapStr{
|
"consumer": util.MapStr{
|
||||||
"fetch_max_messages": 1,
|
"fetch_max_messages": 1,
|
||||||
},
|
},
|
||||||
"max_worker_size": 200,
|
"max_worker_size": 200,
|
||||||
"num_of_slices": 1,
|
"num_of_slices": 1,
|
||||||
"idle_timeout_in_seconds": 30,
|
"idle_timeout_in_seconds": 30,
|
||||||
"queue_selector": util.MapStr{
|
"queue_selector": util.MapStr{
|
||||||
"keys": []string{"email_messages"},
|
"keys": []string{"email_messages"},
|
||||||
|
@ -123,12 +124,12 @@ func GeneratePipelineConfig(servers []model.EmailServer) (string, error) {
|
||||||
{
|
{
|
||||||
"smtp": util.MapStr{
|
"smtp": util.MapStr{
|
||||||
"idle_timeout_in_seconds": 1,
|
"idle_timeout_in_seconds": 1,
|
||||||
"servers": smtpServers,
|
"servers": smtpServers,
|
||||||
"templates": util.MapStr{
|
"templates": util.MapStr{
|
||||||
"raw": util.MapStr{
|
"raw": util.MapStr{
|
||||||
"content_type": "text/plain",
|
"content_type": "text/plain",
|
||||||
"subject": "$[[subject]]",
|
"subject": "$[[subject]]",
|
||||||
"body": "$[[body]]",
|
"body": "$[[body]]",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
|
@ -8,7 +8,9 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/buger/jsonparser"
|
||||||
log "github.com/cihub/seelog"
|
log "github.com/cihub/seelog"
|
||||||
|
"github.com/gopkg.in/gomail.v2"
|
||||||
"infini.sh/console/model"
|
"infini.sh/console/model"
|
||||||
"infini.sh/console/model/alerting"
|
"infini.sh/console/model/alerting"
|
||||||
"infini.sh/console/plugin/api/email/common"
|
"infini.sh/console/plugin/api/email/common"
|
||||||
|
@ -17,8 +19,6 @@ import (
|
||||||
"infini.sh/framework/core/orm"
|
"infini.sh/framework/core/orm"
|
||||||
"infini.sh/framework/core/util"
|
"infini.sh/framework/core/util"
|
||||||
"net/http"
|
"net/http"
|
||||||
"github.com/buger/jsonparser"
|
|
||||||
"github.com/gopkg.in/gomail.v2"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -67,7 +67,7 @@ func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, p
|
||||||
h.WriteError(w, fmt.Sprintf("email server [%s:%d] already exists", obj.Host, obj.Port), http.StatusInternalServerError)
|
h.WriteError(w, fmt.Sprintf("email server [%s:%d] already exists", obj.Host, obj.Port), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if obj.CredentialID == "" && obj.Auth != nil && obj.Auth.Username != ""{
|
if obj.CredentialID == "" && obj.Auth != nil && obj.Auth.Username != "" {
|
||||||
credentialID, err := saveBasicAuthToCredential(obj)
|
credentialID, err := saveBasicAuthToCredential(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
|
@ -97,7 +97,7 @@ func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, p
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func saveBasicAuthToCredential(srv *model.EmailServer)(string, error){
|
func saveBasicAuthToCredential(srv *model.EmailServer) (string, error) {
|
||||||
if srv == nil {
|
if srv == nil {
|
||||||
return "", fmt.Errorf("param email config can not be empty")
|
return "", fmt.Errorf("param email config can not be empty")
|
||||||
}
|
}
|
||||||
|
@ -108,7 +108,7 @@ func saveBasicAuthToCredential(srv *model.EmailServer)(string, error){
|
||||||
Payload: map[string]interface{}{
|
Payload: map[string]interface{}{
|
||||||
"basic_auth": map[string]interface{}{
|
"basic_auth": map[string]interface{}{
|
||||||
"username": srv.Auth.Username,
|
"username": srv.Auth.Username,
|
||||||
"password": srv.Auth.Password,
|
"password": srv.Auth.Password.Get(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -267,8 +267,8 @@ func checkEmailServerReferenced(srv *model.EmailServer) error {
|
||||||
func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
strSize = h.GetParameterOrDefault(req, "size", "20")
|
strSize = h.GetParameterOrDefault(req, "size", "20")
|
||||||
strFrom = h.GetParameterOrDefault(req, "from", "0")
|
strFrom = h.GetParameterOrDefault(req, "from", "0")
|
||||||
strEnabled = h.GetParameterOrDefault(req, "enabled", "true")
|
strEnabled = h.GetParameterOrDefault(req, "enabled", "true")
|
||||||
)
|
)
|
||||||
size, _ := strconv.Atoi(strSize)
|
size, _ := strconv.Atoi(strSize)
|
||||||
|
@ -286,7 +286,7 @@ func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, p
|
||||||
}
|
}
|
||||||
if strEnabled == "true" {
|
if strEnabled == "true" {
|
||||||
q.Conds = orm.And(orm.Eq("enabled", true))
|
q.Conds = orm.And(orm.Eq("enabled", true))
|
||||||
}else if strEnabled == "false" {
|
} else if strEnabled == "false" {
|
||||||
q.Conds = orm.And(orm.Eq("enabled", false))
|
q.Conds = orm.And(orm.Eq("enabled", false))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -307,7 +307,7 @@ func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, p
|
||||||
buf := hitsBuf.Bytes()
|
buf := hitsBuf.Bytes()
|
||||||
if buf[len(buf)-1] == ',' {
|
if buf[len(buf)-1] == ',' {
|
||||||
buf[len(buf)-1] = ']'
|
buf[len(buf)-1] = ']'
|
||||||
}else{
|
} else {
|
||||||
hitsBuf.Write([]byte("]"))
|
hitsBuf.Write([]byte("]"))
|
||||||
}
|
}
|
||||||
res.Raw, err = jsonparser.Set(res.Raw, hitsBuf.Bytes(), "hits", "hits")
|
res.Raw, err = jsonparser.Set(res.Raw, hitsBuf.Bytes(), "hits", "hits")
|
||||||
|
@ -340,7 +340,7 @@ func (h *EmailAPI) testEmailServer(w http.ResponseWriter, req *http.Request, ps
|
||||||
}
|
}
|
||||||
if reqBody.CredentialID != "" {
|
if reqBody.CredentialID != "" {
|
||||||
auth, err := common.GetBasicAuth(&reqBody.EmailServer)
|
auth, err := common.GetBasicAuth(&reqBody.EmailServer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -356,7 +356,7 @@ func (h *EmailAPI) testEmailServer(w http.ResponseWriter, req *http.Request, ps
|
||||||
message.SetHeader("Subject", "INFINI platform test email")
|
message.SetHeader("Subject", "INFINI platform test email")
|
||||||
|
|
||||||
message.SetBody("text/plain", "This is just a test email, do not reply!")
|
message.SetBody("text/plain", "This is just a test email, do not reply!")
|
||||||
d := gomail.NewDialerWithTimeout(reqBody.Host, reqBody.Port, reqBody.Auth.Username, reqBody.Auth.Password, 3*time.Second)
|
d := gomail.NewDialerWithTimeout(reqBody.Host, reqBody.Port, reqBody.Auth.Username, reqBody.Auth.Password.Get(), 3*time.Second)
|
||||||
d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
|
d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
|
||||||
d.SSL = reqBody.TLS
|
d.SSL = reqBody.TLS
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"infini.sh/framework/lib/go-ucfg"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -97,7 +98,7 @@ func (module *Module) Start() error {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if basicAuth, ok := bv.(model.BasicAuth); ok {
|
if basicAuth, ok := bv.(model.BasicAuth); ok {
|
||||||
err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(basicAuth.Password))
|
err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(basicAuth.Password.Get()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
}
|
}
|
||||||
|
@ -283,7 +284,7 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API, Setup
|
||||||
Endpoint: request.Cluster.Endpoint,
|
Endpoint: request.Cluster.Endpoint,
|
||||||
BasicAuth: &model.BasicAuth{
|
BasicAuth: &model.BasicAuth{
|
||||||
Username: request.Cluster.Username,
|
Username: request.Cluster.Username,
|
||||||
Password: request.Cluster.Password,
|
Password: ucfg.SecretString(request.Cluster.Password),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -458,7 +459,7 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
|
||||||
if oldCfg.CredentialID != "" && !secretMismatch {
|
if oldCfg.CredentialID != "" && !secretMismatch {
|
||||||
basicAuth, _ := elastic1.GetBasicAuth(&oldCfg)
|
basicAuth, _ := elastic1.GetBasicAuth(&oldCfg)
|
||||||
if basicAuth != nil {
|
if basicAuth != nil {
|
||||||
if basicAuth.Username == request.Cluster.Username && basicAuth.Password == request.Cluster.Password {
|
if basicAuth.Username == request.Cluster.Username && basicAuth.Password.Get() == request.Cluster.Password {
|
||||||
reuseOldCred = true
|
reuseOldCred = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -528,7 +529,7 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(cfg.BasicAuth.Password))
|
err = keystore.SetValue("SYSTEM_CLUSTER_PASS", []byte(cfg.BasicAuth.Password.Get()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue