Former-commit-id: 51daa59a9496c33698931f90bd64e55c9ef94687
This commit is contained in:
tzwang 2024-05-20 20:34:25 +08:00
commit bc8d65af40
8 changed files with 304 additions and 32 deletions

View File

@ -45,6 +45,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
Params: req.AiOption.Params,
Envs: req.AiOption.Envs,
Cmd: req.AiOption.Cmd,
ClusterIds: req.AiOption.AiClusterIds,
}
aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt)
if err != nil {

View File

@ -26,11 +26,11 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus"
"sync"
)
@ -90,42 +90,45 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
}
resources, err := as.findClustersWithResources()
/* resources, err := as.findClustersWithResources()
if err != nil {
return nil, err
}
if len(resources) == 0 {
return nil, errors.New("no cluster has resources")
}
if err != nil {
return nil, err
}
if len(resources) == 0 {
return nil, errors.New("no cluster has resources")
}
if len(resources) == 1 {
var cluster strategy.AssignedCluster
cluster.ClusterId = resources[0].ClusterId
cluster.Replicas = 1
return &strategy.SingleAssignment{Cluster: &cluster}, nil
}
if len(resources) == 1 {
var cluster strategy.AssignedCluster
cluster.ClusterId = resources[0].ClusterId
cluster.Replicas = 1
return &strategy.SingleAssignment{Cluster: &cluster}, nil
}
params := &param.Params{Resources: resources}
params := &param.Params{Resources: resources}*/
switch as.option.StrategyName {
case strategy.REPLICATION:
var clusterIds []string
for _, resource := range resources {
/* for _, resource := range resources {
clusterIds = append(clusterIds, resource.ClusterId)
}
}*/
strategy := strategy.NewReplicationStrategy(clusterIds, 1)
return strategy, nil
case strategy.RESOURCES_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
return strategy, nil
case strategy.DYNAMIC_RESOURCES:
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
return strategy, nil
/* case strategy.RESOURCES_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
return strategy, nil
case strategy.DYNAMIC_RESOURCES:
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
return strategy, nil*/
case strategy.STATIC_WEIGHT:
//todo resources should match cluster StaticWeightMap
strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
return strategy, nil
case strategy.RANDOM:
strategy := strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica)
return strategy, nil
}
return nil, errors.New("no strategy has been chosen")
@ -340,6 +343,17 @@ func convertType(in interface{}) (*AiResult, error) {
result.Msg = resp.Error.Message
}
return &result, nil
case *modelartsservice.CreateTrainingJobResp:
resp := (in).(*modelartsservice.CreateTrainingJobResp)
if resp.ErrorMsg != "" {
result.Msg = resp.ErrorMsg
} else {
result.JobId = resp.Metadata.Id
}
return &result, nil
default:
return nil, errors.New("ai task response failed")

View File

@ -64,7 +64,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
id, _ := strconv.ParseInt(c.Id, 10, 64)
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Nickname, id)
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
collectorMap[c.Id] = modelarts
executorMap[c.Id] = modelarts
case SHUGUANGAI:

View File

@ -0,0 +1,66 @@
package strategy
import (
"github.com/pkg/errors"
"math/rand"
)
// RandomStrategy is a scheduling strategy that spreads a number of replicas
// over a set of clusters using random choices (see Schedule).
type RandomStrategy struct {
// clusterIds lists the IDs of the clusters that replicas are distributed over.
clusterIds []string
// replicas is the total number of replicas requested.
replicas int32
}
// NewRandomStrategy returns a RandomStrategy that will distribute replicas
// over the given clusterIds. The slice is stored as passed in, not copied.
func NewRandomStrategy(clusterIds []string, replicas int32) *RandomStrategy {
	s := new(RandomStrategy)
	s.clusterIds = clusterIds
	s.replicas = replicas
	return s
}
// Schedule distributes the configured number of replicas randomly over the
// configured clusters and returns one AssignedCluster per cluster that
// received at least one replica.
//
// When there are at least as many replicas as clusters, every cluster first
// receives one replica and each remaining replica goes to a uniformly chosen
// cluster. When there are fewer replicas than clusters, a random subset of
// clusters receives exactly one replica each, so the sum of the assigned
// replicas always equals the requested amount.
//
// The receiver is not modified, so Schedule can be called repeatedly. An error
// is returned when replicas is less than 1 or when no cluster is configured.
func (s *RandomStrategy) Schedule() ([]*AssignedCluster, error) {
	if s.replicas < 1 {
		return nil, errors.New("replicas must be greater than 0")
	}
	if len(s.clusterIds) < 1 {
		return nil, errors.New("cluster must be greater than 0")
	}

	// Work on local copies; mutating s.replicas would corrupt later calls.
	total := int(s.replicas)
	n := len(s.clusterIds)

	if total < n {
		// Not enough replicas for every cluster: pick `total` distinct
		// clusters at random instead of over-allocating one replica each.
		results := make([]*AssignedCluster, 0, total)
		for _, idx := range rand.Perm(n)[:total] {
			results = append(results, &AssignedCluster{ClusterId: s.clusterIds[idx], Replicas: 1})
		}
		return results, nil
	}

	// Give every cluster one replica first.
	parts := make([]int32, n)
	for i := range parts {
		parts[i] = 1
	}
	// Hand out the rest one by one. Every iteration assigns a replica, so the
	// loop always terminates (the previous per-part cap could never be met
	// when only one replica was left, which spun forever).
	for remaining := total - n; remaining > 0; remaining-- {
		parts[rand.Intn(n)]++
	}

	results := make([]*AssignedCluster, 0, n)
	for i, id := range s.clusterIds {
		results = append(results, &AssignedCluster{ClusterId: id, Replicas: parts[i]})
	}
	return results, nil
}

View File

@ -5,6 +5,7 @@ const (
RESOURCES_PRICING = "resourcesPricing"
STATIC_WEIGHT = "staticWeight"
DYNAMIC_RESOURCES = "dynamicResources"
RANDOM = "random"
DATA_LOCALITY = "dataLocality" //感知数据位置,数据调度和计算调度协同,近数据调度
ENERGY_CONSUMPTION = "energyConsumption" //根据各集群总体能耗水平调度作业,优先选择能耗低的集群调度作业
)

View File

@ -5,7 +5,9 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"math/rand"
"testing"
"time"
)
func TestReplication(t *testing.T) {
@ -106,3 +108,144 @@ func TestStaticWeight(t *testing.T) {
})
}
}
// TestRandom verifies that splitIntoTwoRandomParts divides a total into two
// non-empty parts that add up to that total.
func TestRandom(t *testing.T) {
	// Seed from the current time so each run exercises a different sequence.
	rand.Seed(time.Now().UnixNano())

	total := 5
	first, second := splitIntoTwoRandomParts(total)
	fmt.Printf("第一部分的数量: %d, 第二部分的数量: %d\n", first, second)

	if first < 1 || second < 1 {
		t.Errorf("both parts must be at least 1, got %d and %d", first, second)
	}
	if first+second != total {
		t.Errorf("parts sum to %d, want %d", first+second, total)
	}
}
// randInt returns a pseudo-random integer in the half-open range [min, max).
// max must be greater than min: rand.Intn panics on a non-positive argument.
func randInt(min, max int) int {
	span := max - min
	offset := rand.Intn(span)
	return offset + min
}
// splitIntoTwoRandomParts randomly splits total into two parts that are each
// at least 1 and together add up to total. A total below 2 cannot be split
// that way, in which case (0, 0) is returned.
func splitIntoTwoRandomParts(total int) (int, int) {
	if total >= 2 {
		// The first share is drawn from [1, total-1]; the second is the rest.
		head := 1 + rand.Intn(total-1)
		return head, total - head
	}
	return 0, 0
}
// splitIntoRandomParts returns two random positive integers whose sum is
// total, or (0, 0) when total is smaller than 2 and no such split exists.
func splitIntoRandomParts(total int) (int, int) {
	var firstPart, secondPart int
	switch {
	case total < 2:
		// Too small to form two non-empty parts; keep both at zero.
	default:
		// firstPart lies in [1, total-1]; secondPart takes what is left.
		firstPart = rand.Intn(total-1) + 1
		secondPart = total - firstPart
	}
	return firstPart, secondPart
}
// TestRandoms verifies that splitRandomParts breaks a total into at least two
// parts whose sum equals that total.
func TestRandoms(t *testing.T) {
	// Seed from the current time so each run exercises a different sequence.
	rand.Seed(time.Now().UnixNano())

	total := 10 // total amount to split
	parts := splitRandomParts(total)
	fmt.Println("分配结果:", parts)

	if len(parts) < 2 {
		t.Errorf("expected at least 2 parts for total %d, got %v", total, parts)
	}
	sum := 0
	for _, p := range parts {
		sum += p
	}
	if sum != total {
		t.Errorf("parts %v sum to %d, want %d", parts, sum, total)
	}
}
// splitRandomParts randomly splits total into a random number of parts
// (between 2 and total) and returns the part sizes. Every part is at least 1
// and the parts add up to total. A total below 2 cannot be split and is
// returned as a single-element slice.
func splitRandomParts(total int) []int {
	if total < 2 {
		return []int{total}
	}

	// Pick how many parts to produce: at least 2, at most total.
	numParts := rand.Intn(total-1) + 2

	parts := make([]int, 0, numParts)
	remaining := total
	for i := 0; i < numParts-1; i++ {
		// Keep one unit in reserve for every part that still follows this
		// one; otherwise an early part could take everything and leave
		// zero-sized parts behind.
		laterParts := numParts - 1 - i
		maxPartSize := remaining - laterParts

		// Draw a size in [1, maxPartSize]. maxPartSize is always >= 1
		// because remaining >= numParts-i holds on every iteration.
		partSize := 1 + rand.Intn(maxPartSize)
		parts = append(parts, partSize)
		remaining -= partSize
	}

	// The last part takes whatever is left (at least 1).
	parts = append(parts, remaining)
	return parts
}
// TestNumRandom verifies that splitIntoParts spreads a replica count over a
// fixed number of parts, giving each part at least one replica.
func TestNumRandom(t *testing.T) {
	total := 10   // number of replicas
	numParts := 2 // number of clusters to split across

	parts, err := splitIntoParts(total, numParts)
	if err != nil {
		// A failure here must fail the test instead of being printed and ignored.
		t.Fatalf("splitIntoParts(%d, %d) returned error: %v", total, numParts, err)
	}
	fmt.Println("分配结果:", parts)

	if len(parts) != numParts {
		t.Fatalf("got %d parts, want %d", len(parts), numParts)
	}
	sum := 0
	for _, p := range parts {
		if p < 1 {
			t.Errorf("part smaller than 1 in %v", parts)
		}
		sum += p
	}
	if sum != total {
		t.Errorf("parts %v sum to %d, want %d", parts, sum, total)
	}
}
// splitIntoParts randomly splits total into exactly numParts parts and returns
// the part sizes. Every part is at least 1 and the parts add up to total.
//
// An error is returned when total or numParts is less than 1, or when numParts
// is greater than total (not every part could receive at least one unit).
func splitIntoParts(total int, numParts int) ([]int, error) {
	if total < 1 || numParts < 1 {
		return nil, fmt.Errorf("total and numParts must be greater than 0")
	}
	if numParts > total {
		return nil, fmt.Errorf("numParts cannot be greater than total")
	}

	// Give every part one unit first.
	parts := make([]int, numParts)
	for i := range parts {
		parts[i] = 1
	}

	// Hand out the rest one unit at a time to a random part. Each iteration
	// assigns a unit, so the loop always terminates; the former cap against
	// the already-decremented total could never be satisfied in cases such as
	// (3, 2) or (5, 1) and looped forever.
	for remaining := total - numParts; remaining > 0; remaining-- {
		parts[rand.Intn(numParts)]++
	}
	return parts, nil
}

View File

@ -27,6 +27,10 @@ import (
"strings"
)
const (
// Ascend is the compute card name that GetComputeCards reports for ModelArts.
Ascend = "Ascend"
)
type ModelArtsLink struct {
modelArtsRpc modelartsservice.ModelArtsService
modelArtsImgRpc imagesservice.ImagesService
@ -36,8 +40,8 @@ type ModelArtsLink struct {
pageSize int32
}
func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64) *ModelArtsLink {
return &ModelArtsLink{modelArtsRpc: modelArtsRpc, modelArtsImgRpc: modelArtsImgRpc, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
// NewModelArtsLink builds a ModelArtsLink around the given ModelArts RPC
// clients.
//
// The link's platform is set to nickname; when nickname is empty it falls back
// to name so that callers without a nickname do not end up with an empty
// platform. id becomes the participant ID. Paging starts at offset 0 with a
// page size of 50.
func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64, nickname string) *ModelArtsLink {
	platform := nickname
	if platform == "" {
		platform = name
	}
	return &ModelArtsLink{modelArtsRpc: modelArtsRpc, modelArtsImgRpc: modelArtsImgRpc, platform: platform, participantId: id, pageIndex: 0, pageSize: 50}
}
func (m *ModelArtsLink) UploadImage(ctx context.Context, path string) (interface{}, error) {
@ -87,6 +91,7 @@ func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd stri
WorkspaceId: "0",
},
Algorithm: &modelarts.Algorithms{
Id: algorithmId,
Engine: &modelarts.EngineCreateTraining{
ImageUrl: imageId,
},
@ -184,7 +189,9 @@ func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorit
}
// GetComputeCards returns the compute card types available through this link.
// Only the Ascend card type is reported; ctx is not used.
func (m *ModelArtsLink) GetComputeCards(ctx context.Context) ([]string, error) {
	// The stale leading `return nil, nil` made the Ascend result unreachable.
	return []string{Ascend}, nil
}
func (m *ModelArtsLink) GetUserBalance(ctx context.Context) (float64, error) {
@ -224,6 +231,10 @@ func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option
if err != nil {
return err
}
err = m.generateAlgorithmId(ctx, option)
if err != nil {
return err
}
err = m.generateImageId(option)
if err != nil {
return err
@ -244,10 +255,7 @@ func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option
}
// generateResourceId calls QuerySpecs and, when that call succeeds, sets
// option.ResourceId to the fixed value "modelarts.kat1.xlarge". The specs
// returned by QuerySpecs are discarded; only its error is propagated.
func (m *ModelArtsLink) generateResourceId(ctx context.Context, option *option.AiOption) error {
	if _, queryErr := m.QuerySpecs(ctx); queryErr != nil {
		return queryErr
	}

	option.ResourceId = "modelarts.kat1.xlarge"
	return nil
}
@ -270,3 +278,42 @@ func (m *ModelArtsLink) generateParams(option *option.AiOption) error {
return nil
}
// generateAlgorithmId looks up the ModelArts algorithm matching the task
// described by option and stores its ID in option.AlgorithmId.
//
// It requests a single page of algorithms (offset m.pageIndex, limit
// m.pageSize) for m.platform. An algorithm matches when its engine version
// contains option.TaskType and its name, split on DASH, starts with the three
// parts option.TaskType, option.DatasetsName and option.AlgorithmName.
//
// It returns the RPC error if the call fails, an error carrying the service's
// ErrorMsg if the response reports one, and an error if no algorithm matches.
func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error {
	req := &modelarts.ListAlgorithmsReq{
		Platform: m.platform,
		Offset:   m.pageIndex,
		Limit:    m.pageSize,
	}
	resp, err := m.modelArtsRpc.ListAlgorithms(ctx, req)
	if err != nil {
		return err
	}
	if resp.ErrorMsg != "" {
		// Surface the service's message instead of dropping it.
		return errors.New("failed to get algorithmId: " + resp.ErrorMsg)
	}

	for _, algorithm := range resp.Items {
		// NOTE(review): JobConfig, Engine and Metadata are dereferenced without
		// nil checks — confirm the service always populates them.
		engVersion := algorithm.JobConfig.Engine.EngineVersion
		if !strings.Contains(engVersion, option.TaskType) {
			continue
		}

		ns := strings.Split(algorithm.Metadata.Name, DASH)
		// Names that do not follow the three-part convention cannot match;
		// skip them instead of panicking on ns[1] / ns[2].
		if len(ns) < 3 {
			continue
		}
		if ns[0] != option.TaskType || ns[1] != option.DatasetsName || ns[2] != option.AlgorithmName {
			continue
		}

		option.AlgorithmId = algorithm.Metadata.Id
		return nil
	}

	if option.AlgorithmId == "" {
		return errors.New("Algorithm does not exist")
	}
	return errors.New("failed to get AlgorithmId")
}

View File

@ -99,7 +99,7 @@ func NewStoreLink(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservic
linkStruct := NewOctopusLink(octopusRpc, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct}
case TYPE_MODELARTS:
linkStruct := NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, participant.Name, participant.Id)
linkStruct := NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, participant.Name, participant.Id, "")
return &StoreLink{ILinkage: linkStruct}
case TYPE_SHUGUANGAI:
linkStruct := NewShuguangAi(aCRpc, participant.Name, participant.Id)