Merge pull request 'scheduler module modified' (#19) from tzwang/pcm-coordinator:master into master

Former-commit-id: 90cf569b701ef2b7980da167278c43967803af23
This commit is contained in:
tzwang 2024-02-01 16:33:58 +08:00
commit 41c93839f9
19 changed files with 169 additions and 104 deletions

View File

@ -67,7 +67,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp
envs = append(envs, env) envs = append(envs, env)
} }
} }
task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId) task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "")
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -17,11 +17,10 @@ package schedulers
import ( import (
"errors" "errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
@ -31,7 +30,7 @@ type AiScheduler struct {
yamlString string yamlString string
task *response.TaskInfo task *response.TaskInfo
*scheduler.Scheduler *scheduler.Scheduler
option option.AiOption option *option.AiOption
} }
func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) { func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) {
@ -50,36 +49,24 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin
} }
func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
resources, err := as.findClustersWithResource() resources, err := as.findClustersWithResources()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(resources) == 0 {
return nil, errors.New("no cluster has resources")
}
params := &param.Params{Resources: resources}
if len(resources) < 2 /*|| as.task */ { if len(resources) < 2 /*|| as.task */ {
var pros []entity.Participant strategy := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params /*, Replicas: 1*/})
for _, resource := range resources {
pros = append(pros, entity.Participant{
Participant_id: resource.ParticipantId,
Name: resource.Name,
})
}
strategy := strategy.NewReplicationStrategy(nil)
return strategy, nil return strategy, nil
} }
task, providerList := as.genTaskAndProviders() strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params})
if err != nil {
return nil, nil
}
strategy := strategy.NewPricingStrategy(task, providerList...)
return strategy, nil return strategy, nil
} }
func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider) {
return nil, nil
}
func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
if clusters == nil { if clusters == nil {
return errors.New("clusters is nil") return errors.New("clusters is nil")
@ -87,7 +74,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
executorMap := *as.AiExecutor executorMap := *as.AiExecutor
for _, cluster := range clusters { for _, cluster := range clusters {
_, err := executorMap[cluster.Name].Execute(option.AiOption{}) _, err := executorMap[cluster.Name].Execute(as.option)
if err != nil { if err != nil {
// TODO: database operation // TODO: database operation
} }
@ -97,7 +84,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return nil return nil
} }
func (as *AiScheduler) findClustersWithResource() ([]*collector.ResourceSpecs, error) { func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceSpecs, error) {
var resourceSpecs []*collector.ResourceSpecs var resourceSpecs []*collector.ResourceSpecs
for _, resourceCollector := range *as.ResourceCollector { for _, resourceCollector := range *as.ResourceCollector {
spec, err := resourceCollector.GetResourceSpecs() spec, err := resourceCollector.GetResourceSpecs()

View File

@ -19,6 +19,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/database" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/database"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
@ -38,12 +39,9 @@ func NewCloudScheduler() *CloudScheduler {
} }
func (cs *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) { func (cs *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
task, providerList, err := cs.genTaskAndProviders() //获取所有计算中心
if err != nil {
return nil, nil
}
//调度算法 //调度算法
strategy := strategy.NewPricingStrategy(task, providerList...) strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{})
return strategy, nil return strategy, nil
} }

View File

@ -1,17 +1,20 @@
package option package option
type AiOption struct { type AiOption struct {
aiType string // shuguangAi/octopus AiType string // shuguangAi/octopus
resourceType string // cpu/gpu/compute card ResourceType string // cpu/gpu/compute card
taskType string // pytorch/tensorflow TaskType string // pytorch/tensorflow
imageId string ImageId string
specId string SpecId string
datasetsId string DatasetsId string
codeId string CodeId string
ResourceId string
cmd string Cmd string
Envs []string
Params []string
datasets string Datasets string
code string Code string
} }

View File

@ -19,7 +19,7 @@ var (
"Hanwuji": OCTOPUS, "Hanwuji": OCTOPUS,
"Suiyan": OCTOPUS, "Suiyan": OCTOPUS,
"Sailingsi": OCTOPUS, "Sailingsi": OCTOPUS,
"modelarts-CloudBrain2": MODELARTS, "Modelarts-CloudBrain2": MODELARTS,
"ShuguangAi": SHUGUANGAI, "ShuguangAi": SHUGUANGAI,
} }
) )

View File

@ -6,6 +6,6 @@ import (
) )
type AiExecutor interface { type AiExecutor interface {
Execute(option option.AiOption) (interface{}, error) Execute(option *option.AiOption) (interface{}, error)
storeLink.Linkage storeLink.Linkage
} }

View File

@ -1,9 +1,9 @@
package params package param
import ( import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
) )
type Params struct { type Params struct {
resources []*collector.ResourceSpecs Resources []*collector.ResourceSpecs
} }

View File

@ -0,0 +1,23 @@
package param
import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
type ReplicationParams struct {
Replicas int32
*Params
}
func (r *ReplicationParams) GetReplicas() int32 {
return r.Replicas
}
func (r *ReplicationParams) GetParticipants() []*entity.Participant {
var participants []*entity.Participant
for _, resource := range r.Resources {
participants = append(participants, &entity.Participant{
Participant_id: resource.ParticipantId,
Name: resource.Name,
})
}
return participants
}

View File

@ -0,0 +1,32 @@
package param
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
)
type ResourcePricingParams struct {
replicas int32
task *providerPricing.Task
*Params
}
func (r *ResourcePricingParams) GetReplicas() int32 {
return r.replicas
}
func (r *ResourcePricingParams) GetTask() *providerPricing.Task {
return r.task
}
func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider {
var providerList []*providerPricing.Provider
for _, resource := range r.Resources {
provider := providerPricing.NewProvider(
resource.ParticipantId,
resource.CpuAvail,
resource.MemAvail,
resource.DiskAvail, 0.0, 0.0, 0.0)
providerList = append(providerList, provider)
}
return providerList
}

View File

@ -1,16 +0,0 @@
package params
import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
type ReplicationOption struct {
replicas int32
participants []entity.Participant
}
func (o *ReplicationOption) GetReplicas() int32 {
return o.replicas
}
func (o *ReplicationOption) GetParticipants() []entity.Participant {
return o.participants
}

View File

@ -1,26 +0,0 @@
package params
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
)
type ResourcePricingOption struct {
replicas int32
task *providerPricing.Task
providers []*providerPricing.Provider
*Params
}
func NewResourcePricingOption(params *Params) *ResourcePricingOption {
return &ResourcePricingOption{
Params: params,
}
}
func (r *ResourcePricingOption) GetReplicas() int32 {
return r.replicas
}
func (r *ResourcePricingOption) GetProviders() []*providerPricing.Provider {
return r.providers
}

View File

@ -3,15 +3,15 @@ package strategy
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/params" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param"
) )
type ReplicationStrategy struct { type ReplicationStrategy struct {
replicas int32 replicas int32
participants []entity.Participant participants []*entity.Participant
} }
func NewReplicationStrategy(params *params.ReplicationOption) *ReplicationStrategy { func NewReplicationStrategy(params *param.ReplicationParams) *ReplicationStrategy {
return &ReplicationStrategy{replicas: params.GetReplicas(), return &ReplicationStrategy{replicas: params.GetReplicas(),
participants: params.GetParticipants(), participants: params.GetParticipants(),
} }

View File

@ -17,6 +17,7 @@ package strategy
import ( import (
"errors" "errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param"
) )
type PricingStrategy struct { type PricingStrategy struct {
@ -25,7 +26,9 @@ type PricingStrategy struct {
StrategyList []*providerPricing.Strategy StrategyList []*providerPricing.Strategy
} }
func NewPricingStrategy(task *providerPricing.Task, providers ...*providerPricing.Provider) *PricingStrategy { func NewPricingStrategy(params *param.ResourcePricingParams) *PricingStrategy {
providers := params.GetProviders()
task := params.GetTask()
var providerList []*providerPricing.Provider var providerList []*providerPricing.Provider
var res [][]int var res [][]int

View File

@ -3,7 +3,9 @@ package test
import ( import (
"fmt" "fmt"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param"
"testing" "testing"
) )
@ -13,10 +15,23 @@ func TestReplication(t *testing.T) {
{Name: "test2", Participant_id: 2}, {Name: "test2", Participant_id: 2},
{Name: "test3", Participant_id: 3}, {Name: "test3", Participant_id: 3},
} }
rsc := []*collector.ResourceSpecs{
{
ParticipantId: 1,
Name: "test1",
},
{
ParticipantId: 1,
Name: "test2"},
{
ParticipantId: 1,
Name: "test3"},
}
tests := []struct { tests := []struct {
name string name string
replica int32 replica int32
ps []entity.Participant ps []entity.Participant
res []*collector.ResourceSpecs
}{ }{
{ {
name: "test1", name: "test1",
@ -32,7 +47,8 @@ func TestReplication(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
repl := strategy.NewReplicationStrategy(nil) params := &param.Params{Resources: rsc}
repl := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params, Replicas: tt.replica})
schedule, err := repl.Schedule() schedule, err := repl.Schedule()
if err != nil { if err != nil {
return return

View File

@ -63,7 +63,7 @@ func (o *ModelArtsLink) QueryImageList() (interface{}, error) {
return resp, nil return resp, nil
} }
func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// modelArts提交任务 // modelArts提交任务
environments := make(map[string]string) environments := make(map[string]string)
parameters := make([]*modelarts.ParametersTrainJob, 0) parameters := make([]*modelarts.ParametersTrainJob, 0)
@ -153,6 +153,10 @@ func (o *ModelArtsLink) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil return nil, nil
} }
func (o *ModelArtsLink) Execute(option option.AiOption) (interface{}, error) { func (o *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) {
return nil, nil task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType)
if err != nil {
return nil, err
}
return task, nil
} }

View File

@ -107,7 +107,7 @@ func (o *OctopusLink) QueryImageList() (interface{}, error) {
return resp, nil return resp, nil
} }
func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// octopus提交任务 // octopus提交任务
// python参数 // python参数
@ -200,6 +200,10 @@ func (o *OctopusLink) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil return nil, nil
} }
func (o *OctopusLink) Execute(option option.AiOption) (interface{}, error) { func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) {
return nil, nil task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType)
if err != nil {
return nil, err
}
return task, nil
} }

View File

@ -144,7 +144,7 @@ func (s ShuguangHpc) QueryImageList() (interface{}, error) {
return nil, nil return nil, nil
} }
func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// shuguangHpc提交任务 // shuguangHpc提交任务
//判断是否resourceId匹配自定义资源Id //判断是否resourceId匹配自定义资源Id

View File

@ -22,6 +22,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"strconv"
"strings" "strings"
) )
@ -75,9 +76,7 @@ func (s *ShuguangAi) QueryImageList() (interface{}, error) {
return resp, nil return resp, nil
} }
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
// shuguangAi提交任务
//判断是否resourceId匹配自定义资源Id //判断是否resourceId匹配自定义资源Id
if resourceId != SHUGUANGAI_CUSTOM_RESOURCE_ID { if resourceId != SHUGUANGAI_CUSTOM_RESOURCE_ID {
return nil, errors.New("shuguangAi资源Id不存在") return nil, errors.New("shuguangAi资源Id不存在")
@ -132,6 +131,18 @@ func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, param
return resp, nil return resp, nil
} }
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// shuguangAi提交任务
if aiType == PYTORCH {
task, err := s.SubmitPytorchTask(imageId, cmd, envs, params, resourceId)
if err != nil {
return nil, err
}
return task, nil
}
return nil, errors.New("shuguangAi不支持的任务类型")
}
func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) { func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) {
// shuguangAi获取任务 // shuguangAi获取任务
req := &hpcAC.GetPytorchTaskReq{ req := &hpcAC.GetPytorchTaskReq{
@ -173,9 +184,35 @@ func (o *ShuguangAi) QuerySpecs() (interface{}, error) {
} }
func (o *ShuguangAi) GetResourceSpecs() (*collector.ResourceSpecs, error) { func (o *ShuguangAi) GetResourceSpecs() (*collector.ResourceSpecs, error) {
userReq := &hpcAC.GetUserInfoReq{}
userinfo, err := o.svcCtx.ACRpc.GetUserInfo(o.ctx, userReq)
if err != nil {
return nil, err
}
limitReq := &hpcAC.QueueReq{}
_, err = o.svcCtx.ACRpc.QueryUserQuotasLimit(o.ctx, limitReq)
if err != nil {
return nil, err
}
diskReq := &hpcAC.ParaStorQuotaReq{}
_, err = o.svcCtx.ACRpc.ParaStorQuota(o.ctx, diskReq)
if err != nil {
return nil, err
}
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
_ = &collector.ResourceSpecs{
ParticipantId: o.participantId,
Name: o.platform,
Balance: balance,
}
return nil, nil return nil, nil
} }
func (o *ShuguangAi) Execute(option option.AiOption) (interface{}, error) { func (o *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) {
return nil, nil task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType)
if err != nil {
return nil, err
}
return task, nil
} }

View File

@ -31,7 +31,7 @@ type Linkage interface {
UploadImage(path string) (interface{}, error) UploadImage(path string) (interface{}, error)
DeleteImage(imageId string) (interface{}, error) DeleteImage(imageId string) (interface{}, error)
QueryImageList() (interface{}, error) QueryImageList() (interface{}, error)
SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error)
QueryTask(taskId string) (interface{}, error) QueryTask(taskId string) (interface{}, error)
QuerySpecs() (interface{}, error) QuerySpecs() (interface{}, error)
DeleteTask(taskId string) (interface{}, error) DeleteTask(taskId string) (interface{}, error)