diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index d5105390..8dbccdae 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -4,6 +4,7 @@ import ( "context" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -28,26 +29,48 @@ func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Sc func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) { resp = &types.ScheduleResp{} opt := &option.AiOption{ - ResourceType: req.AiOption.ResourceType, - Tops: 0, - TaskType: req.AiOption.TaskType, - DatasetsName: req.AiOption.Datasets, - //AlgorithmName: "cnn", + ResourceType: req.AiOption.ResourceType, + Tops: req.AiOption.Tops, + TaskType: req.AiOption.TaskType, + DatasetsName: req.AiOption.Datasets, + AlgorithmName: req.AiOption.Algorithm, StrategyName: req.AiOption.Strategy, - ClusterToStaticWeight: nil, - Params: []string{ - "epoch,1", - }, + ClusterToStaticWeight: req.AiOption.StaticWeightMap, + Params: req.AiOption.Params, + Envs: req.AiOption.Envs, + Cmd: req.AiOption.Cmd, } aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt) if err != nil { return nil, err } - _, err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) if err != nil { return nil, err } + //转换成统一返回类型 + for _, r := range results { + sResp, err := storeLink.ConvertType(r, resp, nil) + if err != nil { + return nil, err + } + + if sResp.(*types.ScheduleResp).ErrorMsg != "" { + resp.ErrorMsg = sResp.(*types.ScheduleResp).ErrorMsg + "\n" + } + } + + if resp.ErrorMsg == "" { + resp.Success = true + err = l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName) + if err != nil { + return nil, err + } + } else { + resp.Success = false + } + return resp, nil } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 8ff45161..a9e73f17 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -100,8 +100,6 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter return nil, errors.New("clusters is nil") } - //res := struct { - //}{} var wg sync.WaitGroup var result []interface{} var errs []error @@ -116,15 +114,16 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter } wg.Add(1) go func() { + //var resp interface{} + //var err error resp, err := executorMap[c.Name].Execute(as.ctx, as.option) if err != nil { - // TODO: database operation errCh <- err wg.Done() return } - // TODO: database operation + data := struct { Resp interface{} ClusterId int64 @@ -153,14 +152,10 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter Resp interface{} ClusterId int64 }) - + // TODO: database operation result = append(result, data.Resp) } - err := as.AiStorages.SaveTask(as.option.TaskName) - if err != nil { - return nil, err - } return result, nil } diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index ce00a540..b3c0ba2d 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -370,6 +370,14 @@ func ConvertType(in interface{}, out interface{}, participant *models.StorelinkC resp.TaskId = inresp.Payload.JobId + return resp, nil + case *types.ScheduleResp: + resp := (interface{})(out).(*types.ScheduleResp) + resp.Success = inresp.Success + if !resp.Success { + resp.ErrorMsg = inresp.Error.Message + return resp, nil + } return resp, nil } return nil, nil @@ -404,6 +412,15 @@ func ConvertType(in interface{}, out interface{}, participant *models.StorelinkC resp.ErrorMsg = inresp.Msg } return resp, nil + case *types.ScheduleResp: + resp := (interface{})(out).(*types.ScheduleResp) + if inresp.Code == "0" { + resp.Success = true + } else { + resp.Success = false + resp.ErrorMsg = inresp.Msg + } + return resp, nil } return nil, nil