updated type convert function

Former-commit-id: c1a500ed22fc2922eb4d37d3651cca180c73f948
This commit is contained in:
tzwang 2024-04-02 18:35:42 +08:00
parent 5b4b7f88eb
commit dbe2363339
3 changed files with 54 additions and 19 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
@ -28,26 +29,48 @@ func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Sc
func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) {
resp = &types.ScheduleResp{}
opt := &option.AiOption{
ResourceType: req.AiOption.ResourceType,
Tops: 0,
TaskType: req.AiOption.TaskType,
DatasetsName: req.AiOption.Datasets,
//AlgorithmName: "cnn",
ResourceType: req.AiOption.ResourceType,
Tops: req.AiOption.Tops,
TaskType: req.AiOption.TaskType,
DatasetsName: req.AiOption.Datasets,
AlgorithmName: req.AiOption.Algorithm,
StrategyName: req.AiOption.Strategy,
ClusterToStaticWeight: nil,
Params: []string{
"epoch,1",
},
ClusterToStaticWeight: req.AiOption.StaticWeightMap,
Params: req.AiOption.Params,
Envs: req.AiOption.Envs,
Cmd: req.AiOption.Cmd,
}
aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt)
if err != nil {
return nil, err
}
_, err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
if err != nil {
return nil, err
}
//转换成统一返回类型
for _, r := range results {
sResp, err := storeLink.ConvertType(r, resp, nil)
if err != nil {
return nil, err
}
if sResp.(*types.ScheduleResp).ErrorMsg != "" {
resp.ErrorMsg = sResp.(*types.ScheduleResp).ErrorMsg + "\n"
}
}
if resp.ErrorMsg == "" {
resp.Success = true
err = l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName)
if err != nil {
return nil, err
}
} else {
resp.Success = false
}
return resp, nil
}

View File

@ -100,8 +100,6 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter
return nil, errors.New("clusters is nil")
}
//res := struct {
//}{}
var wg sync.WaitGroup
var result []interface{}
var errs []error
@ -116,15 +114,16 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter
}
wg.Add(1)
go func() {
//var resp interface{}
//var err error
resp, err := executorMap[c.Name].Execute(as.ctx, as.option)
if err != nil {
// TODO: database operation
errCh <- err
wg.Done()
return
}
// TODO: database operation
data := struct {
Resp interface{}
ClusterId int64
@ -153,14 +152,10 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter
Resp interface{}
ClusterId int64
})
// TODO: database operation
result = append(result, data.Resp)
}
err := as.AiStorages.SaveTask(as.option.TaskName)
if err != nil {
return nil, err
}
return result, nil
}

View File

@ -370,6 +370,14 @@ func ConvertType(in interface{}, out interface{}, participant *models.StorelinkC
resp.TaskId = inresp.Payload.JobId
return resp, nil
case *types.ScheduleResp:
resp := (interface{})(out).(*types.ScheduleResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
return resp, nil
}
return resp, nil
}
return nil, nil
@ -404,6 +412,15 @@ func ConvertType(in interface{}, out interface{}, participant *models.StorelinkC
resp.ErrorMsg = inresp.Msg
}
return resp, nil
case *types.ScheduleResp:
resp := (interface{})(out).(*types.ScheduleResp)
if inresp.Code == "0" {
resp.Success = true
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
}
return resp, nil
}
return nil, nil