From 5b0fe44f51860812d9612db646ef9292b43882bf Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 11 Apr 2024 17:31:46 +0800 Subject: [PATCH] updated schedule clusterId bugs Former-commit-id: fc08b0aaed230388b70c710f2a77cb91695a7140 --- .../weightDistributing/weightDistributing.go | 3 +- .../scheduler/schedulers/aiScheduler.go | 57 ++++++++++++++----- .../scheduler/service/collector/collector.go | 24 ++++---- .../scheduler/strategy/dynamicResources.go | 8 +-- .../scheduler/strategy/param/replication.go | 23 -------- .../strategy/param/resourcePricing.go | 4 +- .../scheduler/strategy/replication.go | 24 ++++---- .../scheduler/strategy/resourcePricing.go | 3 +- .../scheduler/strategy/staticWeight.go | 4 +- api/internal/scheduler/strategy/strategy.go | 5 +- .../scheduler/strategy/test/strategy_test.go | 20 ++++--- api/internal/storeLink/octopus.go | 10 ++-- api/internal/storeLink/shuguangai.go | 30 +++++----- 13 files changed, 110 insertions(+), 105 deletions(-) delete mode 100644 api/internal/scheduler/strategy/param/replication.go diff --git a/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go b/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go index 0124fa8b..83c15723 100644 --- a/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go +++ b/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go @@ -5,9 +5,8 @@ import ( ) type Weight struct { - Id int64 + Id string Weight int32 - Name string Replica int32 } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 58a4b614..38a04a9d 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "gitlink.org.cn/JointCloud/pcm-ac/hpcAC" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" @@ -28,7 +29,6 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-octopus/octopus" - "strconv" "sync" ) @@ -65,7 +65,7 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { if as.option.AiClusterId != "" { // TODO database operation Find - return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil + return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: "", Replicas: 1}}, nil } resources, err := as.findClustersWithResources() @@ -79,8 +79,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { if len(resources) == 1 { var cluster strategy.AssignedCluster - cluster.ParticipantId = resources[0].ParticipantId - cluster.Name = resources[0].Name + cluster.ClusterId = resources[0].ClusterId cluster.Replicas = 1 return &strategy.SingleAssignment{Cluster: &cluster}, nil } @@ -89,7 +88,11 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { switch as.option.StrategyName { case strategy.REPLICATION: - strategy := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params, Replicas: 1}) + var clusterIds []string + for _, resource := range resources { + clusterIds = append(clusterIds, resource.ClusterId) + } + strategy := strategy.NewReplicationStrategy(clusterIds, 1) return strategy, nil case strategy.RESOURCES_PRICING: strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) @@ -111,32 +114,46 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa return nil, errors.New("clusters is nil") } + for i := len(clusters) - 1; i >= 0; i-- { + if clusters[i].Replicas == 0 { + clusters = append(clusters[:i], clusters[i+1:]...) + } + } + + if len(clusters) == 0 { + return nil, errors.New("clusters is nil") + } + var wg sync.WaitGroup var results []*AiResult - var errs []error + var errs []interface{} var ch = make(chan *AiResult, len(clusters)) - var errCh = make(chan error, len(clusters)) + var errCh = make(chan interface{}, len(clusters)) executorMap := *as.AiExecutor for _, cluster := range clusters { c := cluster - if cluster.Replicas == 0 { - continue - } wg.Add(1) go func() { opt, _ := cloneAiOption(as.option) - resp, err := executorMap[c.Name].Execute(as.ctx, opt) + resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt) if err != nil { - errCh <- err + e := struct { + err error + clusterId string + }{ + err: err, + clusterId: c.ClusterId, + } + errCh <- e wg.Done() return } result, _ := convertType(resp) result.Replica = c.Replicas - result.ClusterId = strconv.FormatInt(c.ParticipantId, 10) + result.ClusterId = c.ClusterId ch <- result wg.Done() @@ -150,10 +167,22 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa errs = append(errs, e) } - if len(errs) != 0 { + if len(errs) != len(clusters) { return nil, errors.New("submit task failed") } + if len(errs) != 0 { + var msg string + for _, err := range errs { + e := (err).(struct { + err error + clusterId string + }) + msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error()) + } + return nil, errors.New(msg) + } + for s := range ch { // TODO: database operation results = append(results, s) diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index 39a05e5e..a20b1d36 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -9,18 +9,18 @@ type AiCollector interface { } type ResourceStats struct { - ParticipantId int64 - Name string - CpuCoreAvail int64 - CpuCoreTotal int64 - MemAvail float64 - MemTotal float64 - DiskAvail float64 - DiskTotal float64 - GpuAvail int64 - CardsAvail []*Card - CpuCoreHours float64 - Balance float64 + ClusterId string + Name string + CpuCoreAvail int64 + CpuCoreTotal int64 + MemAvail float64 + MemTotal float64 + DiskAvail float64 + DiskTotal float64 + GpuAvail int64 + CardsAvail []*Card + CpuCoreHours float64 + Balance float64 } type Card struct { diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index cb1d4922..12a2172d 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -33,15 +33,14 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { for _, res := range ps.resources { if opt.ResourceType == "cpu" { if res.CpuCoreHours <= 0 { - cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas} + cluster := &AssignedCluster{ClusterId: res.ClusterId, Replicas: ps.replicas} results = append(results, cluster) return results, nil } if res.CpuCoreHours > maxCpuCoreHoursAvailable { maxCpuCoreHoursAvailable = res.CpuCoreHours - assignedCluster.Name = res.Name - assignedCluster.ParticipantId = res.ParticipantId + assignedCluster.ClusterId = res.ClusterId assignedCluster.Replicas = ps.replicas } } @@ -56,8 +55,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { } if maxCurrentCardHours > maxCardHoursAvailable { maxCardHoursAvailable = maxCurrentCardHours - assignedCluster.Name = res.Name - assignedCluster.ParticipantId = res.ParticipantId + assignedCluster.ClusterId = res.ClusterId assignedCluster.Replicas = ps.replicas } } diff --git a/api/internal/scheduler/strategy/param/replication.go b/api/internal/scheduler/strategy/param/replication.go deleted file mode 100644 index 6e45916e..00000000 --- a/api/internal/scheduler/strategy/param/replication.go +++ /dev/null @@ -1,23 +0,0 @@ -package param - -import "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity" - -type ReplicationParams struct { - Replicas int32 - *Params -} - -func (r *ReplicationParams) GetReplicas() int32 { - return r.Replicas -} - -func (r *ReplicationParams) GetParticipants() []*entity.Participant { - var participants []*entity.Participant - for _, resource := range r.Resources { - participants = append(participants, &entity.Participant{ - Participant_id: resource.ParticipantId, - Name: resource.Name, - }) - } - return participants -} diff --git a/api/internal/scheduler/strategy/param/resourcePricing.go b/api/internal/scheduler/strategy/param/resourcePricing.go index 6fc05819..4c47e7cc 100644 --- a/api/internal/scheduler/strategy/param/resourcePricing.go +++ b/api/internal/scheduler/strategy/param/resourcePricing.go @@ -2,6 +2,7 @@ package param import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" + "strconv" ) type ResourcePricingParams struct { @@ -21,8 +22,9 @@ func (r *ResourcePricingParams) GetTask() *providerPricing.Task { func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider { var providerList []*providerPricing.Provider for _, resource := range r.Resources { + id, _ := strconv.ParseInt(resource.ClusterId, 10, 64) provider := providerPricing.NewProvider( - resource.ParticipantId, + id, float64(resource.CpuCoreAvail), resource.MemAvail, resource.DiskAvail, 0.0, 0.0, 0.0) diff --git a/api/internal/scheduler/strategy/replication.go b/api/internal/scheduler/strategy/replication.go index d12c5857..0b4c7d99 100644 --- a/api/internal/scheduler/strategy/replication.go +++ b/api/internal/scheduler/strategy/replication.go @@ -2,33 +2,31 @@ package strategy import ( "errors" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" ) type ReplicationStrategy struct { - replicas int32 - participants []*entity.Participant + replicas int32 + clusterIds []string } -func NewReplicationStrategy(params *param.ReplicationParams) *ReplicationStrategy { - return &ReplicationStrategy{replicas: params.GetReplicas(), - participants: params.GetParticipants(), +func NewReplicationStrategy(clusterIds []string, replicas int32) *ReplicationStrategy { + return &ReplicationStrategy{clusterIds: clusterIds, + replicas: replicas, } } -func (ps *ReplicationStrategy) Schedule() ([]*AssignedCluster, error) { - if ps.replicas < 1 { +func (r *ReplicationStrategy) Schedule() ([]*AssignedCluster, error) { + if r.replicas < 1 { return nil, errors.New("replicas must be greater than 0") } - if ps.participants == nil { - return nil, errors.New("participantId must be set") + if len(r.clusterIds) == 0 { + return nil, errors.New("clusterIds must be set") } var results []*AssignedCluster - for _, p := range ps.participants { - cluster := &AssignedCluster{ParticipantId: p.Participant_id, Name: p.Name, Replicas: ps.replicas} + for _, c := range r.clusterIds { + cluster := &AssignedCluster{ClusterId: c, Replicas: r.replicas} results = append(results, cluster) } return results, nil diff --git a/api/internal/scheduler/strategy/resourcePricing.go b/api/internal/scheduler/strategy/resourcePricing.go index df931d82..32d3de6b 100644 --- a/api/internal/scheduler/strategy/resourcePricing.go +++ b/api/internal/scheduler/strategy/resourcePricing.go @@ -18,6 +18,7 @@ import ( "errors" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" + "strconv" ) type PricingStrategy struct { @@ -154,7 +155,7 @@ func (ps *PricingStrategy) Schedule() ([]*AssignedCluster, error) { if e == 0 { continue } - cluster := &AssignedCluster{ParticipantId: ps.ProviderList[i].Pid, Replicas: int32(e)} + cluster := &AssignedCluster{ClusterId: strconv.FormatInt(ps.ProviderList[i].Pid, 10), Replicas: int32(e)} results = append(results, cluster) } diff --git a/api/internal/scheduler/strategy/staticWeight.go b/api/internal/scheduler/strategy/staticWeight.go index 2172bec3..8b3108e9 100644 --- a/api/internal/scheduler/strategy/staticWeight.go +++ b/api/internal/scheduler/strategy/staticWeight.go @@ -29,7 +29,7 @@ func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) { weights := make([]*weightDistributing.Weight, 0) for k, v := range s.staticWeightMap { weight := &weightDistributing.Weight{ - Name: k, + Id: k, Weight: v, } weights = append(weights, weight) @@ -39,7 +39,7 @@ func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) { var results []*AssignedCluster for _, weight := range weights { - cluster := &AssignedCluster{ParticipantId: weight.Id, Name: weight.Name, Replicas: weight.Replica} + cluster := &AssignedCluster{ClusterId: weight.Id, Replicas: weight.Replica} results = append(results, cluster) } diff --git a/api/internal/scheduler/strategy/strategy.go b/api/internal/scheduler/strategy/strategy.go index f5c06d64..1bec626a 100644 --- a/api/internal/scheduler/strategy/strategy.go +++ b/api/internal/scheduler/strategy/strategy.go @@ -18,9 +18,8 @@ type Strategy interface { } type AssignedCluster struct { - ParticipantId int64 - Name string - Replicas int32 + ClusterId string + Replicas int32 } func GetStrategyNames() []string { diff --git a/api/internal/scheduler/strategy/test/strategy_test.go b/api/internal/scheduler/strategy/test/strategy_test.go index eb0f59ad..376e93c9 100644 --- a/api/internal/scheduler/strategy/test/strategy_test.go +++ b/api/internal/scheduler/strategy/test/strategy_test.go @@ -5,7 +5,6 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "testing" ) @@ -17,15 +16,15 @@ func TestReplication(t *testing.T) { } rsc := []*collector.ResourceStats{ { - ParticipantId: 1, - Name: "test1", + ClusterId: "1", + Name: "test1", }, { - ParticipantId: 1, - Name: "test2"}, + ClusterId: "2", + Name: "test2"}, { - ParticipantId: 1, - Name: "test3"}, + ClusterId: "3", + Name: "test3"}, } tests := []struct { name string @@ -47,8 +46,11 @@ func TestReplication(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - params := ¶m.Params{Resources: rsc} - repl := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params, Replicas: tt.replica}) + var clusterIds []string + for _, stats := range rsc { + clusterIds = append(clusterIds, stats.ClusterId) + } + repl := strategy.NewReplicationStrategy(clusterIds, 0) schedule, err := repl.Schedule() if err != nil { return diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 18d6c3e1..b643c1d6 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -283,11 +283,11 @@ func (o *OctopusLink) GetResourceStats(ctx context.Context) (*collector.Resource } resourceStats := &collector.ResourceStats{ - ParticipantId: o.participantId, - Name: o.platform, - Balance: balance, - CardsAvail: cards, - CpuCoreHours: cpuHours, + ClusterId: strconv.FormatInt(o.participantId, 10), + Name: o.platform, + Balance: balance, + CardsAvail: cards, + CpuCoreHours: cpuHours, } return resourceStats, nil diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index fd9c6245..db89cf72 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -284,14 +284,14 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS totalDcu := limitResp.Data.AccountMaxDcu //disk - //diskReq := &hpcAC.ParaStorQuotaReq{} - //diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq) - //if err != nil { - // return nil, err - //} - // - //totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3) - //availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3) + diskReq := &hpcAC.ParaStorQuotaReq{} + diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq) + if err != nil { + return nil, err + } + + totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3) + availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3) //memory nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil) @@ -344,13 +344,13 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS } cards = append(cards, dcu) resourceStats := &collector.ResourceStats{ - ParticipantId: s.participantId, - Name: s.platform, - Balance: balance, - CpuCoreTotal: totalCpu, - CpuCoreAvail: CpuCoreAvail, - //DiskTotal: totalDisk, - //DiskAvail: availDisk, + ClusterId: strconv.FormatInt(s.participantId, 10), + Name: s.platform, + Balance: balance, + CpuCoreTotal: totalCpu, + CpuCoreAvail: CpuCoreAvail, + DiskTotal: totalDisk, + DiskAvail: availDisk, MemTotal: memSize, MemAvail: MemAvail, CpuCoreHours: cpuHours,