diff --git a/api/internal/cron/aiCronTask.go b/api/internal/cron/aiCronTask.go index fdf2a767..458a243e 100644 --- a/api/internal/cron/aiCronTask.go +++ b/api/internal/cron/aiCronTask.go @@ -12,17 +12,13 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "net/http" "strconv" "sync" - "time" ) const ( @@ -54,237 +50,6 @@ func GetTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) { return list, nil } -func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) { - list := make([]*types.TaskModel, len(tasklist)) - copy(list, tasklist) - for i := len(list) - 1; i >= 0; i-- { - if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed { - list = append(list[:i], list[i+1:]...) - } - } - - if len(list) == 0 { - return - } - - task := list[0] - for i := range list { - earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) - latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime) - if latest.Before(earliest) { - task = list[i] - } - } - - var aiTaskList []*models.TaskAi - tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - - if len(aiTaskList) == 0 { - return - } - - var wg sync.WaitGroup - for _, aitask := range aiTaskList { - t := aitask - if t.Status == constants.Completed || t.Status == constants.Failed || t.JobId == "" { - continue - } - wg.Add(1) - go func() { - h := http.Request{} - trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId) - if err != nil { - if status.Code(err) == codes.DeadlineExceeded { - msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) - logx.Errorf(errors.New(msg).Error()) - wg.Done() - return - } - - msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) - logx.Errorf(errors.New(msg).Error()) - wg.Done() - return - } - if trainingTask == nil { - wg.Done() - return - } - switch trainingTask.Status { - case constants.Running: - if t.Status != trainingTask.Status { - svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中") - t.Status = trainingTask.Status - } - case constants.Failed: - if t.Status != trainingTask.Status { - svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败") - t.Status = trainingTask.Status - } - case constants.Completed: - if t.Status != trainingTask.Status { - svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成") - t.Status = trainingTask.Status - } - default: - if t.Status != trainingTask.Status { - svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending") - t.Status = trainingTask.Status - } - } - t.StartTime = trainingTask.Start - t.EndTime = trainingTask.End - err = svc.Scheduler.AiStorages.UpdateAiTask(t) - if err != nil { - msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) - logx.Errorf(errors.New(msg).Error()) - wg.Done() - return - } - wg.Done() - }() - } - wg.Wait() -} - -func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) { - list := make([]*types.TaskModel, len(tasklist)) - copy(list, tasklist) - for i := len(list) - 1; i >= 0; i-- { - if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed { - list = append(list[:i], list[i+1:]...) - } - } - - if len(list) == 0 { - return - } - - task := list[0] - for i := range list { - earliest, _ := time.Parse(time.RFC3339, task.UpdatedTime) - latest, _ := time.Parse(time.RFC3339, list[i].UpdatedTime) - if latest.Before(earliest) { - task = list[i] - } - } - - // Update Infer Task Status - if task.TaskTypeDict == "11" || task.TaskTypeDict == "12" { - UpdateInferTaskStatus(svc, task) - return - } - - var aiTask []*models.TaskAi - tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - - if len(aiTask) == 0 { - tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - return - } - - if len(aiTask) == 1 { - if aiTask[0].Status == constants.Completed { - task.Status = constants.Succeeded - } else { - task.Status = aiTask[0].Status - } - task.StartTime = aiTask[0].StartTime - task.EndTime = aiTask[0].EndTime - task.UpdatedTime = time.Now().Format(constants.Layout) - tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - return - } - - for i := len(aiTask) - 1; i >= 0; i-- { - if aiTask[i].StartTime == "" { - task.Status = aiTask[i].Status - aiTask = append(aiTask[:i], aiTask[i+1:]...) - } - } - - if len(aiTask) == 0 { - task.UpdatedTime = time.Now().Format(constants.Layout) - tx = svc.DbEngin.Table("task").Model(task).Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - return - } - - start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local) - end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local) - - var status string - var count int - for _, a := range aiTask { - s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local) - e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local) - - if s.Before(start) { - start = s - } - - if e.After(end) { - end = e - } - - if a.Status == constants.Failed { - status = a.Status - break - } - - if a.Status == constants.Pending { - status = a.Status - continue - } - - if a.Status == constants.Running { - status = a.Status - continue - } - - if a.Status == constants.Completed { - count++ - continue - } - } - - if count == len(aiTask) { - status = constants.Succeeded - } - - if status != "" { - task.Status = status - task.StartTime = start.Format(constants.Layout) - task.EndTime = end.Format(constants.Layout) - } - - task.UpdatedTime = time.Now().Format(constants.Layout) - tx = svc.DbEngin.Table("task").Model(task).Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } -} - func UpdateAiAdapterMaps(svc *svc.ServiceContext) { var aiType = "1" adapterIds, err := svc.Scheduler.AiStorages.GetAdapterIdsByType(aiType) @@ -483,108 +248,3 @@ func UpdateClusterResource(svc *svc.ServiceContext) { } wg.Wait() } - -func UpdateInferTaskStatus(svc *svc.ServiceContext, task *types.TaskModel) { - var aiTask []*models.TaskAi - tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - - if len(aiTask) == 0 { - task.Status = constants.Failed - tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - return - } - - if len(aiTask) == 1 { - if aiTask[0].Status == constants.Completed { - task.StartTime = aiTask[0].StartTime - task.EndTime = aiTask[0].EndTime - task.Status = constants.Succeeded - } else { - task.StartTime = aiTask[0].StartTime - task.Status = aiTask[0].Status - } - - task.UpdatedTime = time.Now().Format(constants.Layout) - tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - return - } - - //for i := len(aiTask) - 1; i >= 0; i-- { - // if aiTask[i].StartTime == "" { - // task.Status = aiTask[i].Status - // aiTask = append(aiTask[:i], aiTask[i+1:]...) - // } - //} - // - //if len(aiTask) == 0 { - // task.UpdatedTime = time.Now().Format(constants.Layout) - // tx = svc.DbEngin.Table("task").Model(task).Updates(task) - // if tx.Error != nil { - // logx.Errorf(tx.Error.Error()) - // return - // } - // return - //} - - if aiTask[0].StartTime == "" { - return - } - - start, _ := time.ParseInLocation(time.RFC3339, aiTask[0].StartTime, time.Local) - end, _ := time.ParseInLocation(time.RFC3339, aiTask[0].EndTime, time.Local) - var status string - var count int - for _, a := range aiTask { - if a.Status == constants.Failed { - status = a.Status - break - } - - if a.Status == constants.Pending { - status = a.Status - continue - } - - if a.Status == constants.Running { - status = a.Status - continue - } - - if a.Status == constants.Completed { - count++ - continue - } - } - - if count == len(aiTask) { - status = constants.Succeeded - } - - if status == constants.Succeeded { - task.Status = status - task.StartTime = start.Format(time.RFC3339) - task.EndTime = end.Format(time.RFC3339) - } else { - task.Status = status - task.StartTime = start.Format(time.RFC3339) - } - - task.UpdatedTime = time.Now().Format(constants.Layout) - tx = svc.DbEngin.Table("task").Model(task).Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } -} diff --git a/api/internal/cron/cron.go b/api/internal/cron/cron.go index d6749e1f..a571195c 100644 --- a/api/internal/cron/cron.go +++ b/api/internal/cron/cron.go @@ -16,6 +16,7 @@ package cron import ( "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/status" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" ) @@ -27,8 +28,8 @@ func AddCronGroup(svc *svc.ServiceContext) { logx.Errorf(err.Error()) return } - UpdateTaskStatus(svc, list) - UpdateAiTaskStatus(svc, list) + status.UpdateTaskStatus(svc, list) + status.UpdateAiTaskStatus(svc, list) }) svc.Cron.AddFunc("*/5 * * * * ?", func() { diff --git a/api/internal/logic/core/pagelisttasklogic.go b/api/internal/logic/core/pagelisttasklogic.go index fa56c9f0..c8bf7450 100644 --- a/api/internal/logic/core/pagelisttasklogic.go +++ b/api/internal/logic/core/pagelisttasklogic.go @@ -2,16 +2,11 @@ package core import ( "context" - "errors" - "fmt" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/status" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" - "strconv" - "sync" "time" "github.com/zeromicro/go-zero/core/logx" @@ -57,9 +52,8 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa } // 更新智算任务状态 - //chs := [2]chan struct{}{make(chan struct{}), make(chan struct{})} - //go l.updateTaskStatus(list, chs[0]) - //go l.updateAiTaskStatus(list, chs[1]) + go status.UpdateTaskStatus(l.svcCtx, list) + go status.UpdateAiTaskStatus(l.svcCtx, list) for _, model := range list { if model.StartTime != "" && model.EndTime == "" { @@ -77,221 +71,5 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa resp.PageNum = req.PageNum resp.Total = total - //for _, ch := range chs { - // select { - // case <-ch: - // case <-time.After(1 * time.Second): - // } - //} return } - -func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch chan<- struct{}) { - list := make([]*types.TaskModel, len(tasklist)) - copy(list, tasklist) - for i := len(list) - 1; i >= 0; i-- { - if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed { - list = append(list[:i], list[i+1:]...) - } - } - - if len(list) == 0 { - ch <- struct{}{} - return - } - - task := list[0] - for i := range list { - earliest, _ := time.Parse(time.RFC3339, task.UpdatedTime) - latest, _ := time.Parse(time.RFC3339, list[i].UpdatedTime) - if latest.Before(earliest) { - task = list[i] - } - } - - var aiTask []*models.TaskAi - tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - ch <- struct{}{} - return - } - - if len(aiTask) == 0 { - tx = l.svcCtx.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - ch <- struct{}{} - return - } - ch <- struct{}{} - return - } - - if len(aiTask) == 1 { - if aiTask[0].Status == constants.Completed { - task.Status = constants.Succeeded - } else { - task.Status = aiTask[0].Status - } - task.StartTime = aiTask[0].StartTime - task.EndTime = aiTask[0].EndTime - task.UpdatedTime = time.Now().Format(constants.Layout) - tx = l.svcCtx.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - ch <- struct{}{} - return - } - ch <- struct{}{} - return - } - - for i := len(aiTask) - 1; i >= 0; i-- { - if aiTask[i].StartTime == "" { - task.Status = aiTask[i].Status - aiTask = append(aiTask[:i], aiTask[i+1:]...) - } - } - - if len(aiTask) == 0 { - task.UpdatedTime = time.Now().Format(constants.Layout) - tx = l.svcCtx.DbEngin.Table("task").Model(task).Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - ch <- struct{}{} - return - } - ch <- struct{}{} - return - } - - start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local) - end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local) - - var status string - var count int - for _, a := range aiTask { - s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local) - e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local) - - if s.Before(start) { - start = s - } - - if e.After(end) { - end = e - } - - if a.Status == constants.Failed { - status = a.Status - break - } - - if a.Status == constants.Pending { - status = a.Status - continue - } - - if a.Status == constants.Running { - status = a.Status - continue - } - - if a.Status == constants.Completed { - count++ - continue - } - } - - if count == len(aiTask) { - status = constants.Succeeded - } - - if status != "" { - task.Status = status - task.StartTime = start.Format(constants.Layout) - task.EndTime = end.Format(constants.Layout) - } - - task.UpdatedTime = time.Now().Format(constants.Layout) - tx = l.svcCtx.DbEngin.Table("task").Model(task).Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - ch <- struct{}{} - return - } - ch <- struct{}{} -} - -func (l *PageListTaskLogic) updateAiTaskStatus(tasklist []*types.TaskModel, ch chan<- struct{}) { - list := make([]*types.TaskModel, len(tasklist)) - copy(list, tasklist) - for i := len(list) - 1; i >= 0; i-- { - if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed { - list = append(list[:i], list[i+1:]...) - } - } - - if len(list) == 0 { - ch <- struct{}{} - return - } - - task := list[0] - for i := range list { - earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) - latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime) - if latest.Before(earliest) { - task = list[i] - } - } - - var aiTaskList []*models.TaskAi - tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - ch <- struct{}{} - return - } - - if len(aiTaskList) == 0 { - ch <- struct{}{} - return - } - - var wg sync.WaitGroup - for _, aitask := range aiTaskList { - t := aitask - if t.Status == constants.Completed || t.Status == constants.Failed || t.JobId == "" { - continue - } - wg.Add(1) - go func() { - trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId) - if err != nil { - msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) - logx.Errorf(errors.New(msg).Error()) - wg.Done() - return - } - if trainingTask == nil { - wg.Done() - return - } - t.Status = trainingTask.Status - t.StartTime = trainingTask.Start - t.EndTime = trainingTask.End - err = l.svcCtx.Scheduler.AiStorages.UpdateAiTask(t) - if err != nil { - msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) - logx.Errorf(errors.New(msg).Error()) - wg.Done() - return - } - wg.Done() - }() - } - wg.Wait() - ch <- struct{}{} -} diff --git a/api/internal/logic/inference/imageinferencelogic.go b/api/internal/logic/inference/imageinferencelogic.go index e37265ab..7a4d3d3b 100644 --- a/api/internal/logic/inference/imageinferencelogic.go +++ b/api/internal/logic/inference/imageinferencelogic.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "net/http" ) @@ -115,6 +116,22 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "create", "任务创建中") + for i := len(clusters) - 1; i >= 0; i-- { + if clusters[i].Replicas == 0 { + clusters = append(clusters[:i], clusters[i+1:]...) + } + } + + //save taskai + for _, c := range clusters { + clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(c.ClusterId) + opt.Replica = c.Replicas + err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, adapterName, c.ClusterId, clusterName, "", constants.Saved, "") + if err != nil { + return nil, err + } + } + go l.svcCtx.Scheduler.AiService.ImageInfer(opt, id, adapterName, clusters, ts) return resp, nil diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index 9aba3e52..6db3cdac 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -125,6 +125,16 @@ func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int6 return taskModel.Id, nil } +func (s *AiStorage) UpdateTask(task *types.TaskModel) error { + task.UpdatedTime = time.Now().Format(constants.Layout) + tx := s.DbEngin.Table("task").Model(task).Updates(task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return tx.Error + } + return nil +} + func (s *AiStorage) SaveAiTask(taskId int64, opt option.Option, adapterName string, clusterId string, clusterName string, jobId string, status string, msg string) error { var aiOpt *option.AiOption switch (opt).(type) { diff --git a/api/internal/scheduler/service/inference/imageInfer.go b/api/internal/scheduler/service/inference/imageInfer.go index 2cf1e713..1085f919 100644 --- a/api/internal/scheduler/service/inference/imageInfer.go +++ b/api/internal/scheduler/service/inference/imageInfer.go @@ -31,11 +31,11 @@ type ImageFile struct { func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*ImageFile, aiCollectorAdapterMap map[string]map[string]collector.AiCollector, storage *database.AiStorage, ctx context.Context) ([]*types.ImageResult, error) { - for i := len(clusters) - 1; i >= 0; i-- { - if clusters[i].Replicas == 0 { - clusters = append(clusters[:i], clusters[i+1:]...) - } - } + //for i := len(clusters) - 1; i >= 0; i-- { + // if clusters[i].Replicas == 0 { + // clusters = append(clusters[:i], clusters[i+1:]...) + // } + //} var wg sync.WaitGroup var cluster_ch = make(chan struct { @@ -53,15 +53,15 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st } collectorMap := aiCollectorAdapterMap[opt.AdapterId] - //save taskai - for _, c := range clusters { - clusterName, _ := storage.GetClusterNameById(c.ClusterId) - opt.Replica = c.Replicas - err := storage.SaveAiTask(id, opt, adapterName, c.ClusterId, clusterName, "", constants.Saved, "") - if err != nil { - return nil, err - } - } + ////save taskai + //for _, c := range clusters { + // clusterName, _ := storage.GetClusterNameById(c.ClusterId) + // opt.Replica = c.Replicas + // err := storage.SaveAiTask(id, opt, adapterName, c.ClusterId, clusterName, "", constants.Saved, "") + // if err != nil { + // return nil, err + // } + //} var mutex sync.Mutex errMap := make(map[string]string) diff --git a/api/internal/scheduler/service/status/taskStatusSync.go b/api/internal/scheduler/service/status/taskStatusSync.go new file mode 100644 index 00000000..52bfe922 --- /dev/null +++ b/api/internal/scheduler/service/status/taskStatusSync.go @@ -0,0 +1,337 @@ +package status + +import ( + "errors" + "fmt" + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "net/http" + "strconv" + "sync" + "time" +) + +func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) { + list := make([]*types.TaskModel, len(tasklist)) + copy(list, tasklist) + for i := len(list) - 1; i >= 0; i-- { + if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed { + list = append(list[:i], list[i+1:]...) + } + } + + if len(list) == 0 { + return + } + + task := list[0] + for i := range list { + earliest, _ := time.Parse(time.RFC3339, task.UpdatedTime) + latest, _ := time.Parse(time.RFC3339, list[i].UpdatedTime) + if latest.Before(earliest) { + task = list[i] + } + } + + // Update Infer Task Status + if task.TaskTypeDict == "11" || task.TaskTypeDict == "12" { + updateInferTaskStatus(svc, *task) + return + } + + aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id) + if err != nil { + logx.Errorf(err.Error()) + return + } + + if len(aiTask) == 0 { + err := svc.Scheduler.AiStorages.UpdateTask(task) + if err != nil { + return + } + return + } + + if len(aiTask) == 1 { + if aiTask[0].Status == constants.Completed { + task.Status = constants.Succeeded + } else { + task.Status = aiTask[0].Status + } + task.StartTime = aiTask[0].StartTime + task.EndTime = aiTask[0].EndTime + err := svc.Scheduler.AiStorages.UpdateTask(task) + if err != nil { + return + } + return + } + + for i := len(aiTask) - 1; i >= 0; i-- { + if aiTask[i].StartTime == "" { + task.Status = aiTask[i].Status + aiTask = append(aiTask[:i], aiTask[i+1:]...) + } + } + + if len(aiTask) == 0 { + err := svc.Scheduler.AiStorages.UpdateTask(task) + if err != nil { + return + } + return + } + + start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local) + end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local) + + var status string + var count int + for _, a := range aiTask { + s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local) + e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local) + + if s.Before(start) { + start = s + } + + if e.After(end) { + end = e + } + + if a.Status == constants.Failed { + status = a.Status + break + } + + if a.Status == constants.Pending { + status = a.Status + continue + } + + if a.Status == constants.Running { + status = a.Status + continue + } + + if a.Status == constants.Completed { + count++ + continue + } + } + + if count == len(aiTask) { + status = constants.Succeeded + } + + if status != "" { + task.Status = status + task.StartTime = start.Format(constants.Layout) + task.EndTime = end.Format(constants.Layout) + } + + err = svc.Scheduler.AiStorages.UpdateTask(task) + if err != nil { + return + } +} + +func updateInferTaskStatus(svc *svc.ServiceContext, task types.TaskModel) { + aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id) + if err != nil { + logx.Errorf(err.Error()) + return + } + + if len(aiTask) == 0 { + task.Status = constants.Failed + err = svc.Scheduler.AiStorages.UpdateTask(&task) + if err != nil { + return + } + return + } + + if len(aiTask) == 1 { + if aiTask[0].Status == constants.Completed { + task.StartTime = aiTask[0].StartTime + task.EndTime = aiTask[0].EndTime + task.Status = constants.Succeeded + } else { + task.StartTime = aiTask[0].StartTime + task.Status = aiTask[0].Status + } + + err = svc.Scheduler.AiStorages.UpdateTask(&task) + if err != nil { + return + } + return + } + + //for i := len(aiTask) - 1; i >= 0; i-- { + // if aiTask[i].StartTime == "" { + // task.Status = aiTask[i].Status + // aiTask = append(aiTask[:i], aiTask[i+1:]...) + // } + //} + // + //if len(aiTask) == 0 { + // task.UpdatedTime = time.Now().Format(constants.Layout) + // tx = svc.DbEngin.Table("task").Model(task).Updates(task) + // if tx.Error != nil { + // logx.Errorf(tx.Error.Error()) + // return + // } + // return + //} + + if aiTask[0].StartTime == "" { + return + } + + start, _ := time.ParseInLocation(time.RFC3339, aiTask[0].StartTime, time.Local) + end, _ := time.ParseInLocation(time.RFC3339, aiTask[0].EndTime, time.Local) + var status string + var count int + for _, a := range aiTask { + if a.Status == constants.Failed { + status = a.Status + break + } + + if a.Status == constants.Pending { + status = a.Status + continue + } + + if a.Status == constants.Running { + status = a.Status + continue + } + + if a.Status == constants.Completed { + count++ + continue + } + } + + if count == len(aiTask) { + status = constants.Succeeded + } + + if status == constants.Succeeded { + task.Status = status + task.StartTime = start.Format(time.RFC3339) + task.EndTime = end.Format(time.RFC3339) + } else { + task.Status = status + task.StartTime = start.Format(time.RFC3339) + } + + err = svc.Scheduler.AiStorages.UpdateTask(&task) + if err != nil { + return + } +} + +func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) { + list := make([]*types.TaskModel, len(tasklist)) + copy(list, tasklist) + for i := len(list) - 1; i >= 0; i-- { + if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed { + list = append(list[:i], list[i+1:]...) + } + } + + if len(list) == 0 { + return + } + + task := list[0] + for i := range list { + earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) + latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime) + if latest.Before(earliest) { + task = list[i] + } + } + + aiTaskList, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id) + if err != nil { + logx.Errorf(err.Error()) + return + } + + if len(aiTaskList) == 0 { + return + } + + var wg sync.WaitGroup + for _, aitask := range aiTaskList { + t := aitask + if t.Status == constants.Completed || t.Status == constants.Failed || t.JobId == "" { + continue + } + wg.Add(1) + go func() { + h := http.Request{} + trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId) + if err != nil { + if status.Code(err) == codes.DeadlineExceeded { + msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) + wg.Done() + return + } + + msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) + wg.Done() + return + } + if trainingTask == nil { + wg.Done() + return + } + switch trainingTask.Status { + case constants.Running: + if t.Status != trainingTask.Status { + svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中") + t.Status = trainingTask.Status + } + case constants.Failed: + if t.Status != trainingTask.Status { + svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败") + t.Status = trainingTask.Status + } + case constants.Completed: + if t.Status != trainingTask.Status { + svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成") + t.Status = trainingTask.Status + } + default: + if t.Status != trainingTask.Status { + svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending") + t.Status = trainingTask.Status + } + } + t.StartTime = trainingTask.Start + t.EndTime = trainingTask.End + err = svc.Scheduler.AiStorages.UpdateAiTask(t) + if err != nil { + msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) + wg.Done() + return + } + wg.Done() + }() + } + wg.Wait() +}