diff --git a/api/desc/schedule/pcm-schedule.api b/api/desc/schedule/pcm-schedule.api index 72ba71a7..eafba3de 100644 --- a/api/desc/schedule/pcm-schedule.api +++ b/api/desc/schedule/pcm-schedule.api @@ -13,8 +13,14 @@ type ( } ScheduleResp { - Success bool `json:"success"` - ErrorMsg string `json:"errorMsg"` + Results []*ScheduleResult `json:"results"` + } + + ScheduleResult { + ClusterId string `json:"clusterId"` + TaskId string `json:"taskId"` + Replica int32 `json:"replica"` + Msg string `json:"msg"` } AiOption { diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index 8dbccdae..074d31c0 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -4,8 +4,6 @@ import ( "context" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -50,15 +48,10 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type return nil, err } - //转换成统一返回类型 - for _, r := range results { - sResp, err := storeLink.ConvertType(r, resp, nil) - if err != nil { - return nil, err - } - - if sResp.(*types.ScheduleResp).ErrorMsg != "" { - resp.ErrorMsg = sResp.(*types.ScheduleResp).ErrorMsg + "\n" + switch opt.GetOptionType() { + case option.AI: + for _, result := range results { + _ = (result).(*schedulers.AiResult) } } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index a9e73f17..0533f22c 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -16,6 +16,7 @@ package schedulers import ( "context" + "encoding/json" "errors" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" @@ -25,6 +26,9 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC" + "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus" + "strconv" "sync" ) @@ -36,6 +40,13 @@ type AiScheduler struct { ctx context.Context } +type AiResult struct { + taskId string + clusterId string + replica int32 + msg string +} + func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) { return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil } @@ -101,9 +112,9 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter } var wg sync.WaitGroup - var result []interface{} + var results []interface{} var errs []error - var ch = make(chan interface{}, len(clusters)) + var ch = make(chan *AiResult, len(clusters)) var errCh = make(chan error, len(clusters)) executorMap := *as.AiExecutor @@ -114,9 +125,8 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter } wg.Add(1) go func() { - //var resp interface{} - //var err error - resp, err := executorMap[c.Name].Execute(as.ctx, as.option) + opt, _ := cloneAiOption(as.option) + resp, err := executorMap[c.Name].Execute(as.ctx, opt) if err != nil { errCh <- err @@ -124,14 +134,11 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter return } - data := struct { - Resp interface{} - ClusterId int64 - }{ - Resp: resp, - ClusterId: c.ParticipantId, - } - ch <- data + result, _ := convertType(resp) + result.replica = c.Replicas + result.clusterId = strconv.FormatInt(c.ParticipantId, 10) + + ch <- result wg.Done() }() } @@ -148,15 +155,11 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter } for s := range ch { - data := (s).(struct { - Resp interface{} - ClusterId int64 - }) // TODO: database operation - result = append(result, data.Resp) + results = append(results, s) } - return result, nil + return results, nil } func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) { @@ -202,3 +205,43 @@ func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, } return resourceSpecs, nil } + +func convertType(in interface{}) (*AiResult, error) { + var result AiResult + switch (in).(type) { + case *hpcAC.SubmitTaskAiResp: + resp := (in).(*hpcAC.SubmitTaskAiResp) + if resp.Code == "0" { + result.taskId = resp.Data + } else { + result.msg = resp.Msg + } + return &result, nil + case *octopus.CreateTrainJobResp: + resp := (in).(*octopus.CreateTrainJobResp) + + if resp.Success { + result.taskId = resp.Payload.JobId + } else { + result.msg = resp.Error.Message + } + + return &result, nil + default: + return nil, errors.New("ai task response failed") + } +} + +func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) { + origJSON, err := json.Marshal(opt) + if err != nil { + return nil, err + } + + clone := option.AiOption{} + if err = json.Unmarshal(origJSON, &clone); err != nil { + return nil, err + } + + return &clone, nil +} diff --git a/api/internal/scheduler/schedulers/option/cloudOption.go b/api/internal/scheduler/schedulers/option/cloudOption.go new file mode 100644 index 00000000..fe6274ed --- /dev/null +++ b/api/internal/scheduler/schedulers/option/cloudOption.go @@ -0,0 +1,9 @@ +package option + +type CloudOption struct { + task interface{} +} + +func (c CloudOption) GetOptionType() string { + return CLOUD +} diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index bfb78263..cb1d4922 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -64,6 +64,8 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { } results = append(results, &assignedCluster) return results, nil + case option.CLOUD: + } return nil, errors.New("failed to apply DynamicResourcesStrategy")