diff --git a/api/internal/logic/core/pagelisttasklogic.go b/api/internal/logic/core/pagelisttasklogic.go index 97dad450..c7be973b 100644 --- a/api/internal/logic/core/pagelisttasklogic.go +++ b/api/internal/logic/core/pagelisttasklogic.go @@ -86,7 +86,8 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa } func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch chan<- struct{}) { - list := tasklist + list := make([]*types.TaskModel, len(tasklist)) + copy(list, tasklist) for i := len(list) - 1; i >= 0; i-- { if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed { list = append(list[:i], list[i+1:]...) @@ -99,9 +100,9 @@ func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch cha } task := list[0] - for i, _ := range list { - earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) - latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime) + for i := range list { + earliest, _ := time.Parse(time.RFC3339, task.UpdatedTime) + latest, _ := time.Parse(time.RFC3339, list[i].UpdatedTime) if latest.Before(earliest) { task = list[i] } @@ -116,6 +117,11 @@ func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch cha } if len(aiTask) == 0 { + tx = l.svcCtx.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return + } ch <- struct{}{} return } @@ -140,11 +146,19 @@ func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch cha for i := len(aiTask) - 1; i >= 0; i-- { if aiTask[i].StartTime == "" { + task.Status = aiTask[i].Status aiTask = append(aiTask[:i], aiTask[i+1:]...) 
} } if len(aiTask) == 0 { + task.UpdatedTime = time.Now().Format(constants.Layout) + tx = l.svcCtx.DbEngin.Table("task").Model(task).Updates(task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + ch <- struct{}{} + return + } ch <- struct{}{} return } @@ -208,7 +222,8 @@ func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch cha } func (l *PageListTaskLogic) updateAiTaskStatus(tasklist []*types.TaskModel, ch chan<- struct{}) { - list := tasklist + list := make([]*types.TaskModel, len(tasklist)) + copy(list, tasklist) for i := len(list) - 1; i >= 0; i-- { if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed { list = append(list[:i], list[i+1:]...) @@ -221,7 +236,7 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasklist []*types.TaskModel, ch c } task := list[0] - for i, _ := range list { + for i := range list { earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime) if latest.Before(earliest) { diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index f618ff73..36a69364 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -120,7 +120,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId ClusterId: cId, ClusterName: clusterName, Name: option.TaskName, - Replica: int64(option.Replica), + Replica: int64(option.Replica), JobId: jobId, TaskType: option.TaskType, Strategy: option.StrategyName, @@ -218,7 +218,7 @@ func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, c } func (s *AiStorage) UpdateClusterResources(clusterResource *models.TClusterResource) error { - tx := s.DbEngin.Updates(clusterResource) + tx := s.DbEngin.Model(clusterResource).Updates(clusterResource) if tx.Error != nil { return tx.Error } diff --git 
a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 01187c5e..7979f966 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -26,6 +26,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -90,38 +91,41 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil } - //resources, err := as.findClustersWithResources() + resources, err := as.findClustersWithResources() - /* if err != nil { - return nil, err - } - if len(resources) == 0 { - return nil, errors.New("no cluster has resources") - } + if err != nil { + return nil, err + } + if len(resources) == 0 { + return nil, errors.New("no cluster has resources") + } - if len(resources) == 1 { - var cluster strategy.AssignedCluster - cluster.ClusterId = resources[0].ClusterId - cluster.Replicas = 1 - return &strategy.SingleAssignment{Cluster: &cluster}, nil - } + if len(resources) == 1 { + var cluster strategy.AssignedCluster + cluster.ClusterId = resources[0].ClusterId + cluster.Replicas = 1 + return &strategy.SingleAssignment{Cluster: &cluster}, nil + } - params := ¶m.Params{Resources: resources}*/ + params := ¶m.Params{Resources: resources} switch as.option.StrategyName { - /*case strategy.REPLICATION: - var clusterIds []string - for _, resource := range resources { - clusterIds = append(clusterIds, 
resource.ClusterId) - } - strategy := strategy.NewReplicationStrategy(clusterIds, 1) - return strategy, nil*/ - /*case strategy.RESOURCES_PRICING: + case strategy.REPLICATION: + var clusterIds []string + for _, resource := range resources { + if resource == nil { + continue + } + clusterIds = append(clusterIds, resource.ClusterId) + } + strategy := strategy.NewReplicationStrategy(clusterIds, 1) + return strategy, nil + case strategy.RESOURCES_PRICING: strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) return strategy, nil case strategy.DYNAMIC_RESOURCES: strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) - return strategy, nil*/ + return strategy, nil case strategy.STATIC_WEIGHT: //todo resources should match cluster StaticWeightMap strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica) @@ -181,8 +185,9 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa return } + result := &AiResult{} mu.Lock() - result, _ := convertType(resp) + result, _ = convertType(resp) mu.Unlock() result.Replica = c.Replicas diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index 12a2172d..913ea0b8 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -31,6 +31,11 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { var assignedCluster AssignedCluster var results []*AssignedCluster for _, res := range ps.resources { + + if res == nil { + continue + } + if opt.ResourceType == "cpu" { if res.CpuCoreHours <= 0 { cluster := &AssignedCluster{ClusterId: res.ClusterId, Replicas: ps.replicas} diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index d19b0b82..0652c8a5 100644 --- a/api/internal/storeLink/modelarts.go +++ 
b/api/internal/storeLink/modelarts.go @@ -21,6 +21,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts" @@ -243,14 +244,20 @@ func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*co switch strings.ToLower(jobresp.Status.Phase) { case "completed": - task.Start = time.Unix(int64(jobresp.Status.StartTime)/1000, 0).Format(constants.Layout) - duration := jobresp.Status.Duration - task.End = time.Unix(int64(jobresp.Status.StartTime)/1000+int64(duration/1000), 0).Format(constants.Layout) + milliTimestamp := int64(jobresp.Status.StartTime) + task.Start = timeutils.MillisecondsToUTCString(milliTimestamp, time.DateTime) + duration := int64(jobresp.Status.Duration) + task.End = timeutils.MillisecondsToAddDurationToUTCString(milliTimestamp, duration, time.DateTime) task.Status = constants.Completed case "failed": + milliTimestamp := int64(jobresp.Status.StartTime) + task.Start = timeutils.MillisecondsToUTCString(milliTimestamp, time.DateTime) + duration := int64(jobresp.Status.Duration) + task.End = timeutils.MillisecondsToAddDurationToUTCString(milliTimestamp, duration, time.DateTime) task.Status = constants.Failed case "running": - task.Start = time.Unix(int64(jobresp.Status.StartTime)/1000, 0).Format(constants.Layout) + milliTimestamp := int64(jobresp.Status.StartTime) + task.Start = timeutils.MillisecondsToUTCString(milliTimestamp, time.DateTime) task.Status = constants.Running case "stopped": task.Status = constants.Stopped diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 77d957d9..53c3652c 100644 --- 
a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -520,8 +520,12 @@ func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*coll } var task collector.Task task.Id = jobresp.Payload.TrainJob.Id - task.Start = time.Unix(jobresp.Payload.TrainJob.StartedAt, 0).Format(constants.Layout) - task.End = time.Unix(jobresp.Payload.TrainJob.CompletedAt, 0).Format(constants.Layout) + if jobresp.Payload.TrainJob.StartedAt != 0 { + task.Start = time.Unix(jobresp.Payload.TrainJob.StartedAt, 0).Format(constants.Layout) + } + if jobresp.Payload.TrainJob.CompletedAt != 0 { + task.End = time.Unix(jobresp.Payload.TrainJob.CompletedAt, 0).Format(constants.Layout) + } switch jobresp.Payload.TrainJob.Status { case "succeeded": task.Status = constants.Completed diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index cd17229a..2aa2a68c 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -521,8 +521,12 @@ func (s *ShuguangAi) GetTrainingTask(ctx context.Context, taskId string) (*colle } var task collector.Task task.Id = jobresp.Data.Id - task.Start = jobresp.Data.StartTime - task.End = jobresp.Data.EndTime + if jobresp.Data.StartTime != "" { + task.Start = jobresp.Data.StartTime + } + if jobresp.Data.EndTime != "" { + task.End = jobresp.Data.EndTime + } task.Status = jobresp.Data.Status return &task, nil diff --git a/deploy/yaml.tar.gz b/deploy/yaml.tar.gz index 11d4d0e5..53cb32ac 100644 Binary files a/deploy/yaml.tar.gz and b/deploy/yaml.tar.gz differ diff --git a/pkg/constants/time.go b/pkg/constants/time.go index deecc715..7a1469ef 100644 --- a/pkg/constants/time.go +++ b/pkg/constants/time.go @@ -1,3 +1,6 @@ package constants -const Layout = "2006-01-02 15:04:05" +const ( + Layout = "2006-01-02 15:04:05" + Layout_temp = "2006-01-02T15:04:05Z07:00" +) diff --git a/pkg/utils/timeutils/time.go b/pkg/utils/timeutils/time.go index 20b0accc..5e98e3a7 100644 --- 
// MillisecondsToUTCString formats a Unix timestamp expressed in milliseconds
// using the caller-supplied layout.
//
// Despite the "UTC" in its name, the result is rendered in the process's
// local time zone: the value is passed through Time.Local before formatting.
// The layout is whatever the caller provides; no particular format (such as
// RFC 3339) is enforced here.
//
// NOTE(review): the name suggests UTC was intended. Existing callers receive
// local-time strings, so confirm the intended zone before switching to UTC.
func MillisecondsToUTCString(milliseconds int64, layout string) string {
	// time.UnixMilli splits the value into seconds and nanoseconds itself,
	// including for negative (pre-1970) timestamps.
	return time.UnixMilli(milliseconds).Local().Format(layout)
}

// SecondsToUTCString formats a Unix timestamp expressed in seconds using the
// caller-supplied layout.
//
// Despite the "UTC" in its name, the result is rendered in the process's
// local time zone (see Time.Local), not in UTC.
func SecondsToUTCString(seconds int64, layout string) string {
	return time.Unix(seconds, 0).Local().Format(layout)
}

// MillisecondsToAddDurationToUTCString adds durationMilliseconds to the Unix
// millisecond timestamp milliseconds and formats the resulting instant using
// the caller-supplied layout. A negative duration moves the instant backwards.
//
// Despite the "UTC" in its name, the result is rendered in the process's
// local time zone (see Time.Local), not in UTC.
func MillisecondsToAddDurationToUTCString(milliseconds int64, durationMilliseconds int64, layout string) string {
	elapsed := time.Duration(durationMilliseconds) * time.Millisecond
	return time.UnixMilli(milliseconds).Add(elapsed).Local().Format(layout)
}