From 96ab6a81a6d408d39c768baf1b5c4b95033d1091 Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Fri, 17 May 2024 17:35:48 +0800 Subject: [PATCH 1/3] fix: update ai Replicas Former-commit-id: e4f63e15056835d0f9b34b39b7b527db387e216d --- .../scheduler/schedulers/aiScheduler.go | 2 + api/internal/scheduler/strategy/random.go | 10 ++ api/internal/scheduler/strategy/strategy.go | 1 + .../scheduler/strategy/test/strategy_test.go | 143 ++++++++++++++++++ 4 files changed, 156 insertions(+) create mode 100644 api/internal/scheduler/strategy/random.go diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 50cf27ca..5c8daacc 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -107,6 +107,8 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { //todo resources should match cluster StaticWeightMap strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica) return strategy, nil + case strategy.RANDOM: + } return nil, errors.New("no strategy has been chosen") diff --git a/api/internal/scheduler/strategy/random.go b/api/internal/scheduler/strategy/random.go new file mode 100644 index 00000000..b030bafd --- /dev/null +++ b/api/internal/scheduler/strategy/random.go @@ -0,0 +1,10 @@ +package strategy + +/*type RandomStrategy struct { + clusterIds []string + replicas int32 +} + +func NewRandomStrategy(clusterIds []string, replicas int32) *NewRandomStrategy { + return &RandomStrategy{clusterIds: clusterIds, replicas: replicas} +}*/ diff --git a/api/internal/scheduler/strategy/strategy.go b/api/internal/scheduler/strategy/strategy.go index 59d6cb17..91cd4d4d 100644 --- a/api/internal/scheduler/strategy/strategy.go +++ b/api/internal/scheduler/strategy/strategy.go @@ -5,6 +5,7 @@ const ( RESOURCES_PRICING = "resourcesPricing" STATIC_WEIGHT = "staticWeight" DYNAMIC_RESOURCES = "dynamicResources" + RANDOM = "random" DATA_LOCALITY = "dataLocality" //感知数据位置,数据调度和计算调度协同,近数据调度 ENERGY_CONSUMPTION = "energyConsumption" //根据各集群总体能耗水平调度作业,优先选择能耗低的集群调度作业 ) diff --git a/api/internal/scheduler/strategy/test/strategy_test.go b/api/internal/scheduler/strategy/test/strategy_test.go index 376e93c9..8802ffa2 100644 --- a/api/internal/scheduler/strategy/test/strategy_test.go +++ b/api/internal/scheduler/strategy/test/strategy_test.go @@ -5,7 +5,9 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" + "math/rand" "testing" + "time" ) func TestReplication(t *testing.T) { @@ -106,3 +108,144 @@ func TestStaticWeight(t *testing.T) { }) } } + +func TestRandom(t *testing.T) { + // 使用当前时间作为随机数种子,确保每次程序运行产生的随机数序列都不同 + rand.Seed(time.Now().UnixNano()) + + /*randomNum := randInt(1, 100) + fmt.Println("Random number:", randomNum)*/ + total := 5 // 假设总数是5 + first, second := splitIntoTwoRandomParts(total) + fmt.Printf("第一部分的数量: %d, 第二部分的数量: %d\n", first, second) +} + +// randInt 生成一个指定范围内的随机整数,包括min但不包括max +func randInt(min, max int) int { + return min + rand.Intn(max-min) +} + +func splitIntoTwoRandomParts(total int) (int, int) { + if total < 2 { + // 如果总数小于2,则无法分成两部分 + return 0, 0 + } + // 生成一个随机数作为第一部分的数量(范围在[1, total-1]之间) + firstPart := rand.Intn(total-1) + 1 + // 第二部分的数量就是总数减去第一部分的数量 + secondPart := total - firstPart + return firstPart, secondPart +} + +func splitIntoRandomParts(total int) (int, int) { + if total < 2 { + // 如果总数小于2,则无法分成两部分 + return 0, 0 + } + // 生成一个随机数作为第一部分的数量(范围在[1, total-1]之间) + firstPart := rand.Intn(total-1) + 1 + // 第二部分的数量就是总数减去第一部分的数量 + secondPart := total - firstPart + return firstPart, secondPart +} + +func TestRandoms(t *testing.T) { + // 使用当前时间作为随机数种子,确保每次程序运行产生的随机数序列都不同 + rand.Seed(time.Now().UnixNano()) + + /*randomNum := randInt(1, 100) + fmt.Println("Random number:", randomNum)*/ + total := 10 // 假设总数是5 + parts := splitRandomParts(total) + fmt.Println("分配结果:", parts) +} + +// splitIntoRandomParts 将总数total随机分成多个部分,并返回这些部分的切片 +func splitRandomParts(total int) []int { + if total < 2 { + // 如果总数小于2,则无法分成多个部分 + return []int{total} + } + + // 创建一个切片来保存每个部分的数量 + var parts []int + + // 剩余要分配的副本数 + remaining := total + + // 随机决定要分成的部分数量(至少2个部分) + numParts := rand.Intn(total-1) + 2 + + // 确保每个部分至少获得1个副本 + for i := 0; i < numParts-1; i++ { + // 生成一个随机数(1到剩余副本数之间) + // 为了避免最后一个部分太小,我们可能需要调整随机数范围 + minPartSize := 1 + if remaining <= numParts-i { + // 如果剩余副本数不足以让每个部分都至少获得1个,则调整最小部分大小 + minPartSize = remaining / (numParts - i) + if remaining%(numParts-i) > 0 { + minPartSize++ + } + } + // 生成一个大于等于minPartSize且小于等于remaining的随机数 + partSize := minPartSize + rand.Intn(remaining-minPartSize+1) + parts = append(parts, partSize) + remaining -= partSize + } + + // 最后一个部分的数量就是剩余的副本数 + parts = append(parts, remaining) + + return parts +} + +func TestNumRandom(t *testing.T) { + total := 10 // 假设副本数是10 + numParts := 3 // 假设要分成5个集群 + + parts, err := splitIntoParts(total, numParts) + if err != nil { + fmt.Println("Error:", err) + return + } + fmt.Println("分配结果:", parts) +} + +// splitIntoParts 将总数total随机分成numParts个部分,并返回这些部分的切片 +func splitIntoParts(total int, numParts int) ([]int, error) { + if total < 1 || numParts < 1 { + // 总数或部分数量不能小于1 + return nil, fmt.Errorf("total and numParts must be greater than 0") + } + if numParts > total { + // 部分数量不能大于总数 + return nil, fmt.Errorf("numParts cannot be greater than total") + } + + // 创建一个切片来保存每个部分的数量 + parts := make([]int, numParts) + + // 首先将每个部分都分配至少一个副本 + for i := range parts { + parts[i] = 1 + total-- + } + + // 剩余要分配的副本数 + remaining := total + + // 随机分配剩余的副本 + for remaining > 0 { + // 随机选择一个部分(索引从0到numParts-1) + partIndex := rand.Intn(numParts) + + // 如果该部分加上一个副本后不会超过总数,则分配一个副本 + if parts[partIndex]+1 <= total { + parts[partIndex]++ + remaining-- + } + } + + return parts, nil +} From 61f7363302dce44e9fc100e08199cb35f36cba07 Mon Sep 17 00:00:00 2001 From: jagger Date: Mon, 20 May 2024 19:58:33 +0800 Subject: [PATCH 2/3] fix bug Signed-off-by: jagger Former-commit-id: adc07bd843d2938c09d0932a8fe9cf1031a30005 --- .../scheduler/schedulers/aiScheduler.go | 12 ++++ api/internal/scheduler/service/aiService.go | 2 +- api/internal/storeLink/modelarts.go | 61 ++++++++++++++++--- api/internal/storeLink/storeLink.go | 2 +- 4 files changed, 68 insertions(+), 9 deletions(-) diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index f3bd56e1..c033f9c9 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -31,6 +31,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-octopus/octopus" "sync" ) @@ -340,6 +341,17 @@ func convertType(in interface{}) (*AiResult, error) { result.Msg = resp.Error.Message } + return &result, nil + case *modelartsservice.CreateTrainingJobResp: + resp := (in).(*modelartsservice.CreateTrainingJobResp) + + if resp.ErrorMsg != "" { + result.Msg = resp.ErrorMsg + } else { + + result.JobId = resp.Metadata.Id + } + return &result, nil default: return nil, errors.New("ai task response failed") diff --git a/api/internal/scheduler/service/aiService.go b/api/internal/scheduler/service/aiService.go index 45b6da6d..0567e4a3 100644 --- a/api/internal/scheduler/service/aiService.go +++ b/api/internal/scheduler/service/aiService.go @@ -64,7 +64,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st id, _ := strconv.ParseInt(c.Id, 10, 64) modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf)) modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf)) - modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Nickname, id) + modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname) collectorMap[c.Id] = modelarts executorMap[c.Id] = modelarts case SHUGUANGAI: diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index ee8df706..c91df822 100644 --- a/api/internal/storeLink/modelarts.go +++ b/api/internal/storeLink/modelarts.go @@ -27,6 +27,10 @@ import ( "strings" ) +const ( + Ascend = "Ascend" +) + type ModelArtsLink struct { modelArtsRpc modelartsservice.ModelArtsService modelArtsImgRpc imagesservice.ImagesService @@ -36,8 +40,8 @@ type ModelArtsLink struct { pageSize int32 } -func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64) *ModelArtsLink { - return &ModelArtsLink{modelArtsRpc: modelArtsRpc, modelArtsImgRpc: modelArtsImgRpc, platform: name, participantId: id, pageIndex: 1, pageSize: 100} +func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64, nickname string) *ModelArtsLink { + return &ModelArtsLink{modelArtsRpc: modelArtsRpc, modelArtsImgRpc: modelArtsImgRpc, platform: nickname, participantId: id, pageIndex: 0, pageSize: 50} } func (m *ModelArtsLink) UploadImage(ctx context.Context, path string) (interface{}, error) { @@ -87,6 +91,7 @@ func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd stri WorkspaceId: "0", }, Algorithm: &modelarts.Algorithms{ + Id: algorithmId, Engine: &modelarts.EngineCreateTraining{ ImageUrl: imageId, }, @@ -184,7 +189,9 @@ func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorit } func (m *ModelArtsLink) GetComputeCards(ctx context.Context) ([]string, error) { - return nil, nil + var cards []string + cards = append(cards, Ascend) + return cards, nil } func (m *ModelArtsLink) GetUserBalance(ctx context.Context) (float64, error) { @@ -224,6 +231,10 @@ func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option if err != nil { return err } + err = m.generateAlgorithmId(ctx, option) + if err != nil { + return err + } err = m.generateImageId(option) if err != nil { return err @@ -244,10 +255,7 @@ func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option } func (m *ModelArtsLink) generateResourceId(ctx context.Context, option *option.AiOption) error { - _, err := m.QuerySpecs(ctx) - if err != nil { - return err - } + option.ResourceId = "modelarts.kat1.xlarge" return nil } @@ -270,3 +278,42 @@ func (m *ModelArtsLink) generateParams(option *option.AiOption) error { return nil } + +func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error { + req := &modelarts.ListAlgorithmsReq{ + Platform: m.platform, + Offset: m.pageIndex, + Limit: m.pageSize, + } + resp, err := m.modelArtsRpc.ListAlgorithms(ctx, req) + if err != nil { + return err + } + if resp.ErrorMsg != "" { + return errors.New("failed to get algorithmId") + } + + for _, algorithm := range resp.Items { + engVersion := algorithm.JobConfig.Engine.EngineVersion + if strings.Contains(engVersion, option.TaskType) { + ns := strings.Split(algorithm.Metadata.Name, DASH) + if ns[0] != option.TaskType { + continue + } + if ns[1] != option.DatasetsName { + continue + } + if ns[2] != option.AlgorithmName { + continue + } + option.AlgorithmId = algorithm.Metadata.Id + return nil + } + } + + if option.AlgorithmId == "" { + return errors.New("Algorithm does not exist") + } + + return errors.New("failed to get AlgorithmId") +} diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index e3e86a46..24958177 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -99,7 +99,7 @@ func NewStoreLink(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservic linkStruct := NewOctopusLink(octopusRpc, participant.Name, participant.Id) return &StoreLink{ILinkage: linkStruct} case TYPE_MODELARTS: - linkStruct := NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, participant.Name, participant.Id) + linkStruct := NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, participant.Name, participant.Id, "") return &StoreLink{ILinkage: linkStruct} case TYPE_SHUGUANGAI: linkStruct := NewShuguangAi(aCRpc, participant.Name, participant.Id) From d4e18c8796cf24670933dbaeda66728f5f2842a0 Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Mon, 20 May 2024 20:18:37 +0800 Subject: [PATCH 3/3] fix: add ai Random RandomStrategy Former-commit-id: bde21281fe09457b3353852e75776df92209a155 --- .../logic/schedule/schedulesubmitlogic.go | 1 + .../scheduler/schedulers/aiScheduler.go | 48 +++++++------- api/internal/scheduler/strategy/random.go | 64 +++++++++++++++++-- .../scheduler/strategy/test/strategy_test.go | 2 +- 4 files changed, 86 insertions(+), 29 deletions(-) diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index 98f5f916..548a06eb 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -44,6 +44,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type Params: req.AiOption.Params, Envs: req.AiOption.Envs, Cmd: req.AiOption.Cmd, + ClusterIds: req.AiOption.AiClusterIds, } aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt) if err != nil { diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 5c8daacc..d0a5e485 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -25,7 +25,6 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -71,44 +70,45 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil } - resources, err := as.findClustersWithResources() + /* resources, err := as.findClustersWithResources() - if err != nil { - return nil, err - } - if len(resources) == 0 { - return nil, errors.New("no cluster has resources") - } + if err != nil { + return nil, err + } + if len(resources) == 0 { + return nil, errors.New("no cluster has resources") + } - if len(resources) == 1 { - var cluster strategy.AssignedCluster - cluster.ClusterId = resources[0].ClusterId - cluster.Replicas = 1 - return &strategy.SingleAssignment{Cluster: &cluster}, nil - } + if len(resources) == 1 { + var cluster strategy.AssignedCluster + cluster.ClusterId = resources[0].ClusterId + cluster.Replicas = 1 + return &strategy.SingleAssignment{Cluster: &cluster}, nil + } - params := ¶m.Params{Resources: resources} + params := ¶m.Params{Resources: resources}*/ switch as.option.StrategyName { case strategy.REPLICATION: var clusterIds []string - for _, resource := range resources { + /* for _, resource := range resources { clusterIds = append(clusterIds, resource.ClusterId) - } + }*/ strategy := strategy.NewReplicationStrategy(clusterIds, 1) return strategy, nil - case strategy.RESOURCES_PRICING: - strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) - return strategy, nil - case strategy.DYNAMIC_RESOURCES: - strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) - return strategy, nil + /* case strategy.RESOURCES_PRICING: + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) + return strategy, nil + case strategy.DYNAMIC_RESOURCES: + strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) + return strategy, nil*/ case strategy.STATIC_WEIGHT: //todo resources should match cluster StaticWeightMap strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica) return strategy, nil case strategy.RANDOM: - + strategy := strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica) + return strategy, nil } return nil, errors.New("no strategy has been chosen") diff --git a/api/internal/scheduler/strategy/random.go b/api/internal/scheduler/strategy/random.go index b030bafd..315c0b3f 100644 --- a/api/internal/scheduler/strategy/random.go +++ b/api/internal/scheduler/strategy/random.go @@ -1,10 +1,66 @@ package strategy -/*type RandomStrategy struct { +import ( + "github.com/pkg/errors" + "math/rand" +) + +type RandomStrategy struct { clusterIds []string replicas int32 } -func NewRandomStrategy(clusterIds []string, replicas int32) *NewRandomStrategy { - return &RandomStrategy{clusterIds: clusterIds, replicas: replicas} -}*/ +func NewRandomStrategy(clusterIds []string, replicas int32) *RandomStrategy { + return &RandomStrategy{clusterIds: clusterIds, + replicas: replicas, + } +} + +func (s *RandomStrategy) Schedule() ([]*AssignedCluster, error) { + + if s.replicas < 1 { + return nil, errors.New("replicas must be greater than 0") + } + + if len(s.clusterIds) < 1 { + return nil, errors.New("cluster must be greater than 0") + } + + if len(s.clusterIds) == 0 || s.clusterIds == nil { + return nil, errors.New("weight must be set") + } + + // 创建一个切片来保存每个部分的数量 + parts := make([]int32, len(s.clusterIds)) + + // 首先将每个部分都分配至少一个副本 + for i := range parts { + parts[i] = 1 + s.replicas-- + } + + // 剩余要分配的副本数 + remaining := s.replicas + + // 随机分配剩余的副本 + for remaining > 0 { + // 随机选择一个部分(索引从0到numParts-1) + partIndex := rand.Intn(len(s.clusterIds)) + + // 如果该部分加上一个副本后不会超过总数,则分配一个副本 + if parts[partIndex]+1 <= s.replicas { + parts[partIndex]++ + remaining-- + } + } + + var results []*AssignedCluster + if len(s.clusterIds) == len(parts) { + for i, key := range s.clusterIds { + cluster := &AssignedCluster{ClusterId: key, Replicas: parts[i]} + results = append(results, cluster) + } + } + return results, nil + +} diff --git a/api/internal/scheduler/strategy/test/strategy_test.go b/api/internal/scheduler/strategy/test/strategy_test.go index 8802ffa2..de36b7be 100644 --- a/api/internal/scheduler/strategy/test/strategy_test.go +++ b/api/internal/scheduler/strategy/test/strategy_test.go @@ -202,7 +202,7 @@ func splitRandomParts(total int) []int { func TestNumRandom(t *testing.T) { total := 10 // 假设副本数是10 - numParts := 3 // 假设要分成5个集群 + numParts := 2 // 假设要分成5个集群 parts, err := splitIntoParts(total, numParts) if err != nil {