From 62ecd4e2e45167839c92501c164b7aee071d8db2 Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 16 May 2024 16:49:50 +0800 Subject: [PATCH] fix aischeduler bugs Former-commit-id: 887857a40bd3492ac3a412902893cb3990048677 --- .../scheduler/schedulers/aiScheduler.go | 59 ++++++++++--------- api/internal/storeLink/octopus.go | 7 ++- api/internal/storeLink/shuguangai.go | 8 ++- 3 files changed, 45 insertions(+), 29 deletions(-) diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 50cf27ca..84b2fe35 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -130,39 +130,44 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa var wg sync.WaitGroup var results []*AiResult var errs []interface{} - var ch = make(chan *AiResult, len(clusters)) - var errCh = make(chan interface{}, len(clusters)) + var taskNum int32 + for _, cluster := range clusters { + taskNum += cluster.Replicas + } + var ch = make(chan *AiResult, taskNum) + var errCh = make(chan interface{}, taskNum) executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId] for _, cluster := range clusters { c := cluster - wg.Add(1) - go func() { - opt, _ := cloneAiOption(as.option) - resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt) - - if err != nil { - e := struct { - err error - clusterId string - }{ - err: err, - clusterId: c.ClusterId, + for i := 0; i < int(c.Replicas); i++ { + wg.Add(1) + go func() { + opt, _ := cloneAiOption(as.option) + resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt) + if err != nil { + e := struct { + err error + clusterId string + }{ + err: err, + clusterId: c.ClusterId, + } + errCh <- e + wg.Done() + return } - errCh <- e + + result, _ := convertType(resp) + result.Replica = c.Replicas + result.ClusterId = c.ClusterId + result.Strategy = as.option.StrategyName + result.Card = opt.ComputeCard + + ch <- result wg.Done() - return - } - - result, _ := convertType(resp) - result.Replica = c.Replicas - result.ClusterId = c.ClusterId - result.Strategy = as.option.StrategyName - result.Card = opt.ComputeCard - - ch <- result - wg.Done() - }() + }() + } } wg.Wait() close(ch) diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index ff4c9f32..97f949d6 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -591,7 +591,10 @@ func (o *OctopusLink) generateResourceId(ctx context.Context, option *option.AiO } if option.ResourceType == CARD { - err = setResourceIdByCard(option, specResp, GCU) + if option.ComputeCard == "" { + option.ComputeCard = GCU + } + err = setResourceIdByCard(option, specResp, option.ComputeCard) if err != nil { return err } @@ -742,6 +745,8 @@ func (o *OctopusLink) generateCmd(option *option.AiOption) error { switch option.ComputeCard { case GCU: option.Cmd = "cd /code; python3 train.py" + case MLU: + option.Cmd = "su root; cd /torch/venv3/pytorch/bin; source activate; cd /code; python train.py" default: option.Cmd = TRAIN_CMD } diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index b2be408c..415f14b1 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -33,7 +33,7 @@ import ( const ( RAM_SIZE_1G = 1024 // 1G WORKER_NUMBER = 1 - DCU = "dcu" + DCU = "DCU" DCU_TOPS = 24.5 PYTORCH = "Pytorch" TASK_PYTORCH_PREFIX = "PytorchTask" @@ -570,7 +570,13 @@ func (s *ShuguangAi) generateResourceId(option *option.AiOption) error { } if option.ResourceType == CARD { + if option.ComputeCard == "" { + option.ComputeCard = DCU + } + if strings.ToUpper(option.ComputeCard) != DCU { + return errors.New("computeCard not found") + } option.ComputeCard = DCU if 0 <= option.Tops && option.Tops <= DCU_TOPS {