diff --git a/api/internal/mqs/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go index 45ef84df..311bee00 100644 --- a/api/internal/mqs/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -18,8 +18,9 @@ import ( "context" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/executor" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/schedulers" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" ) /* @@ -46,7 +47,7 @@ func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { func (l *AiQueue) Consume(val string) error { // 接受消息, 根据标签筛选过滤 - aiSchdl, _ := scheduler.NewAiScheduler(val, l.scheduler) + aiSchdl, _ := schedulers.NewAiScheduler(val, l.scheduler) // 调度算法 err := l.scheduler.AssignAndSchedule(aiSchdl) diff --git a/api/internal/mqs/ScheduleCloud.go b/api/internal/mqs/ScheduleCloud.go index c40a5e9b..cb7c9d7b 100644 --- a/api/internal/mqs/ScheduleCloud.go +++ b/api/internal/mqs/ScheduleCloud.go @@ -18,6 +18,7 @@ import ( "context" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/schedulers" ) /* @@ -37,7 +38,7 @@ func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq { func (l *CloudMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 - cloudScheduler := scheduler.NewCloudScheduler() + cloudScheduler := schedulers.NewCloudScheduler() schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) if err != nil { return err diff --git a/pkg/scheduler/common.go b/pkg/scheduler/common/common.go similarity index 76% rename from pkg/scheduler/common.go rename to pkg/scheduler/common/common.go index 7b2cab8e..fb71cc8c 100644 --- a/pkg/scheduler/common.go +++ b/pkg/scheduler/common/common.go @@ -12,23 +12,23 @@ */ -package scheduler +package common import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy" "math/rand" "time" ) -type scheduleService interface { - getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) - pickOptimalStrategy() (strategies.Strategy, error) - assignTask(clusters []*strategies.AssignedCluster) error +type SubSchedule interface { + GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) + PickOptimalStrategy() (strategy.Strategy, error) + AssignTask(clusters []*strategy.AssignedCluster) error } // 求交集 -func intersect(slice1, slice2 []int64) []int64 { +func Intersect(slice1, slice2 []int64) []int64 { m := make(map[int64]int) nn := make([]int64, 0) for _, v := range slice1 { @@ -44,7 +44,7 @@ func intersect(slice1, slice2 []int64) []int64 { return nn } -func micsSlice(origin []int64, count int) []int64 { +func MicsSlice(origin []int64, count int) []int64 { tmpOrigin := make([]int64, len(origin)) copy(tmpOrigin, origin) //一定要seed diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 3efebaba..1407568e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -19,9 +19,10 @@ import ( "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/common" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/executor" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" "sigs.k8s.io/yaml" @@ -31,26 +32,26 @@ import ( type Scheduler struct { task *response.TaskInfo participantIds []int64 - scheduleService scheduleService + subSchedule common.SubSchedule dbEngin *gorm.DB result []string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService - resourceCollectors []collector.ResourceCollector - storages []database.Storage - aiExecutor map[string]executor.Executor + ResourceCollectors []collector.ResourceCollector + Storages []database.Storage + AiExecutor map[string]executor.Executor } -func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { +func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { var task *response.TaskInfo err := json.Unmarshal([]byte(val), &task) if err != nil { return nil, errors.New("create scheduler failed : " + err.Error()) } - return &Scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil + return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil } func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage, aiExecutor map[string]executor.Executor) *Scheduler { - return &Scheduler{resourceCollectors: resourceCollectors, storages: storages, aiExecutor: aiExecutor} + return &Scheduler{ResourceCollectors: resourceCollectors, Storages: storages, AiExecutor: aiExecutor} } func (s *Scheduler) SpecifyClusters() { @@ -90,7 +91,7 @@ func (s *Scheduler) MatchLabels() { if count == 0 { ids = participantIds } - ids = intersect(ids, participantIds) + ids = common.Intersect(ids, participantIds) count++ } s.participantIds = ids @@ -120,7 +121,7 @@ func (s *Scheduler) TempAssign() error { return nil } -func (s *Scheduler) AssignAndSchedule(ss scheduleService) error { +func (s *Scheduler) AssignAndSchedule(ss common.SubSchedule) error { //// 已指定 ParticipantId //if s.task.ParticipantId != 0 { // return nil @@ -141,7 +142,7 @@ func (s *Scheduler) AssignAndSchedule(ss scheduleService) error { // return nil //} - strategy, err := ss.pickOptimalStrategy() + strategy, err := ss.PickOptimalStrategy() if err != nil { return err } @@ -157,7 +158,7 @@ func (s *Scheduler) AssignAndSchedule(ss scheduleService) error { // return nil //} - err = ss.assignTask(clusters) + err = ss.AssignTask(clusters) if err != nil { return err } @@ -170,7 +171,7 @@ func (s *Scheduler) SaveToDb() error { for _, participantId := range s.participantIds { for _, resource := range s.task.Metadata { - structForDb, err := s.scheduleService.getNewStructForDb(s.task, resource, participantId) + structForDb, err := s.subSchedule.GetNewStructForDb(s.task, resource, participantId) if err != nil { return err } diff --git a/pkg/scheduler/aiScheduler.go b/pkg/scheduler/schedulers/aiScheduler.go similarity index 76% rename from pkg/scheduler/aiScheduler.go rename to pkg/scheduler/schedulers/aiScheduler.go index d5771faf..feef4e93 100644 --- a/pkg/scheduler/aiScheduler.go +++ b/pkg/scheduler/schedulers/aiScheduler.go @@ -12,30 +12,31 @@ */ -package scheduler +package schedulers import ( "errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/entity" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" ) type AiScheduler struct { yamlString string task *response.TaskInfo - *Scheduler + *scheduler.Scheduler } -func NewAiScheduler(val string, scheduler *Scheduler) (*AiScheduler, error) { +func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) { return &AiScheduler{yamlString: val, Scheduler: scheduler}, nil } -func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { +func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { ai := models.Ai{ ParticipantId: participantId, TaskId: task.TaskId, @@ -46,7 +47,7 @@ func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource strin return ai, nil } -func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { +func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { resources, err := as.findProvidersWithResource() if err != nil { return nil, err @@ -60,7 +61,7 @@ func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { Name: resource.Name, }) } - strategy := strategies.NewReplicationStrategy(nil, 0) + strategy := strategy.NewReplicationStrategy(nil, 0) return strategy, nil } @@ -68,7 +69,7 @@ func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { if err != nil { return nil, nil } - strategy := strategies.NewPricingStrategy(task, providerList...) + strategy := strategy.NewPricingStrategy(task, providerList...) return strategy, nil } @@ -77,7 +78,7 @@ func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*provider return nil, nil } -func (as *AiScheduler) assignTask(clusters []*strategies.AssignedCluster) error { +func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { if clusters == nil { return errors.New("clusters is nil") } @@ -87,7 +88,7 @@ func (as *AiScheduler) assignTask(clusters []*strategies.AssignedCluster) error func (as *AiScheduler) findProvidersWithResource() ([]*collector.ResourceSpecs, error) { var resourceSpecs []*collector.ResourceSpecs - for _, resourceCollector := range as.resourceCollectors { + for _, resourceCollector := range as.ResourceCollectors { spec, err := resourceCollector.GetResourceSpecs() if err != nil { continue diff --git a/pkg/scheduler/cloudScheduler.go b/pkg/scheduler/schedulers/cloudScheduler.go similarity index 90% rename from pkg/scheduler/cloudScheduler.go rename to pkg/scheduler/schedulers/cloudScheduler.go index 95121182..89d46795 100644 --- a/pkg/scheduler/cloudScheduler.go +++ b/pkg/scheduler/schedulers/cloudScheduler.go @@ -12,7 +12,7 @@ */ -package scheduler +package schedulers import ( "bytes" @@ -20,7 +20,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "io" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -37,17 +37,17 @@ func NewCloudScheduler() *CloudScheduler { return &CloudScheduler{} } -func (cs *CloudScheduler) pickOptimalStrategy() (strategies.Strategy, error) { +func (cs *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) { task, providerList, err := cs.genTaskAndProviders() if err != nil { return nil, nil } //调度算法 - strategy := strategies.NewPricingStrategy(task, providerList...) + strategy := strategy.NewPricingStrategy(task, providerList...) return strategy, nil } -func (cs *CloudScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { +func (cs *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { cloud := cs.UnMarshalK8sStruct(resource, task.TaskId, task.NsID) cloud.Id = utils.GenSnowflakeID() cloud.NsID = task.NsID @@ -117,6 +117,6 @@ func (cs *CloudScheduler) genTaskAndProviders() (*providerPricing.Task, []*provi return nil, providerList, nil } -func (cs *CloudScheduler) assignTask(clusters []*strategies.AssignedCluster) error { +func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { return nil } diff --git a/pkg/scheduler/hpcScheduler.go b/pkg/scheduler/schedulers/hpcScheduler.go similarity index 82% rename from pkg/scheduler/hpcScheduler.go rename to pkg/scheduler/schedulers/hpcScheduler.go index af6416e6..92d49d84 100644 --- a/pkg/scheduler/hpcScheduler.go +++ b/pkg/scheduler/schedulers/hpcScheduler.go @@ -12,14 +12,14 @@ */ -package scheduler +package schedulers import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" ) @@ -31,7 +31,7 @@ func NewHpcScheduler(val string) *HpcScheduler { return &HpcScheduler{yamlString: val} } -func (h *HpcScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { +func (h *HpcScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { hpc := models.Hpc{} utils.Convert(task.Metadata, &hpc) hpc.Id = utils.GenSnowflakeID() @@ -42,7 +42,7 @@ func (h *HpcScheduler) getNewStructForDb(task *response.TaskInfo, resource strin return hpc, nil } -func (h *HpcScheduler) pickOptimalStrategy() (strategies.Strategy, error) { +func (h *HpcScheduler) PickOptimalStrategy() (strategy.Strategy, error) { return nil, nil } @@ -50,6 +50,6 @@ func (h *HpcScheduler) genTaskAndProviders(task *response.TaskInfo) (*providerPr return nil, nil } -func (h *HpcScheduler) assignTask(clusters []*strategies.AssignedCluster) error { +func (h *HpcScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { return nil } diff --git a/pkg/scheduler/schedulers/vmScheduler.go b/pkg/scheduler/schedulers/vmScheduler.go new file mode 100644 index 00000000..ad4b7de0 --- /dev/null +++ b/pkg/scheduler/schedulers/vmScheduler.go @@ -0,0 +1,24 @@ +package schedulers + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy" +) + +type VmScheduler struct { +} + +func (v VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { + //TODO implement me + panic("implement me") +} + +func (v VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) { + //TODO implement me + panic("implement me") +} + +func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { + //TODO implement me + panic("implement me") +} diff --git a/pkg/scheduler/proxy/collector/acCollector.go b/pkg/scheduler/service/collector/acCollector.go similarity index 100% rename from pkg/scheduler/proxy/collector/acCollector.go rename to pkg/scheduler/service/collector/acCollector.go diff --git a/pkg/scheduler/proxy/collector/collector.go b/pkg/scheduler/service/collector/collector.go similarity index 100% rename from pkg/scheduler/proxy/collector/collector.go rename to pkg/scheduler/service/collector/collector.go diff --git a/pkg/scheduler/proxy/executor/acExecutor.go b/pkg/scheduler/service/executor/acExecutor.go similarity index 100% rename from pkg/scheduler/proxy/executor/acExecutor.go rename to pkg/scheduler/service/executor/acExecutor.go diff --git a/pkg/scheduler/proxy/executor/executor.go b/pkg/scheduler/service/executor/executor.go similarity index 100% rename from pkg/scheduler/proxy/executor/executor.go rename to pkg/scheduler/service/executor/executor.go diff --git a/pkg/scheduler/strategies/replication.go b/pkg/scheduler/strategy/replication.go similarity index 97% rename from pkg/scheduler/strategies/replication.go rename to pkg/scheduler/strategy/replication.go index 2a699c5a..08d7e29f 100644 --- a/pkg/scheduler/strategies/replication.go +++ b/pkg/scheduler/strategy/replication.go @@ -1,4 +1,4 @@ -package strategies +package strategy import ( "github.com/pkg/errors" diff --git a/pkg/scheduler/strategies/resourcePricing.go b/pkg/scheduler/strategy/resourcePricing.go similarity index 99% rename from pkg/scheduler/strategies/resourcePricing.go rename to pkg/scheduler/strategy/resourcePricing.go index 5c620fa3..f909f62f 100644 --- a/pkg/scheduler/strategies/resourcePricing.go +++ b/pkg/scheduler/strategy/resourcePricing.go @@ -12,7 +12,7 @@ */ -package strategies +package strategy import ( "errors" diff --git a/pkg/scheduler/strategies/staticWeight.go b/pkg/scheduler/strategy/staticWeight.go similarity index 91% rename from pkg/scheduler/strategies/staticWeight.go rename to pkg/scheduler/strategy/staticWeight.go index 8e08d219..3aa5d769 100644 --- a/pkg/scheduler/strategies/staticWeight.go +++ b/pkg/scheduler/strategy/staticWeight.go @@ -1,4 +1,4 @@ -package strategies +package strategy type StaticWeightStrategy struct { // TODO: add fields diff --git a/pkg/scheduler/strategies/strategy.go b/pkg/scheduler/strategy/strategy.go similarity index 90% rename from pkg/scheduler/strategies/strategy.go rename to pkg/scheduler/strategy/strategy.go index e265acdd..1502dc21 100644 --- a/pkg/scheduler/strategies/strategy.go +++ b/pkg/scheduler/strategy/strategy.go @@ -1,4 +1,4 @@ -package strategies +package strategy type Strategy interface { Schedule() ([]*AssignedCluster, error)