Merge remote-tracking branch 'origin/master' into master-wq

# Conflicts:
#	api/internal/scheduler/schedulers/aiScheduler.go


Former-commit-id: 4a5c08df6633b6956fe99b9b2745afc383f3c15c
This commit is contained in:
qiwang 2024-05-21 19:00:18 +08:00
commit bbccccb408
10 changed files with 114 additions and 41 deletions

View File

@ -86,7 +86,8 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
}
func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch chan<- struct{}) {
list := tasklist
list := make([]*types.TaskModel, len(tasklist))
copy(list, tasklist)
for i := len(list) - 1; i >= 0; i-- {
if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
list = append(list[:i], list[i+1:]...)
@ -99,9 +100,9 @@ func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch cha
}
task := list[0]
for i, _ := range list {
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
for i := range list {
earliest, _ := time.Parse(time.RFC3339, task.UpdatedTime)
latest, _ := time.Parse(time.RFC3339, list[i].UpdatedTime)
if latest.Before(earliest) {
task = list[i]
}
@ -116,6 +117,11 @@ func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch cha
}
if len(aiTask) == 0 {
tx = l.svcCtx.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return
}
ch <- struct{}{}
return
}
@ -140,11 +146,19 @@ func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch cha
for i := len(aiTask) - 1; i >= 0; i-- {
if aiTask[i].StartTime == "" {
task.Status = aiTask[i].Status
aiTask = append(aiTask[:i], aiTask[i+1:]...)
}
}
if len(aiTask) == 0 {
task.UpdatedTime = time.Now().Format(constants.Layout)
tx = l.svcCtx.DbEngin.Table("task").Model(task).Updates(task)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
ch <- struct{}{}
return
}
ch <- struct{}{}
return
}
@ -208,7 +222,8 @@ func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch cha
}
func (l *PageListTaskLogic) updateAiTaskStatus(tasklist []*types.TaskModel, ch chan<- struct{}) {
list := tasklist
list := make([]*types.TaskModel, len(tasklist))
copy(list, tasklist)
for i := len(list) - 1; i >= 0; i-- {
if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
list = append(list[:i], list[i+1:]...)
@ -221,7 +236,7 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasklist []*types.TaskModel, ch c
}
task := list[0]
for i, _ := range list {
for i := range list {
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
if latest.Before(earliest) {

View File

@ -120,7 +120,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId
ClusterId: cId,
ClusterName: clusterName,
Name: option.TaskName,
Replica: int64(option.Replica),
Replica: int64(option.Replica),
JobId: jobId,
TaskType: option.TaskType,
Strategy: option.StrategyName,
@ -218,7 +218,7 @@ func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, c
}
func (s *AiStorage) UpdateClusterResources(clusterResource *models.TClusterResource) error {
tx := s.DbEngin.Updates(clusterResource)
tx := s.DbEngin.Model(clusterResource).Updates(clusterResource)
if tx.Error != nil {
return tx.Error
}

View File

@ -26,6 +26,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@ -90,38 +91,41 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
}
//resources, err := as.findClustersWithResources()
resources, err := as.findClustersWithResources()
/* if err != nil {
return nil, err
}
if len(resources) == 0 {
return nil, errors.New("no cluster has resources")
}
if err != nil {
return nil, err
}
if len(resources) == 0 {
return nil, errors.New("no cluster has resources")
}
if len(resources) == 1 {
var cluster strategy.AssignedCluster
cluster.ClusterId = resources[0].ClusterId
cluster.Replicas = 1
return &strategy.SingleAssignment{Cluster: &cluster}, nil
}
if len(resources) == 1 {
var cluster strategy.AssignedCluster
cluster.ClusterId = resources[0].ClusterId
cluster.Replicas = 1
return &strategy.SingleAssignment{Cluster: &cluster}, nil
}
params := &param.Params{Resources: resources}*/
params := &param.Params{Resources: resources}
switch as.option.StrategyName {
/*case strategy.REPLICATION:
var clusterIds []string
for _, resource := range resources {
clusterIds = append(clusterIds, resource.ClusterId)
}
strategy := strategy.NewReplicationStrategy(clusterIds, 1)
return strategy, nil*/
/*case strategy.RESOURCES_PRICING:
case strategy.REPLICATION:
var clusterIds []string
for _, resource := range resources {
if resource == nil {
continue
}
clusterIds = append(clusterIds, resource.ClusterId)
}
strategy := strategy.NewReplicationStrategy(clusterIds, 1)
return strategy, nil
case strategy.RESOURCES_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
return strategy, nil
case strategy.DYNAMIC_RESOURCES:
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
return strategy, nil*/
return strategy, nil
case strategy.STATIC_WEIGHT:
//todo resources should match cluster StaticWeightMap
strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
@ -181,8 +185,9 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
return
}
result := &AiResult{}
mu.Lock()
result, _ := convertType(resp)
result, _ = convertType(resp)
mu.Unlock()
result.Replica = c.Replicas

View File

@ -31,6 +31,11 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
var assignedCluster AssignedCluster
var results []*AssignedCluster
for _, res := range ps.resources {
if res == nil {
continue
}
if opt.ResourceType == "cpu" {
if res.CpuCoreHours <= 0 {
cluster := &AssignedCluster{ClusterId: res.ClusterId, Replicas: ps.replicas}

View File

@ -21,6 +21,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
@ -243,14 +244,20 @@ func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*co
switch strings.ToLower(jobresp.Status.Phase) {
case "completed":
task.Start = time.Unix(int64(jobresp.Status.StartTime)/1000, 0).Format(constants.Layout)
duration := jobresp.Status.Duration
task.End = time.Unix(int64(jobresp.Status.StartTime)/1000+int64(duration/1000), 0).Format(constants.Layout)
milliTimestamp := int64(jobresp.Status.StartTime)
task.Start = timeutils.MillisecondsToUTCString(milliTimestamp, time.DateTime)
duration := int64(jobresp.Status.Duration)
task.End = timeutils.MillisecondsToAddDurationToUTCString(milliTimestamp, duration, time.DateTime)
task.Status = constants.Completed
case "failed":
milliTimestamp := int64(jobresp.Status.StartTime)
task.Start = timeutils.MillisecondsToUTCString(milliTimestamp, time.DateTime)
duration := int64(jobresp.Status.Duration)
task.End = timeutils.MillisecondsToAddDurationToUTCString(milliTimestamp, duration, time.DateTime)
task.Status = constants.Failed
case "running":
task.Start = time.Unix(int64(jobresp.Status.StartTime)/1000, 0).Format(constants.Layout)
milliTimestamp := int64(jobresp.Status.StartTime)
task.Start = timeutils.MillisecondsToUTCString(milliTimestamp, time.DateTime)
task.Status = constants.Running
case "stopped":
task.Status = constants.Stopped

View File

@ -520,8 +520,12 @@ func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*coll
}
var task collector.Task
task.Id = jobresp.Payload.TrainJob.Id
task.Start = time.Unix(jobresp.Payload.TrainJob.StartedAt, 0).Format(constants.Layout)
task.End = time.Unix(jobresp.Payload.TrainJob.CompletedAt, 0).Format(constants.Layout)
if jobresp.Payload.TrainJob.StartedAt != 0 {
task.Start = time.Unix(jobresp.Payload.TrainJob.StartedAt, 0).Format(constants.Layout)
}
if jobresp.Payload.TrainJob.CompletedAt != 0 {
task.End = time.Unix(jobresp.Payload.TrainJob.CompletedAt, 0).Format(constants.Layout)
}
switch jobresp.Payload.TrainJob.Status {
case "succeeded":
task.Status = constants.Completed

View File

@ -521,8 +521,12 @@ func (s *ShuguangAi) GetTrainingTask(ctx context.Context, taskId string) (*colle
}
var task collector.Task
task.Id = jobresp.Data.Id
task.Start = jobresp.Data.StartTime
task.End = jobresp.Data.EndTime
if jobresp.Data.StartTime != "" {
task.Start = jobresp.Data.StartTime
}
if jobresp.Data.EndTime != "" {
task.End = jobresp.Data.EndTime
}
task.Status = jobresp.Data.Status
return &task, nil

Binary file not shown.

View File

@ -1,3 +1,6 @@
package constants
const Layout = "2006-01-02 15:04:05"
const (
Layout = "2006-01-02 15:04:05"
Layout_temp = "2006-01-02T15:04:05Z07:00"
)

View File

@ -77,3 +77,33 @@ func UnixTimeToString(ut int64) string {
return t.Format("2006-01-02 15:04:05")
}
// MillisecondsToUTCString 将毫秒时间戳转换为UTC时间的字符串表示
func MillisecondsToUTCString(milliseconds int64, layout string) string {
// 将毫秒转换为秒
timestamp := milliseconds / 1000
// 创建time.Time对象
timeObj := time.Unix(timestamp, (milliseconds%1000)*int64(time.Millisecond))
// 使用RFC3339格式返回UTC时间字符串
return timeObj.Local().Format(layout)
}
// SecondsToUTCString 将秒时间戳转换为UTC时间的字符串表示
func SecondsToUTCString(seconds int64, layout string) string {
// 创建time.Time对象
timeObj := time.Unix(seconds, 0)
// 使用RFC3339格式返回UTC时间字符串
return timeObj.Local().Format(layout)
}
// MillisecondsToAddDurationToUTCString 将毫秒时间戳加上持续时间毫秒后转换为UTC时间的字符串表示
func MillisecondsToAddDurationToUTCString(milliseconds int64, durationMilliseconds int64, layout string) string {
// 将毫秒时间戳转换为秒
timestamp := milliseconds / 1000
// 创建time.Time对象
timeObj := time.Unix(timestamp, (milliseconds%1000)*int64(time.Millisecond))
// 添加持续时间
duration := time.Duration(durationMilliseconds) * time.Millisecond
// 返回加上持续时间后的UTC时间字符串
return timeObj.Add(duration).Local().Format(layout)
}