From acdd2abbfe5249f1b7d51fd7f4636df4e1760b14 Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 28 Mar 2024 17:33:59 +0800 Subject: [PATCH] updated acquire resources concurrently Former-commit-id: a8d6c0ac6ba317e6c6138029533027b244fa2491 --- .../scheduler/schedulers/aiScheduler.go | 54 +++++-- api/internal/storeLink/storeLink.go | 148 ++++++++++++------ 2 files changed, 144 insertions(+), 58 deletions(-) diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 3ac52d10..1667f8af 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -24,6 +24,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "sync" ) type AiScheduler struct { @@ -98,25 +99,60 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { executorMap := *as.AiExecutor for _, cluster := range clusters { - _, err := executorMap[cluster.Name].Execute(as.option) - if err != nil { - // TODO: database operation + c := cluster + if cluster.Replicas == 0 { + continue } - // TODO: database operation + go func() { + _, err := executorMap[c.Name].Execute(as.option) + if err != nil { + // TODO: database operation + } + // TODO: database operation + }() } return nil } func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) { + var wg sync.WaitGroup + var ch = make(chan *collector.ResourceStats, len(*as.ResourceCollector)) + var errCh = make(chan error, len(*as.ResourceCollector)) + var resourceSpecs []*collector.ResourceStats + var errs []error + for _, resourceCollector := range *as.ResourceCollector { - spec, err := resourceCollector.GetResourceStats() - if err != nil { - continue - } - resourceSpecs = append(resourceSpecs, spec) + wg.Add(1) + rc := resourceCollector + go func() { + spec, err := rc.GetResourceStats() + if err != nil { + errCh <- err + wg.Done() + return + } + ch <- spec + wg.Done() + }() } + wg.Wait() + close(ch) + close(errCh) + + for s := range ch { + resourceSpecs = append(resourceSpecs, s) + } + + for e := range errCh { + errs = append(errs, e) + } + + if len(errs) != 0 { + return nil, errors.New("get resources failed") + } + if len(resourceSpecs) == 0 { return nil, errors.New("no resource found") } diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index e54c722c..a0bb1919 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -28,6 +28,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus" "gorm.io/gorm" "strings" + "sync" ) type Linkage interface { @@ -124,73 +125,122 @@ func GetResourceTypes() []string { } func GetDatasetsNames(collectorMap *map[string]collector.AiCollector) ([]string, error) { + var wg sync.WaitGroup + var errCh = make(chan error, len(*collectorMap)) + var errs []error var names []string - //errCount := 0 + var mu sync.Mutex colMap := *collectorMap for _, col := range colMap { - var ns []string - specs, err := col.GetDatasetsSpecs() - if err != nil { - return nil, errors.New("failed to acquire datasets list") - } - for _, spec := range specs { - ns = append(ns, spec.Name) - } - if len(ns) == 0 { - continue - } - if len(names) == 0 { - names = ns - continue - } - - names = common.IntersectString(names, ns) + wg.Add(1) + c := col + go func() { + var ns []string + specs, err := c.GetDatasetsSpecs() 
+ if err != nil { + errCh <- err + wg.Done() + return + } + for _, spec := range specs { + ns = append(ns, spec.Name) + } + if len(ns) == 0 { + wg.Done() + return + } + mu.Lock() + if len(names) == 0 { + names = ns + wg.Done() + mu.Unlock() + return + } + names = common.IntersectString(names, ns) + wg.Done() + mu.Unlock() + }() } - //if (len(*collectorMap) - errCount) < 2 { - // - //} + wg.Wait() + close(errCh) + + for e := range errCh { + errs = append(errs, e) + } + + if len(errs) != 0 { + return nil, errors.New("get DatasetsNames failed") + } + names = common.RemoveDuplicates(names) return names, nil } func GetAlgorithms(collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) { var names []string + var wg sync.WaitGroup + var errCh = make(chan error, len(*collectorMap)) + var errs []error + var mu sync.Mutex + colMap := *collectorMap for _, col := range colMap { - var ns []string - algorithms, err := col.GetAlgorithms() - if err != nil { - return nil, err - } - for _, algorithm := range algorithms { - if algorithm.TaskType != taskType { - continue + wg.Add(1) + c := col + go func() { + var ns []string + algorithms, err := c.GetAlgorithms() + if err != nil { + errCh <- err + wg.Done() + return } - switch algorithm.Platform { - case OCTOPUS: - splitns := strings.Split(algorithm.Name, UNDERSCORE) - if dataset != splitns[0] || len(splitns) == 1 { + for _, algorithm := range algorithms { + if algorithm.TaskType != taskType { continue } - ns = append(ns, splitns[1]) - case SHUGUANGAI: - splitns := strings.Split(algorithm.Name, DASH) - if dataset != splitns[0] || len(splitns) == 1 { - continue + switch algorithm.Platform { + case OCTOPUS: + splitns := strings.Split(algorithm.Name, UNDERSCORE) + if dataset != splitns[0] || len(splitns) == 1 { + continue + } + ns = append(ns, splitns[1]) + case SHUGUANGAI: + splitns := strings.Split(algorithm.Name, DASH) + if dataset != splitns[0] || len(splitns) == 1 { + continue + } + ns = append(ns, splitns[1]) } - ns = append(ns, splitns[1]) } - } - if len(ns) == 0 { - continue - } - if len(names) == 0 { - names = ns - continue - } - - names = common.IntersectString(names, ns) + if len(ns) == 0 { + wg.Done() + return + } + mu.Lock() + if len(names) == 0 { + names = ns + wg.Done() + mu.Unlock() + return + } + names = common.IntersectString(names, ns) + wg.Done() + mu.Unlock() + }() } + wg.Wait() + close(errCh) + + for e := range errCh { + errs = append(errs, e) + } + + if len(errs) != 0 { + return nil, errors.New("get Algorithms failed") + } + names = common.RemoveDuplicates(names) return names, nil }
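
The three rewritten functions above all follow the same fan-out/fan-in shape: one goroutine per collector, a sync.WaitGroup to join them, and result/error channels buffered to the collector count so no sender can block after wg.Wait() returns and the channels are closed. Below is a minimal, self-contained sketch of that pattern, assuming a simplified Collector interface; the Collector, fetchAll, and fakeCollector names are illustrative stand-ins, not the coordinator's actual collector.AiCollector API.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// Collector is an illustrative stand-in for the collector types used in the
// patch (e.g. the resource and dataset collectors); it is not the real API.
type Collector interface {
	GetStats() (string, error)
}

// fetchAll fans one goroutine out per collector and fans the results back in,
// mirroring the structure of findClustersWithResources in the patch.
func fetchAll(collectors []Collector) ([]string, error) {
	var wg sync.WaitGroup
	// Buffers sized to len(collectors) let every goroutine send exactly one
	// value without blocking, so closing the channels after wg.Wait() is safe.
	resCh := make(chan string, len(collectors))
	errCh := make(chan error, len(collectors))

	for _, c := range collectors {
		wg.Add(1)
		c := c // capture the loop variable per iteration (pre-Go 1.22 semantics)
		go func() {
			defer wg.Done()
			s, err := c.GetStats()
			if err != nil {
				errCh <- err
				return
			}
			resCh <- s
		}()
	}

	// Join all workers, then close the channels so the range loops terminate.
	wg.Wait()
	close(resCh)
	close(errCh)

	var results []string
	for s := range resCh {
		results = append(results, s)
	}
	var errs []error
	for e := range errCh {
		errs = append(errs, e)
	}
	if len(errs) != 0 {
		return nil, errors.New("get resources failed")
	}
	if len(results) == 0 {
		return nil, errors.New("no resource found")
	}
	return results, nil
}

// fakeCollector is a trivial in-memory Collector used only for the example run.
type fakeCollector struct{ name string }

func (f fakeCollector) GetStats() (string, error) { return f.name, nil }

func main() {
	stats, err := fetchAll([]Collector{fakeCollector{"p1"}, fakeCollector{"p2"}})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(stats)
}

Sizing both channels to the number of collectors is the key design choice: it decouples the workers from the reader, so collecting results and errors can happen strictly after wg.Wait() without leaking blocked goroutines.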