Merge pull request 'updated ai scheduler' (#91) from tzwang/pcm-coordinator:master into master

Former-commit-id: ef44753204a919ce9b290079a0a35faae00f176d
This commit is contained in:
tzwang 2024-04-01 17:57:04 +08:00
commit cc0b729639
8 changed files with 51 additions and 20 deletions

View File

@ -14,16 +14,22 @@ type (
ScheduleResp {
Success bool `json:"success"`
TaskId string `json:"taskId"`
ClusterId string `json:"clusterId"`
ErrorMsg string `json:"errorMsg"`
}
AiOption {
TaskName string `json:"taskName"`
AiClusterId string `json:"aiClusterId,optional"`
ResourceType string `json:"resourceType"`
Tops float64 `json:"Tops,optional"`
TaskType string `json:"taskType"`
Datasets string `json:"datasets"`
Algorithm string `json:"algorithm"`
Strategy string `json:"strategy"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
Params []string `json:"params,optional"`
Envs []string `json:"envs,optional"`
Cmd string `json:"cmd,optional"`
}
AiResourceTypesResp {

View File

@ -44,7 +44,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
return nil, err
}
err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
_, err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
if err != nil {
return nil, err
}

View File

@ -40,7 +40,7 @@ func (l *AiQueue) Consume(val string) error {
aiSchdl, _ := schedulers.NewAiScheduler(l.ctx, val, l.svcCtx.Scheduler, nil)
// 调度算法
err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
_, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
if err != nil {
return err
}

View File

@ -47,7 +47,7 @@ type Scheduler struct {
type SubSchedule interface {
GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error)
PickOptimalStrategy() (strategy.Strategy, error)
AssignTask(clusters []*strategy.AssignedCluster) error
AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error)
}
func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {
@ -130,7 +130,7 @@ func (s *Scheduler) TempAssign() error {
return nil
}
func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error {
func (s *Scheduler) AssignAndSchedule(ss SubSchedule) ([]interface{}, error) {
//// 已指定 ParticipantId
//if s.task.ParticipantId != 0 {
// return nil
@ -153,12 +153,12 @@ func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error {
strategy, err := ss.PickOptimalStrategy()
if err != nil {
return err
return nil, err
}
clusters, err := strategy.Schedule()
if err != nil {
return err
return nil, err
}
//集群数量不满足,指定到标签匹配后第一个集群
@ -167,12 +167,12 @@ func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error {
// return nil
//}
err = ss.AssignTask(clusters)
resp, err := ss.AssignTask(clusters)
if err != nil {
return err
return nil, err
}
return nil
return resp, nil
}
func (s *Scheduler) SaveToDb() error {

View File

@ -95,27 +95,52 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
return nil, errors.New("no strategy has been chosen")
}
func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) {
if clusters == nil {
return errors.New("clusters is nil")
return nil, errors.New("clusters is nil")
}
var wg sync.WaitGroup
var result []interface{}
var errs []error
var ch = make(chan interface{}, len(clusters))
var errCh = make(chan error, len(clusters))
executorMap := *as.AiExecutor
for _, cluster := range clusters {
c := cluster
if cluster.Replicas == 0 {
continue
}
wg.Add(1)
go func() {
_, err := executorMap[c.Name].Execute(as.ctx, as.option)
resp, err := executorMap[c.Name].Execute(as.ctx, as.option)
if err != nil {
// TODO: database operation
errCh <- err
wg.Done()
return
}
// TODO: database operation
ch <- resp
wg.Done()
}()
}
wg.Wait()
return nil
for s := range ch {
result = append(result, s)
}
for e := range errCh {
errs = append(errs, e)
}
if len(errs) != 0 {
return nil, errors.New("submit task failed")
}
return result, nil
}
func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {

View File

@ -115,6 +115,6 @@ func (cs *CloudScheduler) genTaskAndProviders() (*providerPricing.Task, []*provi
return nil, providerList, nil
}
func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return nil
func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) {
return nil, nil
}

View File

@ -50,6 +50,6 @@ func (h *HpcScheduler) genTaskAndProviders(task *response.TaskInfo) (*providerPr
return nil, nil
}
func (h *HpcScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return nil
func (h *HpcScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) {
return nil, nil
}

View File

@ -64,7 +64,7 @@ func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*provider
return nil, providerList, nil
}
func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) {
//TODO implement me
panic("implement me")
}