From b09a45bd5bfcf8e0ed44318da5afe4a0af2d2044 Mon Sep 17 00:00:00 2001 From: tzwang Date: Mon, 1 Apr 2024 11:22:09 +0800 Subject: [PATCH 1/2] added scheduler api aioptions Former-commit-id: 634213a6182c1dc2e17417df9ceaf1289743e2ca --- api/desc/schedule/pcm-schedule.api | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/api/desc/schedule/pcm-schedule.api b/api/desc/schedule/pcm-schedule.api index d9946a33..0e425b2e 100644 --- a/api/desc/schedule/pcm-schedule.api +++ b/api/desc/schedule/pcm-schedule.api @@ -20,10 +20,18 @@ type ( } AiOption { + TaskName string `json:"taskName"` + AiClusterId string `json:"aiClusterId,optional"` ResourceType string `json:"resourceType"` + Tops float64 `json:"Tops,optional"` TaskType string `json:"taskType"` Datasets string `json:"datasets"` + Algorithm string `json:"algorithm"` Strategy string `json:"strategy"` + StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` + Params []string `json:"params,optional"` + Envs []string `json:"envs,optional"` + Cmd string `json:"cmd,optional"` } AiResourceTypesResp { From 69757c9f5b965222dc71e38b9be07977a5a51fa4 Mon Sep 17 00:00:00 2001 From: tzwang Date: Mon, 1 Apr 2024 17:53:42 +0800 Subject: [PATCH 2/2] updated return type of aischeduler submit Former-commit-id: c8de6c78b18e2159f4c6ea4abcac77221f7eae48 --- api/desc/schedule/pcm-schedule.api | 2 -- .../logic/schedule/schedulesubmitlogic.go | 2 +- api/internal/mqs/ScheduleAi.go | 2 +- api/internal/scheduler/scheduler.go | 14 ++++---- .../scheduler/schedulers/aiScheduler.go | 33 ++++++++++++++++--- .../scheduler/schedulers/cloudScheduler.go | 4 +-- .../scheduler/schedulers/hpcScheduler.go | 4 +-- .../scheduler/schedulers/vmScheduler.go | 2 +- 8 files changed, 43 insertions(+), 20 deletions(-) diff --git a/api/desc/schedule/pcm-schedule.api b/api/desc/schedule/pcm-schedule.api index 0e425b2e..72ba71a7 100644 --- a/api/desc/schedule/pcm-schedule.api +++ b/api/desc/schedule/pcm-schedule.api @@ -14,8 +14,6 @@ type ( ScheduleResp { Success bool `json:"success"` - TaskId string `json:"taskId"` - ClusterId string `json:"clusterId"` ErrorMsg string `json:"errorMsg"` } diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index fc7469f9..a4704def 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -44,7 +44,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type return nil, err } - err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) + _, err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) if err != nil { return nil, err } diff --git a/api/internal/mqs/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go index 61713ad2..2e47695a 100644 --- a/api/internal/mqs/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -40,7 +40,7 @@ func (l *AiQueue) Consume(val string) error { aiSchdl, _ := schedulers.NewAiScheduler(l.ctx, val, l.svcCtx.Scheduler, nil) // 调度算法 - err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) + _, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) if err != nil { return err } diff --git a/api/internal/scheduler/scheduler.go b/api/internal/scheduler/scheduler.go index 281788e6..2ed27228 100644 --- a/api/internal/scheduler/scheduler.go +++ b/api/internal/scheduler/scheduler.go @@ -47,7 +47,7 @@ type Scheduler struct { type SubSchedule interface { GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) PickOptimalStrategy() (strategy.Strategy, error) - AssignTask(clusters []*strategy.AssignedCluster) error + AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) } func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { @@ -130,7 +130,7 @@ func (s *Scheduler) TempAssign() error { return nil } -func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error { +func (s *Scheduler) AssignAndSchedule(ss SubSchedule) ([]interface{}, error) { //// 已指定 ParticipantId //if s.task.ParticipantId != 0 { // return nil @@ -153,12 +153,12 @@ func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error { strategy, err := ss.PickOptimalStrategy() if err != nil { - return err + return nil, err } clusters, err := strategy.Schedule() if err != nil { - return err + return nil, err } //集群数量不满足,指定到标签匹配后第一个集群 @@ -167,12 +167,12 @@ func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error { // return nil //} - err = ss.AssignTask(clusters) + resp, err := ss.AssignTask(clusters) if err != nil { - return err + return nil, err } - return nil + return resp, nil } func (s *Scheduler) SaveToDb() error { diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 024ef7ae..4b62f427 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -95,27 +95,52 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { return nil, errors.New("no strategy has been chosen") } -func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { +func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) { if clusters == nil { - return errors.New("clusters is nil") + return nil, errors.New("clusters is nil") } + var wg sync.WaitGroup + var result []interface{} + var errs []error + var ch = make(chan interface{}, len(clusters)) + var errCh = make(chan error, len(clusters)) + executorMap := *as.AiExecutor for _, cluster := range clusters { c := cluster if cluster.Replicas == 0 { continue } + wg.Add(1) go func() { - _, err := executorMap[c.Name].Execute(as.ctx, as.option) + resp, err := executorMap[c.Name].Execute(as.ctx, as.option) if err != nil { // TODO: database operation + errCh <- err + wg.Done() + return } // TODO: database operation + ch <- resp + wg.Done() }() } + wg.Wait() - return nil + for s := range ch { + result = append(result, s) + } + + for e := range errCh { + errs = append(errs, e) + } + + if len(errs) != 0 { + return nil, errors.New("submit task failed") + } + + return result, nil } func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) { diff --git a/api/internal/scheduler/schedulers/cloudScheduler.go b/api/internal/scheduler/schedulers/cloudScheduler.go index df5d91b5..097635ab 100644 --- a/api/internal/scheduler/schedulers/cloudScheduler.go +++ b/api/internal/scheduler/schedulers/cloudScheduler.go @@ -115,6 +115,6 @@ func (cs *CloudScheduler) genTaskAndProviders() (*providerPricing.Task, []*provi return nil, providerList, nil } -func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { - return nil +func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) { + return nil, nil } diff --git a/api/internal/scheduler/schedulers/hpcScheduler.go b/api/internal/scheduler/schedulers/hpcScheduler.go index 1a305302..319b8183 100644 --- a/api/internal/scheduler/schedulers/hpcScheduler.go +++ b/api/internal/scheduler/schedulers/hpcScheduler.go @@ -50,6 +50,6 @@ func (h *HpcScheduler) genTaskAndProviders(task *response.TaskInfo) (*providerPr return nil, nil } -func (h *HpcScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { - return nil +func (h *HpcScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) { + return nil, nil } diff --git a/api/internal/scheduler/schedulers/vmScheduler.go b/api/internal/scheduler/schedulers/vmScheduler.go index d2a3bb91..80e89c4e 100644 --- a/api/internal/scheduler/schedulers/vmScheduler.go +++ b/api/internal/scheduler/schedulers/vmScheduler.go @@ -64,7 +64,7 @@ func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*provider return nil, providerList, nil } -func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { +func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) { //TODO implement me panic("implement me") }