Former-commit-id: 98a7d3a544ba31c4376a5cb29a24c666d8734b4f
This commit is contained in:
tzwang 2024-05-21 08:58:16 +08:00
commit 6a3d79c82d
3 changed files with 77 additions and 25 deletions

View File

@ -45,7 +45,6 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
Params: req.AiOption.Params,
Envs: req.AiOption.Envs,
Cmd: req.AiOption.Cmd,
ClusterIds: req.AiOption.AiClusterIds,
}
aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt)
if err != nil {

View File

@ -26,6 +26,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@ -90,7 +91,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
}
/* resources, err := as.findClustersWithResources()
resources, err := as.findClustersWithResources()
if err != nil {
return nil, err
@ -106,22 +107,22 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
return &strategy.SingleAssignment{Cluster: &cluster}, nil
}
params := &param.Params{Resources: resources}*/
params := &param.Params{Resources: resources}
switch as.option.StrategyName {
case strategy.REPLICATION:
var clusterIds []string
/* for _, resource := range resources {
for _, resource := range resources {
clusterIds = append(clusterIds, resource.ClusterId)
}*/
}
strategy := strategy.NewReplicationStrategy(clusterIds, 1)
return strategy, nil
/* case strategy.RESOURCES_PRICING:
case strategy.RESOURCES_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
return strategy, nil
case strategy.DYNAMIC_RESOURCES:
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
return strategy, nil*/
return strategy, nil
case strategy.STATIC_WEIGHT:
//todo resources should match cluster StaticWeightMap
strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)

View File

@ -19,12 +19,14 @@ import (
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"strconv"
"strings"
"time"
)
const (
@ -207,11 +209,61 @@ func (m *ModelArtsLink) UploadAlgorithmCode(ctx context.Context, resourceType st
}
func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
return "", nil
req := &modelartsservice.GetTrainingJobLogsPreviewReq{
Platform: m.platform,
TaskId: "worker-0",
TrainingJobId: taskId,
}
resp, err := m.modelArtsRpc.GetTrainingJobLogsPreview(ctx, req)
if err != nil {
return "", err
}
if strings.Contains(resp.Content, "404 Not Found") {
resp.Content = "waiting for logs..."
}
return resp.Content, nil
}
func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
return nil, nil
resp, err := m.QueryTask(ctx, taskId)
if err != nil {
return nil, err
}
jobresp, ok := (resp).(*modelartsservice.JobResponse)
if jobresp.ErrorMsg != "" || !ok {
if jobresp.ErrorMsg != "" {
return nil, errors.New(jobresp.ErrorMsg)
} else {
return nil, errors.New("get training task failed, empty error returned")
}
}
var task collector.Task
task.Id = jobresp.Metadata.Id
switch strings.ToLower(jobresp.Status.Phase) {
case "completed":
task.Start = time.Unix(int64(jobresp.Status.StartTime)/1000, 0).Format(constants.Layout)
duration := jobresp.Status.Duration
task.End = time.Unix(int64(jobresp.Status.StartTime)/1000+int64(duration/1000), 0).Format(constants.Layout)
task.Status = constants.Completed
case "failed":
task.Status = constants.Failed
case "running":
task.Start = time.Unix(int64(jobresp.Status.StartTime)/1000, 0).Format(constants.Layout)
task.Status = constants.Running
case "stopped":
task.Status = constants.Stopped
case "pending":
task.Status = constants.Pending
case "terminated":
//TODO Failed
task.Status = constants.Failed
default:
task.Status = "undefined"
}
return &task, nil
}
func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {