diff --git a/api/desc/schedule/pcm-schedule.api b/api/desc/schedule/pcm-schedule.api index 72ba71a7..eafba3de 100644 --- a/api/desc/schedule/pcm-schedule.api +++ b/api/desc/schedule/pcm-schedule.api @@ -13,8 +13,14 @@ type ( } ScheduleResp { - Success bool `json:"success"` - ErrorMsg string `json:"errorMsg"` + Results []*ScheduleResult `json:"results"` + } + + ScheduleResult { + ClusterId string `json:"clusterId"` + TaskId string `json:"taskId"` + Replica int32 `json:"replica"` + Msg string `json:"msg"` } AiOption { diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index d5105390..074d31c0 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -4,7 +4,6 @@ import ( "context" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -28,26 +27,43 @@ func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Sc func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) { resp = &types.ScheduleResp{} opt := &option.AiOption{ - ResourceType: req.AiOption.ResourceType, - Tops: 0, - TaskType: req.AiOption.TaskType, - DatasetsName: req.AiOption.Datasets, - //AlgorithmName: "cnn", + ResourceType: req.AiOption.ResourceType, + Tops: req.AiOption.Tops, + TaskType: req.AiOption.TaskType, + DatasetsName: req.AiOption.Datasets, + AlgorithmName: req.AiOption.Algorithm, StrategyName: req.AiOption.Strategy, - ClusterToStaticWeight: nil, - Params: []string{ - "epoch,1", - }, + ClusterToStaticWeight: req.AiOption.StaticWeightMap, + Params: req.AiOption.Params, + Envs: req.AiOption.Envs, + Cmd: req.AiOption.Cmd, } aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt) if err != nil { return nil, err } - _, err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) if err != nil { return nil, err } + switch opt.GetOptionType() { + case option.AI: + for _, result := range results { + _ = (result).(*schedulers.AiResult) + } + } + + if resp.ErrorMsg == "" { + resp.Success = true + err = l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName) + if err != nil { + return nil, err + } + } else { + resp.Success = false + } + return resp, nil } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 8ff45161..0533f22c 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -16,6 +16,7 @@ package schedulers import ( "context" + "encoding/json" "errors" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" @@ -25,6 +26,9 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC" + "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus" + "strconv" "sync" ) @@ -36,6 +40,13 @@ type AiScheduler struct { ctx context.Context } +type AiResult struct { + taskId string + clusterId string + replica int32 + msg string +} + func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) { return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil } @@ -100,12 +111,10 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter return nil, errors.New("clusters is nil") } - //res := struct { - //}{} var wg sync.WaitGroup - var result []interface{} + var results []interface{} var errs []error - var ch = make(chan interface{}, len(clusters)) + var ch = make(chan *AiResult, len(clusters)) var errCh = make(chan error, len(clusters)) executorMap := *as.AiExecutor @@ -116,23 +125,20 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter } wg.Add(1) go func() { - resp, err := executorMap[c.Name].Execute(as.ctx, as.option) + opt, _ := cloneAiOption(as.option) + resp, err := executorMap[c.Name].Execute(as.ctx, opt) if err != nil { - // TODO: database operation errCh <- err wg.Done() return } - // TODO: database operation - data := struct { - Resp interface{} - ClusterId int64 - }{ - Resp: resp, - ClusterId: c.ParticipantId, - } - ch <- data + + result, _ := convertType(resp) + result.replica = c.Replicas + result.clusterId = strconv.FormatInt(c.ParticipantId, 10) + + ch <- result wg.Done() }() } @@ -149,19 +155,11 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter } for s := range ch { - data := (s).(struct { - Resp interface{} - ClusterId int64 - }) - - result = append(result, data.Resp) + // TODO: database operation + results = append(results, s) } - err := as.AiStorages.SaveTask(as.option.TaskName) - if err != nil { - return nil, err - } - return result, nil + return results, nil } func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) { @@ -207,3 +205,43 @@ func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, } return resourceSpecs, nil } + +func convertType(in interface{}) (*AiResult, error) { + var result AiResult + switch (in).(type) { + case *hpcAC.SubmitTaskAiResp: + resp := (in).(*hpcAC.SubmitTaskAiResp) + if resp.Code == "0" { + result.taskId = resp.Data + } else { + result.msg = resp.Msg + } + return &result, nil + case *octopus.CreateTrainJobResp: + resp := (in).(*octopus.CreateTrainJobResp) + + if resp.Success { + result.taskId = resp.Payload.JobId + } else { + result.msg = resp.Error.Message + } + + return &result, nil + default: + return nil, errors.New("ai task response failed") + } +} + +func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) { + origJSON, err := json.Marshal(opt) + if err != nil { + return nil, err + } + + clone := option.AiOption{} + if err = json.Unmarshal(origJSON, &clone); err != nil { + return nil, err + } + + return &clone, nil +} diff --git a/api/internal/scheduler/schedulers/option/cloudOption.go b/api/internal/scheduler/schedulers/option/cloudOption.go new file mode 100644 index 00000000..fe6274ed --- /dev/null +++ b/api/internal/scheduler/schedulers/option/cloudOption.go @@ -0,0 +1,9 @@ +package option + +type CloudOption struct { + task interface{} +} + +func (c CloudOption) GetOptionType() string { + return CLOUD +} diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index bfb78263..cb1d4922 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -64,6 +64,8 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { } results = append(results, &assignedCluster) return results, nil + case option.CLOUD: + } return nil, errors.New("failed to apply DynamicResourcesStrategy") diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index ce00a540..b3c0ba2d 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -370,6 +370,14 @@ func ConvertType(in interface{}, out interface{}, participant *models.StorelinkC resp.TaskId = inresp.Payload.JobId + return resp, nil + case *types.ScheduleResp: + resp := (interface{})(out).(*types.ScheduleResp) + resp.Success = inresp.Success + if !resp.Success { + resp.ErrorMsg = inresp.Error.Message + return resp, nil + } return resp, nil } return nil, nil @@ -404,6 +412,15 @@ func ConvertType(in interface{}, out interface{}, participant *models.StorelinkC resp.ErrorMsg = inresp.Msg } return resp, nil + case *types.ScheduleResp: + resp := (interface{})(out).(*types.ScheduleResp) + if inresp.Code == "0" { + resp.Success = true + } else { + resp.Success = false + resp.ErrorMsg = inresp.Msg + } + return resp, nil } return nil, nil