From fa1390fed2f643b37959e9d689a0a2df784a59bf Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Mon, 20 May 2024 20:43:55 +0800 Subject: [PATCH 1/2] fix: add ai Random RandomStrategy Former-commit-id: 6293f4eeb0e6f7f60b9f75522cdf19c7fb332643 --- .../logic/schedule/schedulesubmitlogic.go | 1 - .../scheduler/schedulers/aiScheduler.go | 45 ++++++++++--------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index b2759a0c..5ce7a6ee 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -45,7 +45,6 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type Params: req.AiOption.Params, Envs: req.AiOption.Envs, Cmd: req.AiOption.Cmd, - ClusterIds: req.AiOption.AiClusterIds, } aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt) if err != nil { diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index c3aceb12..79e11a0d 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -26,6 +26,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -89,38 +90,38 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil } - /* resources, err := as.findClustersWithResources() + resources, err := as.findClustersWithResources() - if err != nil { - return nil, err - } - if len(resources) == 0 { - return nil, errors.New("no cluster has resources") - } + if err != nil { + return nil, err + } + if len(resources) == 0 { + return nil, errors.New("no cluster has resources") + } - if len(resources) == 1 { - var cluster strategy.AssignedCluster - cluster.ClusterId = resources[0].ClusterId - cluster.Replicas = 1 - return &strategy.SingleAssignment{Cluster: &cluster}, nil - } + if len(resources) == 1 { + var cluster strategy.AssignedCluster + cluster.ClusterId = resources[0].ClusterId + cluster.Replicas = 1 + return &strategy.SingleAssignment{Cluster: &cluster}, nil + } - params := ¶m.Params{Resources: resources}*/ + params := ¶m.Params{Resources: resources} switch as.option.StrategyName { case strategy.REPLICATION: var clusterIds []string - /* for _, resource := range resources { + for _, resource := range resources { clusterIds = append(clusterIds, resource.ClusterId) - }*/ + } strategy := strategy.NewReplicationStrategy(clusterIds, 1) return strategy, nil - /* case strategy.RESOURCES_PRICING: - strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) - return strategy, nil - case strategy.DYNAMIC_RESOURCES: - strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) - return strategy, nil*/ + case strategy.RESOURCES_PRICING: + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) + return strategy, nil + case strategy.DYNAMIC_RESOURCES: + strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) + return strategy, nil case strategy.STATIC_WEIGHT: //todo resources should match cluster StaticWeightMap strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica) From 1a2e4b031acbb6b2bd527f64803e1c27662f4704 Mon Sep 17 00:00:00 2001 From: jagger Date: Mon, 20 May 2024 21:13:06 +0800 Subject: [PATCH 2/2] fix bug Signed-off-by: jagger Former-commit-id: b2946b0f6946b3c587f6b1f3ced3068cc005459c --- api/internal/storeLink/modelarts.go | 56 +++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index c91df822..d19b0b82 100644 --- a/api/internal/storeLink/modelarts.go +++ b/api/internal/storeLink/modelarts.go @@ -19,12 +19,14 @@ import ( "github.com/pkg/errors" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts" "strconv" "strings" + "time" ) const ( @@ -207,11 +209,61 @@ func (m *ModelArtsLink) UploadAlgorithmCode(ctx context.Context, resourceType st } func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) { - return "", nil + req := &modelartsservice.GetTrainingJobLogsPreviewReq{ + Platform: m.platform, + TaskId: "worker-0", + TrainingJobId: taskId, + } + resp, err := m.modelArtsRpc.GetTrainingJobLogsPreview(ctx, req) + if err != nil { + return "", err + } + + if strings.Contains(resp.Content, "404 Not Found") { + resp.Content = "waiting for logs..." + } + return resp.Content, nil } func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) { - return nil, nil + resp, err := m.QueryTask(ctx, taskId) + if err != nil { + return nil, err + } + jobresp, ok := (resp).(*modelartsservice.JobResponse) + if jobresp.ErrorMsg != "" || !ok { + if jobresp.ErrorMsg != "" { + return nil, errors.New(jobresp.ErrorMsg) + } else { + return nil, errors.New("get training task failed, empty error returned") + } + } + var task collector.Task + task.Id = jobresp.Metadata.Id + + switch strings.ToLower(jobresp.Status.Phase) { + case "completed": + task.Start = time.Unix(int64(jobresp.Status.StartTime)/1000, 0).Format(constants.Layout) + duration := jobresp.Status.Duration + task.End = time.Unix(int64(jobresp.Status.StartTime)/1000+int64(duration/1000), 0).Format(constants.Layout) + task.Status = constants.Completed + case "failed": + task.Status = constants.Failed + case "running": + task.Start = time.Unix(int64(jobresp.Status.StartTime)/1000, 0).Format(constants.Layout) + task.Status = constants.Running + case "stopped": + task.Status = constants.Stopped + case "pending": + task.Status = constants.Pending + case "terminated": + //TODO Failed + task.Status = constants.Failed + default: + task.Status = "undefined" + } + + return &task, nil } func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {