updated return type of aischeduler submit

Former-commit-id: c8de6c78b18e2159f4c6ea4abcac77221f7eae48
This commit is contained in:
tzwang 2024-04-01 17:53:42 +08:00
parent b09a45bd5b
commit 69757c9f5b
8 changed files with 43 additions and 20 deletions

View File

@ -14,8 +14,6 @@ type (
ScheduleResp {
Success bool `json:"success"`
TaskId string `json:"taskId"`
ClusterId string `json:"clusterId"`
ErrorMsg string `json:"errorMsg"`
}

View File

@ -44,7 +44,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
return nil, err
}
err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
_, err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
if err != nil {
return nil, err
}

View File

@ -40,7 +40,7 @@ func (l *AiQueue) Consume(val string) error {
aiSchdl, _ := schedulers.NewAiScheduler(l.ctx, val, l.svcCtx.Scheduler, nil)
// 调度算法
err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
_, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
if err != nil {
return err
}

View File

@ -47,7 +47,7 @@ type Scheduler struct {
type SubSchedule interface {
GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error)
PickOptimalStrategy() (strategy.Strategy, error)
AssignTask(clusters []*strategy.AssignedCluster) error
AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error)
}
func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {
@ -130,7 +130,7 @@ func (s *Scheduler) TempAssign() error {
return nil
}
func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error {
func (s *Scheduler) AssignAndSchedule(ss SubSchedule) ([]interface{}, error) {
//// 已指定 ParticipantId
//if s.task.ParticipantId != 0 {
// return nil
@ -153,12 +153,12 @@ func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error {
strategy, err := ss.PickOptimalStrategy()
if err != nil {
return err
return nil, err
}
clusters, err := strategy.Schedule()
if err != nil {
return err
return nil, err
}
//集群数量不满足,指定到标签匹配后第一个集群
@ -167,12 +167,12 @@ func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error {
// return nil
//}
err = ss.AssignTask(clusters)
resp, err := ss.AssignTask(clusters)
if err != nil {
return err
return nil, err
}
return nil
return resp, nil
}
func (s *Scheduler) SaveToDb() error {

View File

@ -95,27 +95,52 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
return nil, errors.New("no strategy has been chosen")
}
func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) {
if clusters == nil {
return errors.New("clusters is nil")
return nil, errors.New("clusters is nil")
}
var wg sync.WaitGroup
var result []interface{}
var errs []error
var ch = make(chan interface{}, len(clusters))
var errCh = make(chan error, len(clusters))
executorMap := *as.AiExecutor
for _, cluster := range clusters {
c := cluster
if cluster.Replicas == 0 {
continue
}
wg.Add(1)
go func() {
_, err := executorMap[c.Name].Execute(as.ctx, as.option)
resp, err := executorMap[c.Name].Execute(as.ctx, as.option)
if err != nil {
// TODO: database operation
errCh <- err
wg.Done()
return
}
// TODO: database operation
ch <- resp
wg.Done()
}()
}
wg.Wait()
return nil
for s := range ch {
result = append(result, s)
}
for e := range errCh {
errs = append(errs, e)
}
if len(errs) != 0 {
return nil, errors.New("submit task failed")
}
return result, nil
}
func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {

View File

@ -115,6 +115,6 @@ func (cs *CloudScheduler) genTaskAndProviders() (*providerPricing.Task, []*provi
return nil, providerList, nil
}
func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return nil
func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) {
return nil, nil
}

View File

@ -50,6 +50,6 @@ func (h *HpcScheduler) genTaskAndProviders(task *response.TaskInfo) (*providerPr
return nil, nil
}
func (h *HpcScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return nil
func (h *HpcScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) {
return nil, nil
}

View File

@ -64,7 +64,7 @@ func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*provider
return nil, providerList, nil
}
func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]interface{}, error) {
//TODO implement me
panic("implement me")
}