diff --git a/api/internal/logic/cloud/registerclusterlogic.go b/api/internal/logic/cloud/registerclusterlogic.go index 93916bfe..f6aec99c 100644 --- a/api/internal/logic/cloud/registerclusterlogic.go +++ b/api/internal/logic/cloud/registerclusterlogic.go @@ -28,11 +28,11 @@ func NewRegisterClusterLogic(ctx context.Context, svcCtx *svc.ServiceContext) *R } func (l *RegisterClusterLogic) RegisterCluster(req *types.RegisterClusterReq) (*types.CloudResp, error) { - var ms []models.ScParticipantPhyInfo + var phyInfos []models.ScParticipantPhyInfo var resp types.CloudResp - l.svcCtx.DbEngin.Raw("select * from sc_participant_phy_info where `name` = ?", req.Name).Scan(&ms) - if len(ms) != 0 { + l.svcCtx.DbEngin.Raw("select * from sc_participant_phy_info where `name` = ?", req.Name).Scan(&phyInfos) + if len(phyInfos) != 0 { resp.Code = "400" resp.Msg = "cluster name already exist" resp.Data = "" @@ -40,7 +40,6 @@ func (l *RegisterClusterLogic) RegisterCluster(req *types.RegisterClusterReq) (* } participant := models.ScParticipantPhyInfo{} - participant.Token = req.Token participant.Name = req.Name participant.Address = req.Address @@ -50,10 +49,21 @@ func (l *RegisterClusterLogic) RegisterCluster(req *types.RegisterClusterReq) (* participant.CreatedTime = time.Now() participant.UpdatedTime = time.Now() + labelInfo := models.ScParticipantLabelInfo{} + labelInfo.Id = utils.GenSnowflakeID() + labelInfo.ParticipantId = participant.Id + labelInfo.CreatedTime = time.Now() + labelInfo.Key = "cloud" + labelInfo.Value = "sealos" + tx := l.svcCtx.DbEngin.Create(&participant) if tx.Error != nil { return nil, tx.Error } + tx2 := l.svcCtx.DbEngin.Create(&labelInfo) + if tx2.Error != nil { + return nil, tx.Error + } resp.Code = string(200) resp.Msg = "success" resp.Data = "participantId:" + strconv.FormatInt(participant.Id, 10) diff --git a/api/internal/mqs/ScheduleCloud.go b/api/internal/mqs/ScheduleCloud.go index d2e278a2..437d0e9e 100644 --- a/api/internal/mqs/ScheduleCloud.go +++ b/api/internal/mqs/ScheduleCloud.go @@ -43,6 +43,8 @@ func (l *CloudMq) Consume(val string) error { if err != nil { return err } + + //通过标签匹配筛选出集群范围 schdl.MatchLabels() // 调度算法 diff --git a/api/internal/types/types.go b/api/internal/types/types.go index daf60f54..6f8abfd2 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -3340,7 +3340,6 @@ type RegisterClusterReq struct { Name string `form:"name"` // 名称 Address string `form:"address"` // 地址 Token string `form:"token"` // 数算集群token - Type string `form:"type"` // 参与者类型:CLOUD-数算集群;AI-智算集群;HPC-超算集群 MetricsUrl string `form:"metricsUrl"` //监控url } @@ -3348,22 +3347,6 @@ type DeleteClusterReq struct { Name string `form:"name"` // 名称 } -type ListParticipantResp struct { - Code string `json:"code"` - Msg string `json:"msg"` - Data []ParticipantResp `json:"data"` -} - -type ParticipantResp struct { - Id int64 `json:"id"` // id - Name string `json:"name"` // 名称 - Address string `json:"address"` // 地址 - Token string `json:"token"` // 数算集群token - Type string `json:"type"` // 参与者类型:CLOUD-数算集群;AI-智算集群;HPC-超算集群 - ParticipantId int64 `json:"name"` // participant id - MetricsUrl string `json:"metricsUrl"` //监控url -} - type CloudResp struct { Code string `json:"code"` Msg string `json:"msg"` diff --git a/api/pkg/response/TaskInfo.go b/api/pkg/response/TaskInfo.go index 85b60267..861db255 100644 --- a/api/pkg/response/TaskInfo.go +++ b/api/pkg/response/TaskInfo.go @@ -18,10 +18,11 @@ import "fmt" type TaskInfo struct { TaskId int64 `json:"taskId,optional"` - NsID string `json:"nsID"` + NsID string `json:"nsID"` //云际平台传入namespace TaskType string `json:"taskType,optional"` MatchLabels map[string]string `json:"matchLabels"` - ParticipantId int64 `json:"participantId"` + ParticipantId int64 `json:"participantId,optional"` //湘江预留字段 + Clusters []string `json:"clusters,optional"` //云际平台传入集群名称列表 TenantId int64 `json:"tenantId"` Metadata interface{} `json:"metadata"` } diff --git a/pkg/scheduler/aiScheduler.go b/pkg/scheduler/aiScheduler.go index f0eb4cfa..f3ddb7a9 100644 --- a/pkg/scheduler/aiScheduler.go +++ b/pkg/scheduler/aiScheduler.go @@ -30,7 +30,7 @@ func NewAiScheduler(val string) *aiScheduler { return &aiScheduler{yamlString: val} } -func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64) (interface{}, error) { +func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) { ai := models.Ai{ ParticipantId: participantId, TaskId: task.TaskId, diff --git a/pkg/scheduler/algo/k8sStrategy.go b/pkg/scheduler/algo/k8sStrategy.go index d166bf6f..be17d571 100644 --- a/pkg/scheduler/algo/k8sStrategy.go +++ b/pkg/scheduler/algo/k8sStrategy.go @@ -193,8 +193,13 @@ func computeHighDegree(task *Task, resourcesolution []int, providerList []*Provi magnitude2 := mat.Norm(nowLeft, 2) // 计算余弦相似度 - cosine_similarity := dot_product / (magnitude1 * magnitude2) - highDegreeSum += cosine_similarity + //临时处理被除数为0的特殊情况 + var cosineSimilarity = 0.0 + if magnitude1 != 0 && magnitude2 != 0 { + cosineSimilarity = dot_product / (magnitude1 * magnitude2) + } + + highDegreeSum += cosineSimilarity } return highDegreeSum / float64(len(providerList)) diff --git a/pkg/scheduler/cloudScheduler.go b/pkg/scheduler/cloudScheduler.go index ade1c5f3..cc7c9c77 100644 --- a/pkg/scheduler/cloudScheduler.go +++ b/pkg/scheduler/cloudScheduler.go @@ -37,15 +37,6 @@ func NewCloudScheduler() *cloudScheduler { } func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) { - ////参数为空,返回 nil - //if len(providers) == 0 || task == nil { - // return nil, errors.New("算法获取参数为空") - //} - // - ////仅有一个provider,返回nil - //if len(providers) == 1 { - // return nil, nil - //} //调度算法 strategy := algo.NewK8sStrategy(task, providers...) @@ -56,8 +47,9 @@ func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*alg return taskResult.MaxscoreStrategy, nil } -func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64) (interface{}, error) { +func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) { bytes, err := json.Marshal(task.Metadata) + //replica 需要替换到yaml中 if err != nil { return nil, err } @@ -118,7 +110,8 @@ func (cs *cloudScheduler) genTaskAndProviders(task *response.TaskInfo, dbEngin * providerList = append(providerList, provider) } - t := algo.NewTask(0, 1, 2, 75120000, 301214500, 1200, 2, 6, 2000) + replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) + t := algo.NewTask(0, int(replicas), 2, 75120000, 301214500, 1200, 2, 6, 2000) return t, providerList } diff --git a/pkg/scheduler/common.go b/pkg/scheduler/common.go index ef6a78f5..46ce7d67 100644 --- a/pkg/scheduler/common.go +++ b/pkg/scheduler/common.go @@ -23,7 +23,7 @@ import ( ) type scheduleService interface { - getNewStructForDb(task *response.TaskInfo, participantId int64) (interface{}, error) + getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*algo.Task, []*algo.Provider) } diff --git a/pkg/scheduler/hpcScheduler.go b/pkg/scheduler/hpcScheduler.go index 10bcb78f..99850118 100644 --- a/pkg/scheduler/hpcScheduler.go +++ b/pkg/scheduler/hpcScheduler.go @@ -31,7 +31,7 @@ func NewHpcScheduler(val string) *hpcScheduler { return &hpcScheduler{yamlString: val} } -func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64) (interface{}, error) { +func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) { hpc := models.Hpc{} utils.Convert(task.Metadata, &hpc) hpc.Id = utils.GenSnowflakeID() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 0dc950b3..44bd7949 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -15,7 +15,6 @@ package scheduler import ( - "context" "encoding/json" "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" @@ -47,27 +46,42 @@ func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc, result: make(map[ParticipantId]Replicas, 0)}, nil } +func (s *scheduler) SepcifyClusters() { + +} + func (s *scheduler) MatchLabels() { - // 已指定 ParticipantId + + var ids []int64 + count := 0 + + // 已指定 ParticipantId 直接不走标签匹配 if s.task.ParticipantId != 0 { return } - var ids []int64 - count := 0 + // 如果已指定集群名,通过数据库查询后返回p端ip列表 + if len(s.task.Clusters) != 0 { + for i, _ := range s.task.Clusters { + clusterName := s.task.Clusters[i] + var participantId int64 + s.dbEngin.Raw("select id from sc_participant_phy_info where `name` = ?", clusterName).Scan(&participantId) + s.participantIds = append(s.participantIds, participantId) + } + return + } + + //如果均未指定,则通过标签匹配 for key := range s.task.MatchLabels { var participantIds []int64 s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds) if count == 0 { ids = participantIds } - //if len(participantId) == 0 || len(ids) == 0 { - // return nil, nil - //} ids = intersect(ids, participantIds) count++ } - s.participantIds = micsSlice(ids, 1) + s.participantIds = ids } func (s *scheduler) AssignAndSchedule() error { @@ -75,18 +89,19 @@ func (s *scheduler) AssignAndSchedule() error { if s.task.ParticipantId != 0 { return nil } - - // 标签匹配后,未找到ParticipantIds + // 标签匹配以及后,未找到ParticipantIds if len(s.participantIds) == 0 { return errors.New("未找到匹配的ParticipantIds") } - // ParticipantIds 返回唯一值 + // 指定或者标签匹配的结果只有一个集群,给任务信息指定 if len(s.participantIds) == 1 { - if !s.checkIfParticipantAvailable(ParticipantId(s.participantIds[0])) { - return errors.Errorf("集群 %d 不可用", s.participantIds[0]) - } s.task.ParticipantId = s.participantIds[0] + replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) + result := make(map[ParticipantId]Replicas) + result[ParticipantId(s.participantIds[0])] = Replicas(replicas) + s.result = result + return nil } @@ -98,9 +113,6 @@ func (s *scheduler) AssignAndSchedule() error { //集群数量不满足,指定到标签匹配后第一个集群 if len(providerList) < 2 { - if !s.checkIfParticipantAvailable(ParticipantId(s.participantIds[0])) { - return errors.Errorf("集群 %d 不可用", s.participantIds[0]) - } s.task.ParticipantId = s.participantIds[0] return nil } @@ -121,18 +133,18 @@ func (s *scheduler) AssignAndSchedule() error { } func (s *scheduler) SaveToDb() error { - if s.task.ParticipantId == 0 { - return errors.New("participantId 为空") - } - structForDb, err := s.scheduleService.getNewStructForDb(s.task, s.task.ParticipantId) - if err != nil { - return err - } - tx := s.dbEngin.Create(structForDb) - if tx.Error != nil { - // todo 保存失败数据 - logx.Error(tx.Error) - return tx.Error + + for key, value := range s.result { + structForDb, err := s.scheduleService.getNewStructForDb(s.task, int64(key), int32(value)) + if err != nil { + return err + } + + tx := s.dbEngin.Create(structForDb) + if tx.Error != nil { + logx.Error(tx.Error) + return tx.Error + } } return nil } @@ -144,66 +156,9 @@ func (s *scheduler) obtainParamsforStrategy() (*algo.Task, []*algo.Provider, err return nil, nil, errors.New("获取集群失败") } - // 过滤可用集群 - err := s.filterAvailableProviders(&providerList) - if err != nil { - return nil, nil, err - } - return task, providerList, nil } -func (s *scheduler) checkIfParticipantAvailable(id ParticipantId) bool { - - workingIds, err := s.getAvailableParticipantIds() - if err != nil { - return false - } - - return contains(workingIds, int64(id)) -} - -func (s *scheduler) getAvailableParticipantIds() ([]int64, error) { - - resp, err := s.participantRpc.ListParticipant(context.Background(), nil) - if err != nil { - return nil, err - } - - if resp.Code != 200 { - return nil, errors.New("集群列表查询失败") - } - - var workingIds []int64 - for _, e := range resp.Data { - if e.ClientState == "UNKNOWN" { - continue - } - workingIds = append(workingIds, e.ParticipantId) - } - - return workingIds, nil -} - -func (s *scheduler) filterAvailableProviders(providerList *[]*algo.Provider) error { - - workingIds, err := s.getAvailableParticipantIds() - if err != nil { - return err - } - - var tempList []*algo.Provider - for _, provider := range *providerList { - if contains(workingIds, provider.Pid) && contains(s.participantIds, provider.Pid) { - tempList = append(tempList, provider) - } - } - - *providerList = tempList - - return nil -} - func (s *scheduler) assignReplicasToResult(strategy *algo.Strategy, providerList []*algo.Provider) error { if len(strategy.Tasksolution) == 0 { @@ -223,12 +178,3 @@ func (s *scheduler) assignReplicasToResult(strategy *algo.Strategy, providerList return nil } - -func contains(s []int64, e int64) bool { - for _, a := range s { - if a == e { - return true - } - } - return false -}