diff --git a/api/internal/logic/ai/getcenterlistlogic.go b/api/internal/logic/ai/getcenterlistlogic.go index ce9db87d..2de25ad5 100644 --- a/api/internal/logic/ai/getcenterlistlogic.go +++ b/api/internal/logic/ai/getcenterlistlogic.go @@ -2,7 +2,6 @@ package ai import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,7 +23,21 @@ func NewGetCenterListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Get } func (l *GetCenterListLogic) GetCenterList() (resp *types.CenterListResp, err error) { - // todo: add your logic here and delete this line + resp = &types.CenterListResp{} - return + adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return nil, err + } + + for _, adapter := range adapterList { + a := &types.AiCenter{ + Name: adapter.Name, + StackName: adapter.Nickname, + Version: adapter.Version, + } + resp.List = append(resp.List, a) + } + + return resp, nil } diff --git a/api/internal/logic/ai/getcenteroverviewlogic.go b/api/internal/logic/ai/getcenteroverviewlogic.go index 0de00f18..9f76979d 100644 --- a/api/internal/logic/ai/getcenteroverviewlogic.go +++ b/api/internal/logic/ai/getcenteroverviewlogic.go @@ -2,7 +2,6 @@ package ai import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,7 +23,46 @@ func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) { - // todo: add your logic here and delete this line + resp = &types.CenterOverviewResp{} - return + var centerNum int32 + var taskNum int32 + var cardNum int32 + var totalTops float64 + + adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return nil, err + } + centerNum = int32(len(adapterList)) + resp.CenterNum = centerNum + + for _, adapter := range adapterList { + taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) + if err != nil { + continue + } + taskNum += int32(len(taskList)) + } + resp.TaskNum = taskNum + + for _, adapter := range adapterList { + clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id) + if err != nil { + continue + } + for _, cluster := range clusters.List { + clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id) + if err != nil { + continue + } + cardNum += int32(clusterResource.CardTotal) + totalTops += clusterResource.CardTopsTotal + } + } + + resp.CardNum = centerNum + resp.PowerInTops = totalTops + + return resp, nil } diff --git a/api/internal/logic/ai/getcenterqueueinglogic.go b/api/internal/logic/ai/getcenterqueueinglogic.go index 6ff23825..bd5e5e2b 100644 --- a/api/internal/logic/ai/getcenterqueueinglogic.go +++ b/api/internal/logic/ai/getcenterqueueinglogic.go @@ -2,6 +2,7 @@ package ai import ( "context" + "sort" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,7 +25,46 @@ func NewGetCenterQueueingLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *GetCenterQueueingLogic) GetCenterQueueing() (resp *types.CenterQueueingResp, err error) { - // todo: add your logic here and delete this line + resp = &types.CenterQueueingResp{} - return + adapters, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return nil, err + } + + for _, adapter := range adapters { + clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id) + if err != nil { + continue + } + for _, cluster := range clusters.List { + queues, err := l.svcCtx.Scheduler.AiStorages.GetClusterTaskQueues(adapter.Id, cluster.Id) + if err != nil { + continue + } + //todo sync current task queues + current := &types.CenterQueue{ + Name: cluster.Name, + QueueingNum: int32(queues[0].QueueNum), + } + history := &types.CenterQueue{ + Name: cluster.Name, + QueueingNum: int32(queues[0].QueueNum), + } + resp.Current = append(resp.Current, current) + resp.History = append(resp.History, history) + + } + } + + sortQueueingNum(resp.Current) + sortQueueingNum(resp.History) + + return resp, nil +} + +func sortQueueingNum(q []*types.CenterQueue) { + sort.Slice(q, func(i, j int) bool { + return q[i].QueueingNum > q[j].QueueingNum + }) } diff --git a/api/internal/logic/ai/getcentertasklistlogic.go b/api/internal/logic/ai/getcentertasklistlogic.go index 96242e9b..0a800630 100644 --- a/api/internal/logic/ai/getcentertasklistlogic.go +++ b/api/internal/logic/ai/getcentertasklistlogic.go @@ -2,6 +2,8 @@ package ai import ( "context" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "time" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -15,6 +17,8 @@ type GetCenterTaskListLogic struct { svcCtx *svc.ServiceContext } +const layout = "2006-01-02 15:04:05" + func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterTaskListLogic { return &GetCenterTaskListLogic{ Logger: logx.WithContext(ctx), @@ -24,7 +28,36 @@ func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) { - // todo: add your logic here and delete this line + resp = &types.CenterTaskListResp{} - return + adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return nil, err + } + + for _, adapter := range adapterList { + taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) + if err != nil { + continue + } + for _, task := range taskList { + var elapsed time.Duration + start, _ := time.Parse(layout, task.CommitTime) + if task.Status != constants.Completed { + elapsed = start.Sub(time.Now()) + } else { + end, _ := time.Parse(layout, task.EndTime) + elapsed = start.Sub(end) + } + + t := &types.AiTask{ + Name: task.Name, + Status: task.Status, + TimeElapsed: int32(elapsed.Seconds()), + } + resp.List = append(resp.List, t) + } + } + + return resp, nil } diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index 183699f2..e46ffe7d 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -29,7 +29,9 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type resp = &types.ScheduleResp{} opt := &option.AiOption{ AdapterId: req.AiOption.AdapterId, + TaskName: req.AiOption.TaskName, ResourceType: req.AiOption.ResourceType, + Replica: 1, Tops: req.AiOption.Tops, TaskType: req.AiOption.TaskType, DatasetsName: req.AiOption.Datasets, diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index 8efb98b8..c458c622 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -50,9 +50,20 @@ func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) { return ids, nil } -func (s *AiStorage) GetAiTasks() ([]*types.AiTaskDb, error) { +func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo, error) { + var list []*types.AdapterInfo + db := s.DbEngin.Model(&types.AdapterInfo{}).Table("t_adapter") + db = db.Where("type = ?", adapterType) + err := db.Order("create_time desc").Find(&list).Error + if err != nil { + return nil, err + } + return list, nil +} + +func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*types.AiTaskDb, error) { var resp []*types.AiTaskDb - tx := s.DbEngin.Raw("select * from task_ai").Scan(&resp) + tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp) if tx.Error != nil { logx.Errorf(tx.Error.Error()) return nil, tx.Error @@ -93,6 +104,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId Name: option.TaskName, Replica: option.Replica, JobId: jobId, + TaskType: option.TaskType, Strategy: option.StrategyName, Status: status, Msg: msg, @@ -106,6 +118,37 @@ func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId return nil } +func (s *AiStorage) SaveClusterTaskQueue(adapterId string, clusterId string, queueNum int64) error { + aId, err := strconv.ParseInt(adapterId, 10, 64) + if err != nil { + return err + } + cId, err := strconv.ParseInt(clusterId, 10, 64) + if err != nil { + return err + } + taskQueue := models.TClusterTaskQueue{ + AdapterId: aId, + ClusterId: cId, + QueueNum: queueNum, + } + tx := s.DbEngin.Create(&taskQueue) + if tx.Error != nil { + return tx.Error + } + return nil +} + +func (s *AiStorage) GetClusterTaskQueues(adapterId string, clusterId string) ([]*models.TClusterTaskQueue, error) { + var taskQueues []*models.TClusterTaskQueue + tx := s.DbEngin.Raw("select * from t_cluster_task_queue where `adapter_id` = ? and `cluster_id` = ?", adapterId, clusterId).Scan(&taskQueues) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return nil, tx.Error + } + return taskQueues, nil +} + func (s *AiStorage) GetAiTaskIdByClusterIdAndTaskId(clusterId string, taskId string) (string, error) { var aiTask models.TaskAi tx := s.DbEngin.Raw("select * from task_ai where `cluster_id` = ? and `task_id` = ?", clusterId, taskId).Scan(&aiTask) @@ -116,6 +159,16 @@ func (s *AiStorage) GetAiTaskIdByClusterIdAndTaskId(clusterId string, taskId str return aiTask.JobId, nil } +func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterResource, error) { + var clusterResource models.TClusterResource + tx := s.DbEngin.Raw("select * from t_cluster_resource where `cluster_id` = ?", clusterId).Scan(&clusterResource) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return nil, tx.Error + } + return &clusterResource, nil +} + func (s *AiStorage) UpdateTask() error { return nil }