Merge pull request 'updated ai submit task api and fix bugs' (#107) from tzwang/pcm-coordinator:master into master

Former-commit-id: 7a3214ca2b2fa356df241394923518656359eedb
This commit is contained in:
tzwang 2024-04-11 17:36:08 +08:00
commit 5601adeffd
13 changed files with 110 additions and 105 deletions

View File

@ -5,9 +5,8 @@ import (
) )
type Weight struct { type Weight struct {
Id int64 Id string
Weight int32 Weight int32
Name string
Replica int32 Replica int32
} }

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC" "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
@ -28,7 +29,6 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus" "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
"strconv"
"sync" "sync"
) )
@ -65,7 +65,7 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin
func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
if as.option.AiClusterId != "" { if as.option.AiClusterId != "" {
// TODO database operation Find // TODO database operation Find
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: "", Replicas: 1}}, nil
} }
resources, err := as.findClustersWithResources() resources, err := as.findClustersWithResources()
@ -79,8 +79,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
if len(resources) == 1 { if len(resources) == 1 {
var cluster strategy.AssignedCluster var cluster strategy.AssignedCluster
cluster.ParticipantId = resources[0].ParticipantId cluster.ClusterId = resources[0].ClusterId
cluster.Name = resources[0].Name
cluster.Replicas = 1 cluster.Replicas = 1
return &strategy.SingleAssignment{Cluster: &cluster}, nil return &strategy.SingleAssignment{Cluster: &cluster}, nil
} }
@ -89,7 +88,11 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
switch as.option.StrategyName { switch as.option.StrategyName {
case strategy.REPLICATION: case strategy.REPLICATION:
strategy := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params, Replicas: 1}) var clusterIds []string
for _, resource := range resources {
clusterIds = append(clusterIds, resource.ClusterId)
}
strategy := strategy.NewReplicationStrategy(clusterIds, 1)
return strategy, nil return strategy, nil
case strategy.RESOURCES_PRICING: case strategy.RESOURCES_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1}) strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
@ -111,32 +114,46 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
return nil, errors.New("clusters is nil") return nil, errors.New("clusters is nil")
} }
for i := len(clusters) - 1; i >= 0; i-- {
if clusters[i].Replicas == 0 {
clusters = append(clusters[:i], clusters[i+1:]...)
}
}
if len(clusters) == 0 {
return nil, errors.New("clusters is nil")
}
var wg sync.WaitGroup var wg sync.WaitGroup
var results []*AiResult var results []*AiResult
var errs []error var errs []interface{}
var ch = make(chan *AiResult, len(clusters)) var ch = make(chan *AiResult, len(clusters))
var errCh = make(chan error, len(clusters)) var errCh = make(chan interface{}, len(clusters))
executorMap := *as.AiExecutor executorMap := *as.AiExecutor
for _, cluster := range clusters { for _, cluster := range clusters {
c := cluster c := cluster
if cluster.Replicas == 0 {
continue
}
wg.Add(1) wg.Add(1)
go func() { go func() {
opt, _ := cloneAiOption(as.option) opt, _ := cloneAiOption(as.option)
resp, err := executorMap[c.Name].Execute(as.ctx, opt) resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt)
if err != nil { if err != nil {
errCh <- err e := struct {
err error
clusterId string
}{
err: err,
clusterId: c.ClusterId,
}
errCh <- e
wg.Done() wg.Done()
return return
} }
result, _ := convertType(resp) result, _ := convertType(resp)
result.Replica = c.Replicas result.Replica = c.Replicas
result.ClusterId = strconv.FormatInt(c.ParticipantId, 10) result.ClusterId = c.ClusterId
ch <- result ch <- result
wg.Done() wg.Done()
@ -150,10 +167,22 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
errs = append(errs, e) errs = append(errs, e)
} }
if len(errs) != 0 { if len(errs) != len(clusters) {
return nil, errors.New("submit task failed") return nil, errors.New("submit task failed")
} }
if len(errs) != 0 {
var msg string
for _, err := range errs {
e := (err).(struct {
err error
clusterId string
})
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
}
return nil, errors.New(msg)
}
for s := range ch { for s := range ch {
// TODO: database operation // TODO: database operation
results = append(results, s) results = append(results, s)

View File

@ -9,18 +9,18 @@ type AiCollector interface {
} }
type ResourceStats struct { type ResourceStats struct {
ParticipantId int64 ClusterId string
Name string Name string
CpuCoreAvail int64 CpuCoreAvail int64
CpuCoreTotal int64 CpuCoreTotal int64
MemAvail float64 MemAvail float64
MemTotal float64 MemTotal float64
DiskAvail float64 DiskAvail float64
DiskTotal float64 DiskTotal float64
GpuAvail int64 GpuAvail int64
CardsAvail []*Card CardsAvail []*Card
CpuCoreHours float64 CpuCoreHours float64
Balance float64 Balance float64
} }
type Card struct { type Card struct {

View File

@ -33,15 +33,14 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
for _, res := range ps.resources { for _, res := range ps.resources {
if opt.ResourceType == "cpu" { if opt.ResourceType == "cpu" {
if res.CpuCoreHours <= 0 { if res.CpuCoreHours <= 0 {
cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas} cluster := &AssignedCluster{ClusterId: res.ClusterId, Replicas: ps.replicas}
results = append(results, cluster) results = append(results, cluster)
return results, nil return results, nil
} }
if res.CpuCoreHours > maxCpuCoreHoursAvailable { if res.CpuCoreHours > maxCpuCoreHoursAvailable {
maxCpuCoreHoursAvailable = res.CpuCoreHours maxCpuCoreHoursAvailable = res.CpuCoreHours
assignedCluster.Name = res.Name assignedCluster.ClusterId = res.ClusterId
assignedCluster.ParticipantId = res.ParticipantId
assignedCluster.Replicas = ps.replicas assignedCluster.Replicas = ps.replicas
} }
} }
@ -56,8 +55,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
} }
if maxCurrentCardHours > maxCardHoursAvailable { if maxCurrentCardHours > maxCardHoursAvailable {
maxCardHoursAvailable = maxCurrentCardHours maxCardHoursAvailable = maxCurrentCardHours
assignedCluster.Name = res.Name assignedCluster.ClusterId = res.ClusterId
assignedCluster.ParticipantId = res.ParticipantId
assignedCluster.Replicas = ps.replicas assignedCluster.Replicas = ps.replicas
} }
} }

View File

@ -1,23 +0,0 @@
package param
import "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"
// ReplicationParams carries the inputs for the replication strategy: a
// replica count plus the embedded shared Params.
type ReplicationParams struct {
// Replicas is the value returned by GetReplicas.
Replicas int32
// Params is embedded; GetParticipants reads its promoted Resources field.
*Params
}
// GetReplicas returns the Replicas field unchanged.
func (r *ReplicationParams) GetReplicas() int32 {
return r.Replicas
}
// GetParticipants builds one entity.Participant per entry in r.Resources,
// copying that entry's ParticipantId and Name. The order follows r.Resources.
// The returned slice is nil (not merely empty) when r.Resources has no
// entries, because nothing is ever appended to it in that case.
func (r *ReplicationParams) GetParticipants() []*entity.Participant {
	var out []*entity.Participant
	for i := range r.Resources {
		stats := r.Resources[i]
		p := &entity.Participant{
			Participant_id: stats.ParticipantId,
			Name:           stats.Name,
		}
		out = append(out, p)
	}
	return out
}

View File

@ -2,6 +2,7 @@ package param
import ( import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"strconv"
) )
type ResourcePricingParams struct { type ResourcePricingParams struct {
@ -21,8 +22,9 @@ func (r *ResourcePricingParams) GetTask() *providerPricing.Task {
func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider { func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider {
var providerList []*providerPricing.Provider var providerList []*providerPricing.Provider
for _, resource := range r.Resources { for _, resource := range r.Resources {
id, _ := strconv.ParseInt(resource.ClusterId, 10, 64)
provider := providerPricing.NewProvider( provider := providerPricing.NewProvider(
resource.ParticipantId, id,
float64(resource.CpuCoreAvail), float64(resource.CpuCoreAvail),
resource.MemAvail, resource.MemAvail,
resource.DiskAvail, 0.0, 0.0, 0.0) resource.DiskAvail, 0.0, 0.0, 0.0)

View File

@ -2,33 +2,31 @@ package strategy
import ( import (
"errors" "errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
) )
type ReplicationStrategy struct { type ReplicationStrategy struct {
replicas int32 replicas int32
participants []*entity.Participant clusterIds []string
} }
func NewReplicationStrategy(params *param.ReplicationParams) *ReplicationStrategy { func NewReplicationStrategy(clusterIds []string, replicas int32) *ReplicationStrategy {
return &ReplicationStrategy{replicas: params.GetReplicas(), return &ReplicationStrategy{clusterIds: clusterIds,
participants: params.GetParticipants(), replicas: replicas,
} }
} }
func (ps *ReplicationStrategy) Schedule() ([]*AssignedCluster, error) { func (r *ReplicationStrategy) Schedule() ([]*AssignedCluster, error) {
if ps.replicas < 1 { if r.replicas < 1 {
return nil, errors.New("replicas must be greater than 0") return nil, errors.New("replicas must be greater than 0")
} }
if ps.participants == nil { if len(r.clusterIds) == 0 {
return nil, errors.New("participantId must be set") return nil, errors.New("clusterIds must be set")
} }
var results []*AssignedCluster var results []*AssignedCluster
for _, p := range ps.participants { for _, c := range r.clusterIds {
cluster := &AssignedCluster{ParticipantId: p.Participant_id, Name: p.Name, Replicas: ps.replicas} cluster := &AssignedCluster{ClusterId: c, Replicas: r.replicas}
results = append(results, cluster) results = append(results, cluster)
} }
return results, nil return results, nil

View File

@ -18,6 +18,7 @@ import (
"errors" "errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"strconv"
) )
type PricingStrategy struct { type PricingStrategy struct {
@ -154,7 +155,7 @@ func (ps *PricingStrategy) Schedule() ([]*AssignedCluster, error) {
if e == 0 { if e == 0 {
continue continue
} }
cluster := &AssignedCluster{ParticipantId: ps.ProviderList[i].Pid, Replicas: int32(e)} cluster := &AssignedCluster{ClusterId: strconv.FormatInt(ps.ProviderList[i].Pid, 10), Replicas: int32(e)}
results = append(results, cluster) results = append(results, cluster)
} }

View File

@ -29,7 +29,7 @@ func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {
weights := make([]*weightDistributing.Weight, 0) weights := make([]*weightDistributing.Weight, 0)
for k, v := range s.staticWeightMap { for k, v := range s.staticWeightMap {
weight := &weightDistributing.Weight{ weight := &weightDistributing.Weight{
Name: k, Id: k,
Weight: v, Weight: v,
} }
weights = append(weights, weight) weights = append(weights, weight)
@ -39,7 +39,7 @@ func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {
var results []*AssignedCluster var results []*AssignedCluster
for _, weight := range weights { for _, weight := range weights {
cluster := &AssignedCluster{ParticipantId: weight.Id, Name: weight.Name, Replicas: weight.Replica} cluster := &AssignedCluster{ClusterId: weight.Id, Replicas: weight.Replica}
results = append(results, cluster) results = append(results, cluster)
} }

View File

@ -18,9 +18,8 @@ type Strategy interface {
} }
type AssignedCluster struct { type AssignedCluster struct {
ParticipantId int64 ClusterId string
Name string Replicas int32
Replicas int32
} }
func GetStrategyNames() []string { func GetStrategyNames() []string {

View File

@ -5,7 +5,6 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"testing" "testing"
) )
@ -17,15 +16,15 @@ func TestReplication(t *testing.T) {
} }
rsc := []*collector.ResourceStats{ rsc := []*collector.ResourceStats{
{ {
ParticipantId: 1, ClusterId: "1",
Name: "test1", Name: "test1",
}, },
{ {
ParticipantId: 1, ClusterId: "2",
Name: "test2"}, Name: "test2"},
{ {
ParticipantId: 1, ClusterId: "3",
Name: "test3"}, Name: "test3"},
} }
tests := []struct { tests := []struct {
name string name string
@ -47,8 +46,11 @@ func TestReplication(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
params := &param.Params{Resources: rsc} var clusterIds []string
repl := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params, Replicas: tt.replica}) for _, stats := range rsc {
clusterIds = append(clusterIds, stats.ClusterId)
}
repl := strategy.NewReplicationStrategy(clusterIds, 0)
schedule, err := repl.Schedule() schedule, err := repl.Schedule()
if err != nil { if err != nil {
return return

View File

@ -283,11 +283,11 @@ func (o *OctopusLink) GetResourceStats(ctx context.Context) (*collector.Resource
} }
resourceStats := &collector.ResourceStats{ resourceStats := &collector.ResourceStats{
ParticipantId: o.participantId, ClusterId: strconv.FormatInt(o.participantId, 10),
Name: o.platform, Name: o.platform,
Balance: balance, Balance: balance,
CardsAvail: cards, CardsAvail: cards,
CpuCoreHours: cpuHours, CpuCoreHours: cpuHours,
} }
return resourceStats, nil return resourceStats, nil

View File

@ -284,14 +284,14 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
totalDcu := limitResp.Data.AccountMaxDcu totalDcu := limitResp.Data.AccountMaxDcu
//disk //disk
//diskReq := &hpcAC.ParaStorQuotaReq{} diskReq := &hpcAC.ParaStorQuotaReq{}
//diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq) diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
//if err != nil { if err != nil {
// return nil, err return nil, err
//} }
//
//totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3) totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
//availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3) availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
//memory //memory
nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil) nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
@ -344,13 +344,13 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
} }
cards = append(cards, dcu) cards = append(cards, dcu)
resourceStats := &collector.ResourceStats{ resourceStats := &collector.ResourceStats{
ParticipantId: s.participantId, ClusterId: strconv.FormatInt(s.participantId, 10),
Name: s.platform, Name: s.platform,
Balance: balance, Balance: balance,
CpuCoreTotal: totalCpu, CpuCoreTotal: totalCpu,
CpuCoreAvail: CpuCoreAvail, CpuCoreAvail: CpuCoreAvail,
//DiskTotal: totalDisk, DiskTotal: totalDisk,
//DiskAvail: availDisk, DiskAvail: availDisk,
MemTotal: memSize, MemTotal: memSize,
MemAvail: MemAvail, MemAvail: MemAvail,
CpuCoreHours: cpuHours, CpuCoreHours: cpuHours,