From dbe2363339b60661a000b0cfb1dcaf561c3ca5fd Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 2 Apr 2024 18:35:42 +0800 Subject: [PATCH 1/2] updated type convert function Former-commit-id: c1a500ed22fc2922eb4d37d3651cca180c73f948 --- .../logic/schedule/schedulesubmitlogic.go | 43 ++++++++++++++----- .../scheduler/schedulers/aiScheduler.go | 13 ++---- api/internal/storeLink/storeLink.go | 17 ++++++++ 3 files changed, 54 insertions(+), 19 deletions(-) diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index d5105390..8dbccdae 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -4,6 +4,7 @@ import ( "context" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -28,26 +29,48 @@ func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Sc func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) { resp = &types.ScheduleResp{} opt := &option.AiOption{ - ResourceType: req.AiOption.ResourceType, - Tops: 0, - TaskType: req.AiOption.TaskType, - DatasetsName: req.AiOption.Datasets, - //AlgorithmName: "cnn", + ResourceType: req.AiOption.ResourceType, + Tops: req.AiOption.Tops, + TaskType: req.AiOption.TaskType, + DatasetsName: req.AiOption.Datasets, + AlgorithmName: req.AiOption.Algorithm, StrategyName: req.AiOption.Strategy, - ClusterToStaticWeight: nil, - Params: []string{ - "epoch,1", - }, + ClusterToStaticWeight: req.AiOption.StaticWeightMap, + Params: req.AiOption.Params, + Envs: req.AiOption.Envs, + Cmd: req.AiOption.Cmd, } aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt) if err != nil { return nil, err } - _, err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) if err != nil { return nil, err } + //转换成统一返回类型 + for _, r := range results { + sResp, err := storeLink.ConvertType(r, resp, nil) + if err != nil { + return nil, err + } + + if sResp.(*types.ScheduleResp).ErrorMsg != "" { + resp.ErrorMsg = sResp.(*types.ScheduleResp).ErrorMsg + "\n" + } + } + + if resp.ErrorMsg == "" { + resp.Success = true + err = l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName) + if err != nil { + return nil, err + } + } else { + resp.Success = false + } + return resp, nil } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 8ff45161..a9e73f17 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -100,8 +100,6 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter return nil, errors.New("clusters is nil") } - //res := struct { - //}{} var wg sync.WaitGroup var result []interface{} var errs []error @@ -116,15 +114,16 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter } wg.Add(1) go func() { + //var resp interface{} + //var err error resp, err := executorMap[c.Name].Execute(as.ctx, as.option) if err != nil { - // TODO: database operation errCh <- err wg.Done() return } - // TODO: database operation + data := struct { Resp interface{} ClusterId int64 @@ -153,14 +152,10 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter Resp interface{} ClusterId int64 }) - + // TODO: database operation result = append(result, data.Resp) } - err := as.AiStorages.SaveTask(as.option.TaskName) - if err != nil { - return nil, err - } return result, nil } diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index ce00a540..b3c0ba2d 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -370,6 +370,14 @@ func ConvertType(in interface{}, out interface{}, participant *models.StorelinkC resp.TaskId = inresp.Payload.JobId + return resp, nil + case *types.ScheduleResp: + resp := (interface{})(out).(*types.ScheduleResp) + resp.Success = inresp.Success + if !resp.Success { + resp.ErrorMsg = inresp.Error.Message + return resp, nil + } return resp, nil } return nil, nil @@ -404,6 +412,15 @@ func ConvertType(in interface{}, out interface{}, participant *models.StorelinkC resp.ErrorMsg = inresp.Msg } return resp, nil + case *types.ScheduleResp: + resp := (interface{})(out).(*types.ScheduleResp) + if inresp.Code == "0" { + resp.Success = true + } else { + resp.Success = false + resp.ErrorMsg = inresp.Msg + } + return resp, nil } return nil, nil From cdd7fe9dca810ce3653fbbe862ba115be6187ac2 Mon Sep 17 00:00:00 2001 From: tzwang Date: Sun, 7 Apr 2024 16:50:30 +0800 Subject: [PATCH 2/2] updated schedule submit task api logic Former-commit-id: 72fe516fb25e2feb760391799265642341d16277 --- api/desc/schedule/pcm-schedule.api | 10 ++- .../logic/schedule/schedulesubmitlogic.go | 15 +--- .../scheduler/schedulers/aiScheduler.go | 81 ++++++++++++++----- .../schedulers/option/cloudOption.go | 9 +++ .../scheduler/strategy/dynamicResources.go | 2 + 5 files changed, 85 insertions(+), 32 deletions(-) create mode 100644 api/internal/scheduler/schedulers/option/cloudOption.go diff --git a/api/desc/schedule/pcm-schedule.api b/api/desc/schedule/pcm-schedule.api index 72ba71a7..eafba3de 100644 --- a/api/desc/schedule/pcm-schedule.api +++ b/api/desc/schedule/pcm-schedule.api @@ -13,8 +13,14 @@ type ( } ScheduleResp { - Success bool `json:"success"` - ErrorMsg string `json:"errorMsg"` + Results []*ScheduleResult `json:"results"` + } + + ScheduleResult { + ClusterId string `json:"clusterId"` + TaskId string `json:"taskId"` + Replica int32 `json:"replica"` + Msg string `json:"msg"` } AiOption { diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index 8dbccdae..074d31c0 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -4,8 +4,6 @@ import ( "context" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -50,15 +48,10 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type return nil, err } - //转换成统一返回类型 - for _, r := range results { - sResp, err := storeLink.ConvertType(r, resp, nil) - if err != nil { - return nil, err - } - - if sResp.(*types.ScheduleResp).ErrorMsg != "" { - resp.ErrorMsg = sResp.(*types.ScheduleResp).ErrorMsg + "\n" + switch opt.GetOptionType() { + case option.AI: + for _, result := range results { + _ = (result).(*schedulers.AiResult) } } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index a9e73f17..0533f22c 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -16,6 +16,7 @@ package schedulers import ( "context" + "encoding/json" "errors" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" @@ -25,6 +26,9 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC" + "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus" + "strconv" "sync" ) @@ -36,6 +40,13 @@ type AiScheduler struct { ctx context.Context } +type AiResult struct { + taskId string + clusterId string + replica int32 + msg string +} + func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) { return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil } @@ -101,9 +112,9 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter } var wg sync.WaitGroup - var result []interface{} + var results []interface{} var errs []error - var ch = make(chan interface{}, len(clusters)) + var ch = make(chan *AiResult, len(clusters)) var errCh = make(chan error, len(clusters)) executorMap := *as.AiExecutor @@ -114,9 +125,8 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter } wg.Add(1) go func() { - //var resp interface{} - //var err error - resp, err := executorMap[c.Name].Execute(as.ctx, as.option) + opt, _ := cloneAiOption(as.option) + resp, err := executorMap[c.Name].Execute(as.ctx, opt) if err != nil { errCh <- err @@ -124,14 +134,11 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter return } - data := struct { - Resp interface{} - ClusterId int64 - }{ - Resp: resp, - ClusterId: c.ParticipantId, - } - ch <- data + result, _ := convertType(resp) + result.replica = c.Replicas + result.clusterId = strconv.FormatInt(c.ParticipantId, 10) + + ch <- result wg.Done() }() } @@ -148,15 +155,11 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter } for s := range ch { - data := (s).(struct { - Resp interface{} - ClusterId int64 - }) // TODO: database operation - result = append(result, data.Resp) + results = append(results, s) } - return result, nil + return results, nil } func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) { @@ -202,3 +205,43 @@ func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, } return resourceSpecs, nil } + +func convertType(in interface{}) (*AiResult, error) { + var result AiResult + switch (in).(type) { + case *hpcAC.SubmitTaskAiResp: + resp := (in).(*hpcAC.SubmitTaskAiResp) + if resp.Code == "0" { + result.taskId = resp.Data + } else { + result.msg = resp.Msg + } + return &result, nil + case *octopus.CreateTrainJobResp: + resp := (in).(*octopus.CreateTrainJobResp) + + if resp.Success { + result.taskId = resp.Payload.JobId + } else { + result.msg = resp.Error.Message + } + + return &result, nil + default: + return nil, errors.New("ai task response failed") + } +} + +func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) { + origJSON, err := json.Marshal(opt) + if err != nil { + return nil, err + } + + clone := option.AiOption{} + if err = json.Unmarshal(origJSON, &clone); err != nil { + return nil, err + } + + return &clone, nil +} diff --git a/api/internal/scheduler/schedulers/option/cloudOption.go b/api/internal/scheduler/schedulers/option/cloudOption.go new file mode 100644 index 00000000..fe6274ed --- /dev/null +++ b/api/internal/scheduler/schedulers/option/cloudOption.go @@ -0,0 +1,9 @@ +package option + +type CloudOption struct { + task interface{} +} + +func (c CloudOption) GetOptionType() string { + return CLOUD +} diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index bfb78263..cb1d4922 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -64,6 +64,8 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { } results = append(results, &assignedCluster) return results, nil + case option.CLOUD: + } return nil, errors.New("failed to apply DynamicResourcesStrategy")