updated schedule clusterId bugs

Former-commit-id: fc08b0aaed230388b70c710f2a77cb91695a7140
This commit is contained in:
tzwang 2024-04-11 17:31:46 +08:00
parent 84576a9b7d
commit 5b0fe44f51
13 changed files with 110 additions and 105 deletions

View File

@ -5,9 +5,8 @@ import (
) )
type Weight struct { type Weight struct {
Id int64 Id string
Weight int32 Weight int32
Name string
Replica int32 Replica int32
} }

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC" "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
@ -28,7 +29,6 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus" "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
"strconv"
"sync" "sync"
) )
@ -65,7 +65,7 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin
func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
if as.option.AiClusterId != "" { if as.option.AiClusterId != "" {
// TODO database operation Find // TODO database operation Find
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: "", Replicas: 1}}, nil
} }
resources, err := as.findClustersWithResources() resources, err := as.findClustersWithResources()
@ -79,8 +79,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
if len(resources) == 1 { if len(resources) == 1 {
var cluster strategy.AssignedCluster var cluster strategy.AssignedCluster
cluster.ParticipantId = resources[0].ParticipantId cluster.ClusterId = resources[0].ClusterId
cluster.Name = resources[0].Name
cluster.Replicas = 1 cluster.Replicas = 1
return &strategy.SingleAssignment{Cluster: &cluster}, nil return &strategy.SingleAssignment{Cluster: &cluster}, nil
} }
@ -89,7 +88,11 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
switch as.option.StrategyName { switch as.option.StrategyName {
case strategy.REPLICATION: case strategy.REPLICATION:
strategy := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params, Replicas: 1}) var clusterIds []string
for _, resource := range resources {
clusterIds = append(clusterIds, resource.ClusterId)
}
strategy := strategy.NewReplicationStrategy(clusterIds, 1)
return strategy, nil return strategy, nil
case strategy.RESOURCES_PRICING: case strategy.RESOURCES_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1}) strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
@ -111,32 +114,46 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
return nil, errors.New("clusters is nil") return nil, errors.New("clusters is nil")
} }
for i := len(clusters) - 1; i >= 0; i-- {
if clusters[i].Replicas == 0 {
clusters = append(clusters[:i], clusters[i+1:]...)
}
}
if len(clusters) == 0 {
return nil, errors.New("clusters is nil")
}
var wg sync.WaitGroup var wg sync.WaitGroup
var results []*AiResult var results []*AiResult
var errs []error var errs []interface{}
var ch = make(chan *AiResult, len(clusters)) var ch = make(chan *AiResult, len(clusters))
var errCh = make(chan error, len(clusters)) var errCh = make(chan interface{}, len(clusters))
executorMap := *as.AiExecutor executorMap := *as.AiExecutor
for _, cluster := range clusters { for _, cluster := range clusters {
c := cluster c := cluster
if cluster.Replicas == 0 {
continue
}
wg.Add(1) wg.Add(1)
go func() { go func() {
opt, _ := cloneAiOption(as.option) opt, _ := cloneAiOption(as.option)
resp, err := executorMap[c.Name].Execute(as.ctx, opt) resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt)
if err != nil { if err != nil {
errCh <- err e := struct {
err error
clusterId string
}{
err: err,
clusterId: c.ClusterId,
}
errCh <- e
wg.Done() wg.Done()
return return
} }
result, _ := convertType(resp) result, _ := convertType(resp)
result.Replica = c.Replicas result.Replica = c.Replicas
result.ClusterId = strconv.FormatInt(c.ParticipantId, 10) result.ClusterId = c.ClusterId
ch <- result ch <- result
wg.Done() wg.Done()
@ -150,10 +167,22 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
errs = append(errs, e) errs = append(errs, e)
} }
if len(errs) != 0 { if len(errs) != len(clusters) {
return nil, errors.New("submit task failed") return nil, errors.New("submit task failed")
} }
if len(errs) != 0 {
var msg string
for _, err := range errs {
e := (err).(struct {
err error
clusterId string
})
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
}
return nil, errors.New(msg)
}
for s := range ch { for s := range ch {
// TODO: database operation // TODO: database operation
results = append(results, s) results = append(results, s)

View File

@ -9,7 +9,7 @@ type AiCollector interface {
} }
type ResourceStats struct { type ResourceStats struct {
ParticipantId int64 ClusterId string
Name string Name string
CpuCoreAvail int64 CpuCoreAvail int64
CpuCoreTotal int64 CpuCoreTotal int64

View File

@ -33,15 +33,14 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
for _, res := range ps.resources { for _, res := range ps.resources {
if opt.ResourceType == "cpu" { if opt.ResourceType == "cpu" {
if res.CpuCoreHours <= 0 { if res.CpuCoreHours <= 0 {
cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas} cluster := &AssignedCluster{ClusterId: res.ClusterId, Replicas: ps.replicas}
results = append(results, cluster) results = append(results, cluster)
return results, nil return results, nil
} }
if res.CpuCoreHours > maxCpuCoreHoursAvailable { if res.CpuCoreHours > maxCpuCoreHoursAvailable {
maxCpuCoreHoursAvailable = res.CpuCoreHours maxCpuCoreHoursAvailable = res.CpuCoreHours
assignedCluster.Name = res.Name assignedCluster.ClusterId = res.ClusterId
assignedCluster.ParticipantId = res.ParticipantId
assignedCluster.Replicas = ps.replicas assignedCluster.Replicas = ps.replicas
} }
} }
@ -56,8 +55,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
} }
if maxCurrentCardHours > maxCardHoursAvailable { if maxCurrentCardHours > maxCardHoursAvailable {
maxCardHoursAvailable = maxCurrentCardHours maxCardHoursAvailable = maxCurrentCardHours
assignedCluster.Name = res.Name assignedCluster.ClusterId = res.ClusterId
assignedCluster.ParticipantId = res.ParticipantId
assignedCluster.Replicas = ps.replicas assignedCluster.Replicas = ps.replicas
} }
} }

View File

@ -1,23 +0,0 @@
package param
import "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"
type ReplicationParams struct {
Replicas int32
*Params
}
func (r *ReplicationParams) GetReplicas() int32 {
return r.Replicas
}
func (r *ReplicationParams) GetParticipants() []*entity.Participant {
var participants []*entity.Participant
for _, resource := range r.Resources {
participants = append(participants, &entity.Participant{
Participant_id: resource.ParticipantId,
Name: resource.Name,
})
}
return participants
}

View File

@ -2,6 +2,7 @@ package param
import ( import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"strconv"
) )
type ResourcePricingParams struct { type ResourcePricingParams struct {
@ -21,8 +22,9 @@ func (r *ResourcePricingParams) GetTask() *providerPricing.Task {
func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider { func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider {
var providerList []*providerPricing.Provider var providerList []*providerPricing.Provider
for _, resource := range r.Resources { for _, resource := range r.Resources {
id, _ := strconv.ParseInt(resource.ClusterId, 10, 64)
provider := providerPricing.NewProvider( provider := providerPricing.NewProvider(
resource.ParticipantId, id,
float64(resource.CpuCoreAvail), float64(resource.CpuCoreAvail),
resource.MemAvail, resource.MemAvail,
resource.DiskAvail, 0.0, 0.0, 0.0) resource.DiskAvail, 0.0, 0.0, 0.0)

View File

@ -2,33 +2,31 @@ package strategy
import ( import (
"errors" "errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
) )
type ReplicationStrategy struct { type ReplicationStrategy struct {
replicas int32 replicas int32
participants []*entity.Participant clusterIds []string
} }
func NewReplicationStrategy(params *param.ReplicationParams) *ReplicationStrategy { func NewReplicationStrategy(clusterIds []string, replicas int32) *ReplicationStrategy {
return &ReplicationStrategy{replicas: params.GetReplicas(), return &ReplicationStrategy{clusterIds: clusterIds,
participants: params.GetParticipants(), replicas: replicas,
} }
} }
func (ps *ReplicationStrategy) Schedule() ([]*AssignedCluster, error) { func (r *ReplicationStrategy) Schedule() ([]*AssignedCluster, error) {
if ps.replicas < 1 { if r.replicas < 1 {
return nil, errors.New("replicas must be greater than 0") return nil, errors.New("replicas must be greater than 0")
} }
if ps.participants == nil { if len(r.clusterIds) == 0 {
return nil, errors.New("participantId must be set") return nil, errors.New("clusterIds must be set")
} }
var results []*AssignedCluster var results []*AssignedCluster
for _, p := range ps.participants { for _, c := range r.clusterIds {
cluster := &AssignedCluster{ParticipantId: p.Participant_id, Name: p.Name, Replicas: ps.replicas} cluster := &AssignedCluster{ClusterId: c, Replicas: r.replicas}
results = append(results, cluster) results = append(results, cluster)
} }
return results, nil return results, nil

View File

@ -18,6 +18,7 @@ import (
"errors" "errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"strconv"
) )
type PricingStrategy struct { type PricingStrategy struct {
@ -154,7 +155,7 @@ func (ps *PricingStrategy) Schedule() ([]*AssignedCluster, error) {
if e == 0 { if e == 0 {
continue continue
} }
cluster := &AssignedCluster{ParticipantId: ps.ProviderList[i].Pid, Replicas: int32(e)} cluster := &AssignedCluster{ClusterId: strconv.FormatInt(ps.ProviderList[i].Pid, 10), Replicas: int32(e)}
results = append(results, cluster) results = append(results, cluster)
} }

View File

@ -29,7 +29,7 @@ func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {
weights := make([]*weightDistributing.Weight, 0) weights := make([]*weightDistributing.Weight, 0)
for k, v := range s.staticWeightMap { for k, v := range s.staticWeightMap {
weight := &weightDistributing.Weight{ weight := &weightDistributing.Weight{
Name: k, Id: k,
Weight: v, Weight: v,
} }
weights = append(weights, weight) weights = append(weights, weight)
@ -39,7 +39,7 @@ func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {
var results []*AssignedCluster var results []*AssignedCluster
for _, weight := range weights { for _, weight := range weights {
cluster := &AssignedCluster{ParticipantId: weight.Id, Name: weight.Name, Replicas: weight.Replica} cluster := &AssignedCluster{ClusterId: weight.Id, Replicas: weight.Replica}
results = append(results, cluster) results = append(results, cluster)
} }

View File

@ -18,8 +18,7 @@ type Strategy interface {
} }
type AssignedCluster struct { type AssignedCluster struct {
ParticipantId int64 ClusterId string
Name string
Replicas int32 Replicas int32
} }

View File

@ -5,7 +5,6 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"testing" "testing"
) )
@ -17,14 +16,14 @@ func TestReplication(t *testing.T) {
} }
rsc := []*collector.ResourceStats{ rsc := []*collector.ResourceStats{
{ {
ParticipantId: 1, ClusterId: "1",
Name: "test1", Name: "test1",
}, },
{ {
ParticipantId: 1, ClusterId: "2",
Name: "test2"}, Name: "test2"},
{ {
ParticipantId: 1, ClusterId: "3",
Name: "test3"}, Name: "test3"},
} }
tests := []struct { tests := []struct {
@ -47,8 +46,11 @@ func TestReplication(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
params := &param.Params{Resources: rsc} var clusterIds []string
repl := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params, Replicas: tt.replica}) for _, stats := range rsc {
clusterIds = append(clusterIds, stats.ClusterId)
}
repl := strategy.NewReplicationStrategy(clusterIds, 0)
schedule, err := repl.Schedule() schedule, err := repl.Schedule()
if err != nil { if err != nil {
return return

View File

@ -283,7 +283,7 @@ func (o *OctopusLink) GetResourceStats(ctx context.Context) (*collector.Resource
} }
resourceStats := &collector.ResourceStats{ resourceStats := &collector.ResourceStats{
ParticipantId: o.participantId, ClusterId: strconv.FormatInt(o.participantId, 10),
Name: o.platform, Name: o.platform,
Balance: balance, Balance: balance,
CardsAvail: cards, CardsAvail: cards,

View File

@ -284,14 +284,14 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
totalDcu := limitResp.Data.AccountMaxDcu totalDcu := limitResp.Data.AccountMaxDcu
//disk //disk
//diskReq := &hpcAC.ParaStorQuotaReq{} diskReq := &hpcAC.ParaStorQuotaReq{}
//diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq) diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
//if err != nil { if err != nil {
// return nil, err return nil, err
//} }
//
//totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3) totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
//availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3) availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
//memory //memory
nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil) nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
@ -344,13 +344,13 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
} }
cards = append(cards, dcu) cards = append(cards, dcu)
resourceStats := &collector.ResourceStats{ resourceStats := &collector.ResourceStats{
ParticipantId: s.participantId, ClusterId: strconv.FormatInt(s.participantId, 10),
Name: s.platform, Name: s.platform,
Balance: balance, Balance: balance,
CpuCoreTotal: totalCpu, CpuCoreTotal: totalCpu,
CpuCoreAvail: CpuCoreAvail, CpuCoreAvail: CpuCoreAvail,
//DiskTotal: totalDisk, DiskTotal: totalDisk,
//DiskAvail: availDisk, DiskAvail: availDisk,
MemTotal: memSize, MemTotal: memSize,
MemAvail: MemAvail, MemAvail: MemAvail,
CpuCoreHours: cpuHours, CpuCoreHours: cpuHours,