fix multi node tasks

This commit is contained in:
medcl 2023-10-25 11:38:10 +08:00
parent 36cbc265f8
commit 4415e63aa4
1 changed files with 19 additions and 5 deletions

View File

@ -155,7 +155,8 @@ func getAgentIngestConfigs(instance string, items map[string]BindingItem) (strin
} }
if v.ClusterID == "" { if v.ClusterID == "" {
panic("cluster id is empty") log.Error("cluster id is empty")
continue
} }
metadata := elastic.GetMetadata(v.ClusterID) metadata := elastic.GetMetadata(v.ClusterID)
@ -165,7 +166,6 @@ func getAgentIngestConfigs(instance string, items map[string]BindingItem) (strin
} }
var clusterLevelEnabled = false var clusterLevelEnabled = false
var nodeLevelEnabled = true var nodeLevelEnabled = true
var clusterEndPoint = metadata.Config.GetAnyEndpoint()
var username = "" var username = ""
var password = "" var password = ""
@ -173,12 +173,14 @@ func getAgentIngestConfigs(instance string, items map[string]BindingItem) (strin
if metadata.Config.AgentCredentialID != "" { if metadata.Config.AgentCredentialID != "" {
credential, err := common2.GetCredential(metadata.Config.AgentCredentialID) credential, err := common2.GetCredential(metadata.Config.AgentCredentialID)
if err != nil { if err != nil {
panic(err) log.Error(err)
continue
} }
var dv interface{} var dv interface{}
dv, err = credential.Decode() dv, err = credential.Decode()
if err != nil { if err != nil {
panic(err) log.Error(err)
continue
} }
if auth, ok := dv.(model.BasicAuth); ok { if auth, ok := dv.(model.BasicAuth); ok {
username = auth.Username username = auth.Username
@ -186,19 +188,31 @@ func getAgentIngestConfigs(instance string, items map[string]BindingItem) (strin
} }
} }
if v.PublishAddress==""{
log.Errorf("publish address is empty: %v",v.NodeUUID)
continue
}
nodeEndPoint := metadata.PrepareEndpoint(v.PublishAddress)
if v.Updated > latestVersion { if v.Updated > latestVersion {
latestVersion = v.Updated latestVersion = v.Updated
} }
taskID:=v.ClusterID+"_"+v.NodeUUID
buffer.Write([]byte(fmt.Sprintf("\n - name: \"%v\"\n path: ./config/task_config.tpl\n "+ buffer.Write([]byte(fmt.Sprintf("\n - name: \"%v\"\n path: ./config/task_config.tpl\n "+
"variable:\n "+ "variable:\n "+
"TASK_ID: %v\n "+
"CLUSTER_ID: %v\n "+ "CLUSTER_ID: %v\n "+
"NODE_UUID: %v\n "+
"CLUSTER_ENDPOINT: [\"%v\"]\n "+ "CLUSTER_ENDPOINT: [\"%v\"]\n "+
"CLUSTER_USERNAME: \"%v\"\n "+ "CLUSTER_USERNAME: \"%v\"\n "+
"CLUSTER_PASSWORD: \"%v\"\n "+ "CLUSTER_PASSWORD: \"%v\"\n "+
"CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+ "CLUSTER_LEVEL_TASKS_ENABLED: %v\n "+
"NODE_LEVEL_TASKS_ENABLED: %v\n "+ "NODE_LEVEL_TASKS_ENABLED: %v\n "+
"NODE_LOGS_PATH: \"%v\"\n\n\n", v.NodeUUID, v.ClusterID, clusterEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs))) "NODE_LOGS_PATH: \"%v\"\n\n\n", taskID, taskID,
v.ClusterID,v.NodeUUID, nodeEndPoint, username, password, clusterLevelEnabled, nodeLevelEnabled, v.PathLogs)))
} }
hash := util.MD5digest(buffer.String()) hash := util.MD5digest(buffer.String())