From b0de44251bb3a31312b2e3ecd7d1c97fb50db176 Mon Sep 17 00:00:00 2001 From: tzwang Date: Fri, 7 Jun 2024 18:52:24 +0800 Subject: [PATCH] fix noticeinfo bugs Former-commit-id: fb2dfbd4d7f0beafd92a783868504536f7cda644 --- api/internal/cron/aiCronTask.go | 23 ++++++++++- .../logic/schedule/schedulesubmitlogic.go | 12 +++++- api/internal/scheduler/database/aiStorage.go | 39 ++++++++++++++++++- .../scheduler/schedulers/aiScheduler.go | 14 +++++-- 4 files changed, 81 insertions(+), 7 deletions(-) diff --git a/api/internal/cron/aiCronTask.go b/api/internal/cron/aiCronTask.go index 80a9d877..b325e6be 100644 --- a/api/internal/cron/aiCronTask.go +++ b/api/internal/cron/aiCronTask.go @@ -114,7 +114,28 @@ func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) { wg.Done() return } - t.Status = trainingTask.Status + switch trainingTask.Status { + case constants.Running: + if t.Status != trainingTask.Status { + svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中") + t.Status = trainingTask.Status + } + case constants.Failed: + if t.Status != trainingTask.Status { + svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败") + t.Status = trainingTask.Status + } + case constants.Completed: + if t.Status != trainingTask.Status { + svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成") + t.Status = trainingTask.Status + } + default: + if t.Status != trainingTask.Status { + svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending") + t.Status = trainingTask.Status + } + } t.StartTime = trainingTask.Start t.EndTime = 
trainingTask.End err = svc.Scheduler.AiStorages.UpdateAiTask(t) diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index 5ce7a6ee..64a1fbc4 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -64,7 +64,13 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type synergystatus = 1 } strategyCode, err := l.svcCtx.Scheduler.AiStorages.GetStrategyCode(req.AiOption.Strategy) - + if err != nil { + return nil, err + } + adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(rs[0].AdapterId) + if err != nil { + return nil, err + } id, err := l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName, strategyCode, synergystatus) if err != nil { return nil, err @@ -84,11 +90,13 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(r.ClusterId) - err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg) + err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, adapterName, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg) if err != nil { return nil, err } + l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(r.AdapterId, adapterName, r.ClusterId, clusterName, r.TaskName, "create", "任务创建中") + resp.Results = append(resp.Results, scheResult) } diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index 3f0b3f78..68e11316 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -2,6 +2,7 @@ package database import ( "github.com/zeromicro/go-zero/core/logx" + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" 
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -46,6 +47,16 @@ func (s *AiStorage) GetClusterNameById(id string) (string, error) { return name, nil } +func (s *AiStorage) GetAdapterNameById(id string) (string, error) { + var name string + tx := s.DbEngin.Raw("select `name` from t_adapter where `id` = ?", id).Scan(&name) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return "", tx.Error + } + return name, nil +} + func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) { var list []types.AdapterInfo var ids []string @@ -102,7 +113,7 @@ func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int6 return taskModel.Id, nil } -func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId string, clusterName string, jobId string, status string, msg string) error { +func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, adapterName string, clusterId string, clusterName string, jobId string, status string, msg string) error { // 构建主任务结构体 aId, err := strconv.ParseInt(option.AdapterId, 10, 64) if err != nil { @@ -116,6 +127,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId aiTaskModel := models.TaskAi{ TaskId: taskId, AdapterId: aId, + AdapterName: adapterName, ClusterId: cId, ClusterName: clusterName, Name: option.TaskName, @@ -281,3 +293,28 @@ func (s *AiStorage) GetStrategyCode(name string) (int64, error) { } return strategy, nil } + +func (s *AiStorage) AddNoticeInfo(adapterId string, adapterName string, clusterId string, clusterName string, taskName string, noticeType string, incident string) { + aId, err := strconv.ParseInt(adapterId, 10, 64) + if err != nil { + return + } + cId, err := strconv.ParseInt(clusterId, 10, 64) + if err != nil { + return + } + noticeInfo := clientCore.NoticeInfo{ + AdapterId: aId, + AdapterName: adapterName, + ClusterId: cId, + ClusterName: 
clusterName, + NoticeType: noticeType, + TaskName: taskName, + Incident: incident, + CreatedTime: time.Now(), + } + result := s.DbEngin.Table("t_notice").Create(&noticeInfo) + if result.Error != nil { + logx.Errorf("Task creation failure, err: %v", result.Error) + } +} diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 7979f966..201f565d 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -45,6 +45,8 @@ type AiScheduler struct { } type AiResult struct { + AdapterId string + TaskName string JobId string ClusterId string Strategy string @@ -190,6 +192,8 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa result, _ = convertType(resp) mu.Unlock() + result.AdapterId = opt.AdapterId + result.TaskName = opt.TaskName result.Replica = c.Replicas result.ClusterId = c.ClusterId result.Strategy = as.option.StrategyName @@ -222,6 +226,10 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa if err != nil { return nil, errors.New("database add failed: " + err.Error()) } + adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId) + if err != nil { + return nil, err + } var errmsg string for _, err := range errs { @@ -234,7 +242,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId) - err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, clusterName, "", constants.Failed, msg) + err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, e.clusterId, clusterName, "", constants.Failed, msg) if err != nil { return nil, errors.New("database add failed: " + err.Error()) } @@ -246,14 +254,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa if s.Msg != "" { msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg) errmsg += 
msg - err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, "", constants.Failed, msg) + err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, "", constants.Failed, msg) if err != nil { return nil, errors.New("database add failed: " + err.Error()) } } else { msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId) errmsg += msg - err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, s.JobId, constants.Saved, msg) + err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, s.JobId, constants.Saved, msg) if err != nil { return nil, errors.New("database add failed: " + err.Error()) }