Merge pull request 'modified storelink submit method' (#25) from tzwang/pcm-coordinator:master into master
Former-commit-id: 5577c5808ea9f0d18fc5c5f3b2f4ba40a6512a01
This commit is contained in:
commit
07e5bcb5ba
|
@ -786,4 +786,35 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
|
|||
},
|
||||
rest.WithPrefix("/pcm/v1"),
|
||||
)
|
||||
|
||||
server.AddRoutes(
|
||||
[]rest.Route{
|
||||
{
|
||||
Method: http.MethodGet,
|
||||
Path: "/schedule/ai/getResourceTypes",
|
||||
Handler: schedule.ScheduleGetAiResourceTypesHandler(serverCtx),
|
||||
},
|
||||
{
|
||||
Method: http.MethodGet,
|
||||
Path: "/schedule/ai/getTaskTypes",
|
||||
Handler: schedule.ScheduleGetAiTaskTypesHandler(serverCtx),
|
||||
},
|
||||
{
|
||||
Method: http.MethodGet,
|
||||
Path: "/schedule/ai/getDatasets",
|
||||
Handler: schedule.ScheduleGetDatasetsHandler(serverCtx),
|
||||
},
|
||||
{
|
||||
Method: http.MethodGet,
|
||||
Path: "/schedule/ai/getStrategies",
|
||||
Handler: schedule.ScheduleGetStrategyHandler(serverCtx),
|
||||
},
|
||||
{
|
||||
Method: http.MethodPost,
|
||||
Path: "/schedule/submit",
|
||||
Handler: schedule.ScheduleSubmitHandler(serverCtx),
|
||||
},
|
||||
},
|
||||
rest.WithPrefix("/pcm/v1"),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
package schedule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
|
||||
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
type ScheduleGetAiResourceTypesLogic struct {
|
||||
logx.Logger
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewScheduleGetAiResourceTypesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleGetAiResourceTypesLogic {
|
||||
return &ScheduleGetAiResourceTypesLogic{
|
||||
Logger: logx.WithContext(ctx),
|
||||
ctx: ctx,
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *ScheduleGetAiResourceTypesLogic) ScheduleGetAiResourceTypes() (resp *types.AiResourceTypesResp, err error) {
|
||||
resp = &types.AiResourceTypesResp{}
|
||||
resourceTypes := storeLink.GetResourceTypes()
|
||||
resp.ResourceTypes = resourceTypes
|
||||
|
||||
return resp, nil
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package schedule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
|
||||
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
type ScheduleGetAiTaskTypesLogic struct {
|
||||
logx.Logger
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewScheduleGetAiTaskTypesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleGetAiTaskTypesLogic {
|
||||
return &ScheduleGetAiTaskTypesLogic{
|
||||
Logger: logx.WithContext(ctx),
|
||||
ctx: ctx,
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *ScheduleGetAiTaskTypesLogic) ScheduleGetAiTaskTypes() (resp *types.AiTaskTypesResp, err error) {
|
||||
resp = &types.AiTaskTypesResp{}
|
||||
taskTypes := storeLink.GetTaskTypes()
|
||||
resp.TaskTypes = taskTypes
|
||||
|
||||
return resp, nil
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package schedule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
|
||||
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
type ScheduleGetDatasetsLogic struct {
|
||||
logx.Logger
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewScheduleGetDatasetsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleGetDatasetsLogic {
|
||||
return &ScheduleGetDatasetsLogic{
|
||||
Logger: logx.WithContext(ctx),
|
||||
ctx: ctx,
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *ScheduleGetDatasetsLogic) ScheduleGetDatasets() (resp *types.AiDatasetsResp, err error) {
|
||||
resp = &types.AiDatasetsResp{}
|
||||
_, colMap := service.InitAiClusterMap(l.ctx, l.svcCtx)
|
||||
names, err := storeLink.GetDatasetsNames(colMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp.Datasets = names
|
||||
return resp, nil
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package schedule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
type ScheduleGetStrategyLogic struct {
|
||||
logx.Logger
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewScheduleGetStrategyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleGetStrategyLogic {
|
||||
return &ScheduleGetStrategyLogic{
|
||||
Logger: logx.WithContext(ctx),
|
||||
ctx: ctx,
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *ScheduleGetStrategyLogic) ScheduleGetStrategy() (resp *types.AiStrategyResp, err error) {
|
||||
resp = &types.AiStrategyResp{}
|
||||
names := strategy.GetStrategyNames()
|
||||
resp.Strategies = names
|
||||
|
||||
return resp, nil
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
package schedule
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
type ScheduleSubmitLogic struct {
|
||||
logx.Logger
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleSubmitLogic {
|
||||
return &ScheduleSubmitLogic{
|
||||
Logger: logx.WithContext(ctx),
|
||||
ctx: ctx,
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleResp) (resp *types.ScheduleResp, err error) {
|
||||
// todo: add your logic here and delete this line
|
||||
|
||||
return
|
||||
}
|
|
@ -67,7 +67,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp
|
|||
envs = append(envs, env)
|
||||
}
|
||||
}
|
||||
task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "pytorch")
|
||||
task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "", "", "pytorch")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
|
|||
case strategy.REPLICATION:
|
||||
strategy := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params, Replicas: 1})
|
||||
return strategy, nil
|
||||
case strategy.RESOURCE_PRICING:
|
||||
case strategy.RESOURCES_PRICING:
|
||||
strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1})
|
||||
return strategy, nil
|
||||
}
|
||||
|
|
|
@ -9,11 +9,12 @@ type AiOption struct {
|
|||
ClusterToStaticWeight map[string]int32
|
||||
CodeType string
|
||||
|
||||
ImageId string
|
||||
SpecId string
|
||||
//DatasetsId string
|
||||
CodeId string
|
||||
ResourceId string
|
||||
ImageId string
|
||||
SpecId string
|
||||
DatasetsId string
|
||||
CodeId string
|
||||
ResourceId string
|
||||
AlgorithmId string
|
||||
|
||||
Cmd string
|
||||
Envs []string
|
||||
|
|
|
@ -16,11 +16,11 @@ const (
|
|||
|
||||
var (
|
||||
AiTypeMap = map[string]string{
|
||||
"hanwuji": OCTOPUS,
|
||||
"suiyan": OCTOPUS,
|
||||
"sailingsi": OCTOPUS,
|
||||
"modelarts-CloudBrain2": MODELARTS,
|
||||
"shuguangAi": SHUGUANGAI,
|
||||
"hanwuji": OCTOPUS,
|
||||
//"suiyan": OCTOPUS,
|
||||
//"sailingsi": OCTOPUS,
|
||||
//"modelarts-CloudBrain2": MODELARTS,
|
||||
"shuguangAi": SHUGUANGAI,
|
||||
}
|
||||
)
|
||||
|
||||
|
|
|
@ -1,14 +1,16 @@
|
|||
package strategy
|
||||
|
||||
const (
|
||||
REPLICATION = "replication"
|
||||
RESOURCE_PRICING = "resourcePricing"
|
||||
STATIC_WEIGHT = "staticWeight"
|
||||
DYNAMIC_WEIGHT = "dynamicWeight"
|
||||
REPLICATION = "replication"
|
||||
RESOURCES_PRICING = "resourcesPricing"
|
||||
STATIC_WEIGHT = "staticWeight"
|
||||
DYNAMIC_RESOURCES = "dynamicResources"
|
||||
DATA_LOCALITY = "dataLocality" //感知数据位置,数据调度和计算调度协同,近数据调度
|
||||
ENERGY_CONSUMPTION = "energyConsumption" //根据各集群总体能耗水平调度作业,优先选择能耗低的集群调度作业
|
||||
)
|
||||
|
||||
var (
|
||||
strategyNames = []string{REPLICATION, RESOURCE_PRICING}
|
||||
strategyNames = []string{REPLICATION, RESOURCES_PRICING, STATIC_WEIGHT, DYNAMIC_RESOURCES}
|
||||
)
|
||||
|
||||
type Strategy interface {
|
||||
|
|
|
@ -15,7 +15,7 @@ func TestReplication(t *testing.T) {
|
|||
{Name: "test2", Participant_id: 2},
|
||||
{Name: "test3", Participant_id: 3},
|
||||
}
|
||||
rsc := []*collector.ResourceSpecs{
|
||||
rsc := []*collector.ResourceStats{
|
||||
{
|
||||
ParticipantId: 1,
|
||||
Name: "test1",
|
||||
|
@ -31,7 +31,7 @@ func TestReplication(t *testing.T) {
|
|||
name string
|
||||
replica int32
|
||||
ps []entity.Participant
|
||||
res []*collector.ResourceSpecs
|
||||
res []*collector.ResourceStats
|
||||
}{
|
||||
{
|
||||
name: "test1",
|
||||
|
|
|
@ -63,7 +63,7 @@ func (m *ModelArtsLink) QueryImageList() (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
|
||||
func (m *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
// modelArts提交任务
|
||||
environments := make(map[string]string)
|
||||
parameters := make([]*modelarts.ParametersTrainJob, 0)
|
||||
|
@ -162,7 +162,7 @@ func (m *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
task, err := m.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType)
|
||||
task, err := m.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -108,7 +108,7 @@ func (o *OctopusLink) QueryImageList() (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
|
||||
func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
// octopus提交任务
|
||||
|
||||
// python参数
|
||||
|
@ -227,7 +227,7 @@ func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType)
|
||||
task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -239,6 +239,10 @@ func (o *OctopusLink) GenerateSubmitParams(option *option.AiOption) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = o.generateDatasetsId(option)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = o.generateImageId(option)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -259,10 +263,34 @@ func (o *OctopusLink) GenerateSubmitParams(option *option.AiOption) error {
|
|||
}
|
||||
|
||||
func (o *OctopusLink) generateResourceId(option *option.AiOption) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) generateDatasetsId(option *option.AiOption) error {
|
||||
if option.DatasetsName == "" {
|
||||
return errors.New("DatasetsName not set")
|
||||
}
|
||||
req := &octopus.GetMyDatasetListReq{
|
||||
Platform: o.platform,
|
||||
PageIndex: o.pageIndex,
|
||||
PageSize: o.pageSize,
|
||||
}
|
||||
resp, err := o.svcCtx.OctopusRpc.GetMyDatasetList(o.ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !resp.Success {
|
||||
return errors.New("failed to get DatasetsId")
|
||||
}
|
||||
for _, dataset := range resp.Payload.Datasets {
|
||||
if dataset.Name == option.DatasetsName {
|
||||
option.DatasetsId = dataset.Id
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return errors.New("failed to get DatasetsId")
|
||||
}
|
||||
|
||||
func (o *OctopusLink) generateImageId(option *option.AiOption) error {
|
||||
|
||||
return nil
|
||||
|
|
|
@ -144,7 +144,7 @@ func (s ShuguangHpc) QueryImageList() (interface{}, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
|
||||
func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
// shuguangHpc提交任务
|
||||
|
||||
//判断是否resourceId匹配自定义资源Id
|
||||
|
|
|
@ -141,7 +141,7 @@ func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []str
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
|
||||
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
// shuguangAi提交任务
|
||||
switch aiType {
|
||||
case PYTORCH_TASK:
|
||||
|
@ -248,7 +248,7 @@ func (s *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
task, err := s.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType)
|
||||
task, err := s.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ type Linkage interface {
|
|||
UploadImage(path string) (interface{}, error)
|
||||
DeleteImage(imageId string) (interface{}, error)
|
||||
QueryImageList() (interface{}, error)
|
||||
SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error)
|
||||
SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error)
|
||||
QueryTask(taskId string) (interface{}, error)
|
||||
QuerySpecs() (interface{}, error)
|
||||
DeleteTask(taskId string) (interface{}, error)
|
||||
|
|
Loading…
Reference in New Issue