diff --git a/api/internal/handler/routes.go b/api/internal/handler/routes.go index 0217023f..070785f6 100644 --- a/api/internal/handler/routes.go +++ b/api/internal/handler/routes.go @@ -786,4 +786,35 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { }, rest.WithPrefix("/pcm/v1"), ) + + server.AddRoutes( + []rest.Route{ + { + Method: http.MethodGet, + Path: "/schedule/ai/getResourceTypes", + Handler: schedule.ScheduleGetAiResourceTypesHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/schedule/ai/getTaskTypes", + Handler: schedule.ScheduleGetAiTaskTypesHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/schedule/ai/getDatasets", + Handler: schedule.ScheduleGetDatasetsHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/schedule/ai/getStrategies", + Handler: schedule.ScheduleGetStrategyHandler(serverCtx), + }, + { + Method: http.MethodPost, + Path: "/schedule/submit", + Handler: schedule.ScheduleSubmitHandler(serverCtx), + }, + }, + rest.WithPrefix("/pcm/v1"), + ) } diff --git a/api/internal/logic/schedule/schedulegetairesourcetypeslogic.go b/api/internal/logic/schedule/schedulegetairesourcetypeslogic.go new file mode 100644 index 00000000..025642bf --- /dev/null +++ b/api/internal/logic/schedule/schedulegetairesourcetypeslogic.go @@ -0,0 +1,33 @@ +package schedule + +import ( + "context" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink" + + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type ScheduleGetAiResourceTypesLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewScheduleGetAiResourceTypesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleGetAiResourceTypesLogic { + return &ScheduleGetAiResourceTypesLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *ScheduleGetAiResourceTypesLogic) ScheduleGetAiResourceTypes() (resp *types.AiResourceTypesResp, err error) { + resp = &types.AiResourceTypesResp{} + resourceTypes := storeLink.GetResourceTypes() + resp.ResourceTypes = resourceTypes + + return resp, nil +} diff --git a/api/internal/logic/schedule/schedulegetaitasktypeslogic.go b/api/internal/logic/schedule/schedulegetaitasktypeslogic.go new file mode 100644 index 00000000..926e4dbc --- /dev/null +++ b/api/internal/logic/schedule/schedulegetaitasktypeslogic.go @@ -0,0 +1,33 @@ +package schedule + +import ( + "context" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink" + + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type ScheduleGetAiTaskTypesLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewScheduleGetAiTaskTypesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleGetAiTaskTypesLogic { + return &ScheduleGetAiTaskTypesLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *ScheduleGetAiTaskTypesLogic) ScheduleGetAiTaskTypes() (resp *types.AiTaskTypesResp, err error) { + resp = &types.AiTaskTypesResp{} + taskTypes := storeLink.GetTaskTypes() + resp.TaskTypes = taskTypes + + return resp, nil +} diff --git a/api/internal/logic/schedule/schedulegetdatasetslogic.go b/api/internal/logic/schedule/schedulegetdatasetslogic.go new file mode 100644 index 00000000..99a3dd43 --- /dev/null +++ b/api/internal/logic/schedule/schedulegetdatasetslogic.go @@ -0,0 +1,37 @@ +package schedule + +import ( + "context" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink" + + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type ScheduleGetDatasetsLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewScheduleGetDatasetsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleGetDatasetsLogic { + return &ScheduleGetDatasetsLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *ScheduleGetDatasetsLogic) ScheduleGetDatasets() (resp *types.AiDatasetsResp, err error) { + resp = &types.AiDatasetsResp{} + _, colMap := service.InitAiClusterMap(l.ctx, l.svcCtx) + names, err := storeLink.GetDatasetsNames(colMap) + if err != nil { + return nil, err + } + resp.Datasets = names + return resp, nil +} diff --git a/api/internal/logic/schedule/schedulegetstrategylogic.go b/api/internal/logic/schedule/schedulegetstrategylogic.go new file mode 100644 index 00000000..9b591f83 --- /dev/null +++ b/api/internal/logic/schedule/schedulegetstrategylogic.go @@ -0,0 +1,32 @@ +package schedule + +import ( + "context" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type ScheduleGetStrategyLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewScheduleGetStrategyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleGetStrategyLogic { + return &ScheduleGetStrategyLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *ScheduleGetStrategyLogic) ScheduleGetStrategy() (resp *types.AiStrategyResp, err error) { + resp = &types.AiStrategyResp{} + names := strategy.GetStrategyNames() + resp.Strategies = names + + return resp, nil +} diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go new file mode 100644 index 00000000..924aa37b --- /dev/null +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -0,0 +1,30 @@ +package schedule + +import ( + "context" + + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type ScheduleSubmitLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleSubmitLogic { + return &ScheduleSubmitLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleResp) (resp *types.ScheduleResp, err error) { + // todo: add your logic here and delete this line + + return +} diff --git a/api/internal/logic/storelink/submitlinktasklogic.go b/api/internal/logic/storelink/submitlinktasklogic.go index 45710841..354e5521 100644 --- a/api/internal/logic/storelink/submitlinktasklogic.go +++ b/api/internal/logic/storelink/submitlinktasklogic.go @@ -67,7 +67,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp envs = append(envs, env) } } - task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "pytorch") + task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "", "", "pytorch") if err != nil { return nil, err } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 3077b67b..c8b5c72b 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -71,7 +71,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { case strategy.REPLICATION: strategy := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params, Replicas: 1}) return strategy, nil - case strategy.RESOURCE_PRICING: + case strategy.RESOURCES_PRICING: strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) return strategy, nil } diff --git a/api/internal/scheduler/schedulers/option/aiOption.go b/api/internal/scheduler/schedulers/option/aiOption.go index 9512b3e9..acff16de 100644 --- a/api/internal/scheduler/schedulers/option/aiOption.go +++ b/api/internal/scheduler/schedulers/option/aiOption.go @@ -9,11 +9,12 @@ type AiOption struct { ClusterToStaticWeight map[string]int32 CodeType string - ImageId string - SpecId string - //DatasetsId string - CodeId string - ResourceId string + ImageId string + SpecId string + DatasetsId string + CodeId string + ResourceId string + AlgorithmId string Cmd string Envs []string diff --git a/api/internal/scheduler/service/aiService.go b/api/internal/scheduler/service/aiService.go index 01e41685..4ac09867 100644 --- a/api/internal/scheduler/service/aiService.go +++ b/api/internal/scheduler/service/aiService.go @@ -16,11 +16,11 @@ const ( var ( AiTypeMap = map[string]string{ - "hanwuji": OCTOPUS, - "suiyan": OCTOPUS, - "sailingsi": OCTOPUS, - "modelarts-CloudBrain2": MODELARTS, - "shuguangAi": SHUGUANGAI, + "hanwuji": OCTOPUS, + //"suiyan": OCTOPUS, + //"sailingsi": OCTOPUS, + //"modelarts-CloudBrain2": MODELARTS, + "shuguangAi": SHUGUANGAI, } ) diff --git a/api/internal/scheduler/strategy/strategy.go b/api/internal/scheduler/strategy/strategy.go index ef9123cc..f5c06d64 100644 --- a/api/internal/scheduler/strategy/strategy.go +++ b/api/internal/scheduler/strategy/strategy.go @@ -1,14 +1,16 @@ package strategy const ( - REPLICATION = "replication" - RESOURCE_PRICING = "resourcePricing" - STATIC_WEIGHT = "staticWeight" - DYNAMIC_WEIGHT = "dynamicWeight" + REPLICATION = "replication" + RESOURCES_PRICING = "resourcesPricing" + STATIC_WEIGHT = "staticWeight" + DYNAMIC_RESOURCES = "dynamicResources" + DATA_LOCALITY = "dataLocality" //感知数据位置,数据调度和计算调度协同,近数据调度 + ENERGY_CONSUMPTION = "energyConsumption" //根据各集群总体能耗水平调度作业,优先选择能耗低的集群调度作业 ) var ( - strategyNames = []string{REPLICATION, RESOURCE_PRICING} + strategyNames = []string{REPLICATION, RESOURCES_PRICING, STATIC_WEIGHT, DYNAMIC_RESOURCES} ) type Strategy interface { diff --git a/api/internal/scheduler/strategy/test/strategy_test.go b/api/internal/scheduler/strategy/test/strategy_test.go index 6331d136..e92bf482 100644 --- a/api/internal/scheduler/strategy/test/strategy_test.go +++ b/api/internal/scheduler/strategy/test/strategy_test.go @@ -15,7 +15,7 @@ func TestReplication(t *testing.T) { {Name: "test2", Participant_id: 2}, {Name: "test3", Participant_id: 3}, } - rsc := []*collector.ResourceSpecs{ + rsc := []*collector.ResourceStats{ { ParticipantId: 1, Name: "test1", @@ -31,7 +31,7 @@ func TestReplication(t *testing.T) { name string replica int32 ps []entity.Participant - res []*collector.ResourceSpecs + res []*collector.ResourceStats }{ { name: "test1", diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index c3b4cda1..2d65db43 100644 --- a/api/internal/storeLink/modelarts.go +++ b/api/internal/storeLink/modelarts.go @@ -63,7 +63,7 @@ func (m *ModelArtsLink) QueryImageList() (interface{}, error) { return resp, nil } -func (m *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) { +func (m *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { // modelArts提交任务 environments := make(map[string]string) parameters := make([]*modelarts.ParametersTrainJob, 0) @@ -162,7 +162,7 @@ func (m *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) { if err != nil { return nil, err } - task, err := m.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType) + task, err := m.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) if err != nil { return nil, err } diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 9448d897..13fe2b07 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -108,7 +108,7 @@ func (o *OctopusLink) QueryImageList() (interface{}, error) { return resp, nil } -func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) { +func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { // octopus提交任务 // python参数 @@ -227,7 +227,7 @@ func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) { if err != nil { return nil, err } - task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType) + task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) if err != nil { return nil, err } @@ -239,6 +239,10 @@ func (o *OctopusLink) GenerateSubmitParams(option *option.AiOption) error { if err != nil { return err } + err = o.generateDatasetsId(option) + if err != nil { + return err + } err = o.generateImageId(option) if err != nil { return err @@ -259,10 +263,34 @@ func (o *OctopusLink) GenerateSubmitParams(option *option.AiOption) error { } func (o *OctopusLink) generateResourceId(option *option.AiOption) error { - return nil } +func (o *OctopusLink) generateDatasetsId(option *option.AiOption) error { + if option.DatasetsName == "" { + return errors.New("DatasetsName not set") + } + req := &octopus.GetMyDatasetListReq{ + Platform: o.platform, + PageIndex: o.pageIndex, + PageSize: o.pageSize, + } + resp, err := o.svcCtx.OctopusRpc.GetMyDatasetList(o.ctx, req) + if err != nil { + return err + } + if !resp.Success { + return errors.New("failed to get DatasetsId") + } + for _, dataset := range resp.Payload.Datasets { + if dataset.Name == option.DatasetsName { + option.DatasetsId = dataset.Id + return nil + } + } + return errors.New("failed to get DatasetsId") +} + func (o *OctopusLink) generateImageId(option *option.AiOption) error { return nil diff --git a/api/internal/storeLink/shuguangHpc.go b/api/internal/storeLink/shuguangHpc.go index f7f0af82..85b4c32e 100644 --- a/api/internal/storeLink/shuguangHpc.go +++ b/api/internal/storeLink/shuguangHpc.go @@ -144,7 +144,7 @@ func (s ShuguangHpc) QueryImageList() (interface{}, error) { return nil, nil } -func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) { +func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { // shuguangHpc提交任务 //判断是否resourceId匹配自定义资源Id diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index eef573bf..c2898513 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -141,7 +141,7 @@ func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []str return nil, nil } -func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) { +func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { // shuguangAi提交任务 switch aiType { case PYTORCH_TASK: @@ -248,7 +248,7 @@ func (s *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) { if err != nil { return nil, err } - task, err := s.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType) + task, err := s.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) if err != nil { return nil, err } diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index 80714797..b1815758 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -33,7 +33,7 @@ type Linkage interface { UploadImage(path string) (interface{}, error) DeleteImage(imageId string) (interface{}, error) QueryImageList() (interface{}, error) - SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) + SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) QueryTask(taskId string) (interface{}, error) QuerySpecs() (interface{}, error) DeleteTask(taskId string) (interface{}, error)