From 60b726b08c6c6a6e2c6285c43eaf990e94a0851f Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 30 Apr 2024 15:10:18 +0800 Subject: [PATCH 1/4] added task queue db model Former-commit-id: 7b30cf9bb2f39291599f1516bc2c118698bd840b --- pkg/models/taskaimodel_gen.go | 7 +- pkg/models/tclustertaskqueuemodel.go | 24 ++++++ pkg/models/tclustertaskqueuemodel_gen.go | 95 ++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 3 deletions(-) create mode 100644 pkg/models/tclustertaskqueuemodel.go create mode 100644 pkg/models/tclustertaskqueuemodel_gen.go diff --git a/pkg/models/taskaimodel_gen.go b/pkg/models/taskaimodel_gen.go index ab0c5502..34ecd67b 100644 --- a/pkg/models/taskaimodel_gen.go +++ b/pkg/models/taskaimodel_gen.go @@ -49,6 +49,7 @@ type ( CommitTime time.Time `db:"commit_time"` // 提交时间 StartTime string `db:"start_time"` // 开始时间 EndTime string `db:"end_time"` // 结束时间 + TaskType string `db:"task_type"` } ) @@ -87,14 +88,14 @@ func (m *defaultTaskAiModel) FindOne(ctx context.Context, id int64) (*TaskAi, er } func (m *defaultTaskAiModel) Insert(ctx context.Context, data *TaskAi) (sql.Result, error) { - query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime) + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.TaskType) return ret, err } func (m *defaultTaskAiModel) Update(ctx context.Context, data *TaskAi) error { query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskAiRowsWithPlaceHolder) - _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.Id) + _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.TaskType, data.Id) return err } diff --git a/pkg/models/tclustertaskqueuemodel.go b/pkg/models/tclustertaskqueuemodel.go new file mode 100644 index 00000000..35e52522 --- /dev/null +++ b/pkg/models/tclustertaskqueuemodel.go @@ -0,0 +1,24 @@ +package models + +import "github.com/zeromicro/go-zero/core/stores/sqlx" + +var _ TClusterTaskQueueModel = (*customTClusterTaskQueueModel)(nil) + +type ( + // TClusterTaskQueueModel is an interface to be customized, add more methods here, + // and implement the added methods in customTClusterTaskQueueModel. + TClusterTaskQueueModel interface { + tClusterTaskQueueModel + } + + customTClusterTaskQueueModel struct { + *defaultTClusterTaskQueueModel + } +) + +// NewTClusterTaskQueueModel returns a model for the database table. +func NewTClusterTaskQueueModel(conn sqlx.SqlConn) TClusterTaskQueueModel { + return &customTClusterTaskQueueModel{ + defaultTClusterTaskQueueModel: newTClusterTaskQueueModel(conn), + } +} diff --git a/pkg/models/tclustertaskqueuemodel_gen.go b/pkg/models/tclustertaskqueuemodel_gen.go new file mode 100644 index 00000000..9113c7dc --- /dev/null +++ b/pkg/models/tclustertaskqueuemodel_gen.go @@ -0,0 +1,95 @@ +// Code generated by goctl. DO NOT EDIT. + +package models + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + "github.com/zeromicro/go-zero/core/stores/builder" + "github.com/zeromicro/go-zero/core/stores/sqlc" + "github.com/zeromicro/go-zero/core/stores/sqlx" + "github.com/zeromicro/go-zero/core/stringx" +) + +var ( + tClusterTaskQueueFieldNames = builder.RawFieldNames(&TClusterTaskQueue{}) + tClusterTaskQueueRows = strings.Join(tClusterTaskQueueFieldNames, ",") + tClusterTaskQueueRowsExpectAutoSet = strings.Join(stringx.Remove(tClusterTaskQueueFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",") + tClusterTaskQueueRowsWithPlaceHolder = strings.Join(stringx.Remove(tClusterTaskQueueFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?" +) + +type ( + tClusterTaskQueueModel interface { + Insert(ctx context.Context, data *TClusterTaskQueue) (sql.Result, error) + FindOne(ctx context.Context, id int64) (*TClusterTaskQueue, error) + Update(ctx context.Context, data *TClusterTaskQueue) error + Delete(ctx context.Context, id int64) error + } + + defaultTClusterTaskQueueModel struct { + conn sqlx.SqlConn + table string + } + + TClusterTaskQueue struct { + Id int64 `db:"id"` // id + AdapterId int64 `db:"adapter_id"` // 适配器id + ClusterId int64 `db:"cluster_id"` // 集群id + QueueNum sql.NullInt64 `db:"queue_num"` // 任务排队数量 + Date time.Time `db:"date"` + } +) + +func newTClusterTaskQueueModel(conn sqlx.SqlConn) *defaultTClusterTaskQueueModel { + return &defaultTClusterTaskQueueModel{ + conn: conn, + table: "`t_cluster_task_queue`", + } +} + +func (m *defaultTClusterTaskQueueModel) withSession(session sqlx.Session) *defaultTClusterTaskQueueModel { + return &defaultTClusterTaskQueueModel{ + conn: sqlx.NewSqlConnFromSession(session), + table: "`t_cluster_task_queue`", + } +} + +func (m *defaultTClusterTaskQueueModel) Delete(ctx context.Context, id int64) error { + query := fmt.Sprintf("delete from %s where `id` = ?", m.table) + _, err := m.conn.ExecCtx(ctx, query, id) + return err +} + +func (m *defaultTClusterTaskQueueModel) FindOne(ctx context.Context, id int64) (*TClusterTaskQueue, error) { + query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", tClusterTaskQueueRows, m.table) + var resp TClusterTaskQueue + err := m.conn.QueryRowCtx(ctx, &resp, query, id) + switch err { + case nil: + return &resp, nil + case sqlc.ErrNotFound: + return nil, ErrNotFound + default: + return nil, err + } +} + +func (m *defaultTClusterTaskQueueModel) Insert(ctx context.Context, data *TClusterTaskQueue) (sql.Result, error) { + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, tClusterTaskQueueRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.AdapterId, data.ClusterId, data.QueueNum, data.Date) + return ret, err +} + +func (m *defaultTClusterTaskQueueModel) Update(ctx context.Context, data *TClusterTaskQueue) error { + query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, tClusterTaskQueueRowsWithPlaceHolder) + _, err := m.conn.ExecCtx(ctx, query, data.AdapterId, data.ClusterId, data.QueueNum, data.Date, data.Id) + return err +} + +func (m *defaultTClusterTaskQueueModel) tableName() string { + return m.table +} From 2722f689b63e069affb977568de42c72fbfaa4b8 Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 30 Apr 2024 15:33:09 +0800 Subject: [PATCH 2/4] updated task queue db model Former-commit-id: 76d76519039f64900c846abe1e8269ac37a2eb97 --- pkg/models/tclustertaskqueuemodel_gen.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/models/tclustertaskqueuemodel_gen.go b/pkg/models/tclustertaskqueuemodel_gen.go index 9113c7dc..f40a857c 100644 --- a/pkg/models/tclustertaskqueuemodel_gen.go +++ b/pkg/models/tclustertaskqueuemodel_gen.go @@ -36,11 +36,11 @@ type ( } TClusterTaskQueue struct { - Id int64 `db:"id"` // id - AdapterId int64 `db:"adapter_id"` // 适配器id - ClusterId int64 `db:"cluster_id"` // 集群id - QueueNum sql.NullInt64 `db:"queue_num"` // 任务排队数量 - Date time.Time `db:"date"` + Id int64 `db:"id"` // id + AdapterId int64 `db:"adapter_id"` // 适配器id + ClusterId int64 `db:"cluster_id"` // 集群id + QueueNum int64 `db:"queue_num"` // 任务排队数量 + Date time.Time `db:"date"` } ) From 18cadd528dfc6d7807c94b480b5ed58c379bd450 Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 30 Apr 2024 16:18:14 +0800 Subject: [PATCH 3/4] updated cluster resource db model Former-commit-id: 2904365c03e2bf8ae9475dd6eed360c8106af165 --- pkg/models/tclusterresourcemodel_gen.go | 37 +++++++++++++++---------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/pkg/models/tclusterresourcemodel_gen.go b/pkg/models/tclusterresourcemodel_gen.go index bf4068e6..1bb40107 100644 --- a/pkg/models/tclusterresourcemodel_gen.go +++ b/pkg/models/tclusterresourcemodel_gen.go @@ -35,17 +35,19 @@ type ( } TClusterResource struct { - ClusterId int64 `db:"cluster_id"` - ClusterName string `db:"cluster_name"` - ClusterType int64 `db:"cluster_type"` // 类型0->容器,1->智算,2->超算,3-虚拟机 - CpuAvail float64 `db:"cpu_avail"` - CpuTotal float64 `db:"cpu_total"` - MemAvail float64 `db:"mem_avail"` - MemTotal float64 `db:"mem_total"` - DiskAvail float64 `db:"disk_avail"` - DiskTotal float64 `db:"disk_total"` - GpuAvail float64 `db:"gpu_avail"` - GpuTotal float64 `db:"gpu_total"` + ClusterId int64 `db:"cluster_id"` + ClusterName string `db:"cluster_name"` + ClusterType int64 `db:"cluster_type"` // 类型0->容器,1->智算,2->超算,3-虚拟机 + CpuAvail float64 `db:"cpu_avail"` + CpuTotal float64 `db:"cpu_total"` + MemAvail float64 `db:"mem_avail"` + MemTotal float64 `db:"mem_total"` + DiskAvail float64 `db:"disk_avail"` + DiskTotal float64 `db:"disk_total"` + GpuAvail float64 `db:"gpu_avail"` + GpuTotal float64 `db:"gpu_total"` + CardTotal int64 `db:"card_total"` // 算力卡数量 + CardTopsTotal float64 `db:"card_tops_total"` // 算力总量tops } ) @@ -56,6 +58,13 @@ func newTClusterResourceModel(conn sqlx.SqlConn) *defaultTClusterResourceModel { } } +func (m *defaultTClusterResourceModel) withSession(session sqlx.Session) *defaultTClusterResourceModel { + return &defaultTClusterResourceModel{ + conn: sqlx.NewSqlConnFromSession(session), + table: "`t_cluster_resource`", + } +} + func (m *defaultTClusterResourceModel) Delete(ctx context.Context, clusterId int64) error { query := fmt.Sprintf("delete from %s where `cluster_id` = ?", m.table) _, err := m.conn.ExecCtx(ctx, query, clusterId) @@ -77,14 +86,14 @@ func (m *defaultTClusterResourceModel) FindOne(ctx context.Context, clusterId in } func (m *defaultTClusterResourceModel) Insert(ctx context.Context, data *TClusterResource) (sql.Result, error) { - query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tClusterResourceRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.ClusterId, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal) + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tClusterResourceRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.ClusterId, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal) return ret, err } func (m *defaultTClusterResourceModel) Update(ctx context.Context, data *TClusterResource) error { query := fmt.Sprintf("update %s set %s where `cluster_id` = ?", m.table, tClusterResourceRowsWithPlaceHolder) - _, err := m.conn.ExecCtx(ctx, query, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.ClusterId) + _, err := m.conn.ExecCtx(ctx, query, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal, data.ClusterId) return err } From 77010121367a8d6d0b8bb0ed76d60aa78f9c175f Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 30 Apr 2024 16:28:51 +0800 Subject: [PATCH 4/4] updated ai overview functions Former-commit-id: b6896236e3ab63aeeaf97337eb866022fb8fb383 --- api/internal/logic/ai/getcenterlistlogic.go | 19 ++++++- .../logic/ai/getcenteroverviewlogic.go | 44 +++++++++++++- .../logic/ai/getcenterqueueinglogic.go | 44 +++++++++++++- .../logic/ai/getcentertasklistlogic.go | 37 +++++++++++- .../logic/schedule/schedulesubmitlogic.go | 2 + api/internal/scheduler/database/aiStorage.go | 57 ++++++++++++++++++- 6 files changed, 191 insertions(+), 12 deletions(-) diff --git a/api/internal/logic/ai/getcenterlistlogic.go b/api/internal/logic/ai/getcenterlistlogic.go index ce9db87d..2de25ad5 100644 --- a/api/internal/logic/ai/getcenterlistlogic.go +++ b/api/internal/logic/ai/getcenterlistlogic.go @@ -2,7 +2,6 @@ package ai import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,7 +23,21 @@ func NewGetCenterListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Get } func (l *GetCenterListLogic) GetCenterList() (resp *types.CenterListResp, err error) { - // todo: add your logic here and delete this line + resp = &types.CenterListResp{} - return + adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return nil, err + } + + for _, adapter := range adapterList { + a := &types.AiCenter{ + Name: adapter.Name, + StackName: adapter.Nickname, + Version: adapter.Version, + } + resp.List = append(resp.List, a) + } + + return resp, nil } diff --git a/api/internal/logic/ai/getcenteroverviewlogic.go b/api/internal/logic/ai/getcenteroverviewlogic.go index 0de00f18..9f76979d 100644 --- a/api/internal/logic/ai/getcenteroverviewlogic.go +++ b/api/internal/logic/ai/getcenteroverviewlogic.go @@ -2,7 +2,6 @@ package ai import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,7 +23,46 @@ func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) { - // todo: add your logic here and delete this line + resp = &types.CenterOverviewResp{} - return + var centerNum int32 + var taskNum int32 + var cardNum int32 + var totalTops float64 + + adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return nil, err + } + centerNum = int32(len(adapterList)) + resp.CenterNum = centerNum + + for _, adapter := range adapterList { + taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) + if err != nil { + continue + } + taskNum += int32(len(taskList)) + } + resp.TaskNum = taskNum + + for _, adapter := range adapterList { + clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id) + if err != nil { + continue + } + for _, cluster := range clusters.List { + clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id) + if err != nil { + continue + } + cardNum += int32(clusterResource.CardTotal) + totalTops += clusterResource.CardTopsTotal + } + } + + resp.CardNum = centerNum + resp.PowerInTops = totalTops + + return resp, nil } diff --git a/api/internal/logic/ai/getcenterqueueinglogic.go b/api/internal/logic/ai/getcenterqueueinglogic.go index 6ff23825..bd5e5e2b 100644 --- a/api/internal/logic/ai/getcenterqueueinglogic.go +++ b/api/internal/logic/ai/getcenterqueueinglogic.go @@ -2,6 +2,7 @@ package ai import ( "context" + "sort" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,7 +25,46 @@ func NewGetCenterQueueingLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *GetCenterQueueingLogic) GetCenterQueueing() (resp *types.CenterQueueingResp, err error) { - // todo: add your logic here and delete this line + resp = &types.CenterQueueingResp{} - return + adapters, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return nil, err + } + + for _, adapter := range adapters { + clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id) + if err != nil { + continue + } + for _, cluster := range clusters.List { + queues, err := l.svcCtx.Scheduler.AiStorages.GetClusterTaskQueues(adapter.Id, cluster.Id) + if err != nil { + continue + } + //todo sync current task queues + current := &types.CenterQueue{ + Name: cluster.Name, + QueueingNum: int32(queues[0].QueueNum), + } + history := &types.CenterQueue{ + Name: cluster.Name, + QueueingNum: int32(queues[0].QueueNum), + } + resp.Current = append(resp.Current, current) + resp.History = append(resp.History, history) + + } + } + + sortQueueingNum(resp.Current) + sortQueueingNum(resp.History) + + return resp, nil +} + +func sortQueueingNum(q []*types.CenterQueue) { + sort.Slice(q, func(i, j int) bool { + return q[i].QueueingNum > q[j].QueueingNum + }) } diff --git a/api/internal/logic/ai/getcentertasklistlogic.go b/api/internal/logic/ai/getcentertasklistlogic.go index 96242e9b..0a800630 100644 --- a/api/internal/logic/ai/getcentertasklistlogic.go +++ b/api/internal/logic/ai/getcentertasklistlogic.go @@ -2,6 +2,8 @@ package ai import ( "context" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "time" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -15,6 +17,8 @@ type GetCenterTaskListLogic struct { svcCtx *svc.ServiceContext } +const layout = "2006-01-02 15:04:05" + func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterTaskListLogic { return &GetCenterTaskListLogic{ Logger: logx.WithContext(ctx), @@ -24,7 +28,36 @@ func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) { - // todo: add your logic here and delete this line + resp = &types.CenterTaskListResp{} - return + adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return nil, err + } + + for _, adapter := range adapterList { + taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) + if err != nil { + continue + } + for _, task := range taskList { + var elapsed time.Duration + start, _ := time.Parse(layout, task.CommitTime) + if task.Status != constants.Completed { + elapsed = start.Sub(time.Now()) + } else { + end, _ := time.Parse(layout, task.EndTime) + elapsed = start.Sub(end) + } + + t := &types.AiTask{ + Name: task.Name, + Status: task.Status, + TimeElapsed: int32(elapsed.Seconds()), + } + resp.List = append(resp.List, t) + } + } + + return resp, nil } diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index 183699f2..e46ffe7d 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -29,7 +29,9 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type resp = &types.ScheduleResp{} opt := &option.AiOption{ AdapterId: req.AiOption.AdapterId, + TaskName: req.AiOption.TaskName, ResourceType: req.AiOption.ResourceType, + Replica: 1, Tops: req.AiOption.Tops, TaskType: req.AiOption.TaskType, DatasetsName: req.AiOption.Datasets, diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index 8efb98b8..c458c622 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -50,9 +50,20 @@ func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) { return ids, nil } -func (s *AiStorage) GetAiTasks() ([]*types.AiTaskDb, error) { +func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo, error) { + var list []*types.AdapterInfo + db := s.DbEngin.Model(&types.AdapterInfo{}).Table("t_adapter") + db = db.Where("type = ?", adapterType) + err := db.Order("create_time desc").Find(&list).Error + if err != nil { + return nil, err + } + return list, nil +} + +func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*types.AiTaskDb, error) { var resp []*types.AiTaskDb - tx := s.DbEngin.Raw("select * from task_ai").Scan(&resp) + tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp) if tx.Error != nil { logx.Errorf(tx.Error.Error()) return nil, tx.Error @@ -93,6 +104,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId Name: option.TaskName, Replica: option.Replica, JobId: jobId, + TaskType: option.TaskType, Strategy: option.StrategyName, Status: status, Msg: msg, @@ -106,6 +118,37 @@ func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId return nil } +func (s *AiStorage) SaveClusterTaskQueue(adapterId string, clusterId string, queueNum int64) error { + aId, err := strconv.ParseInt(adapterId, 10, 64) + if err != nil { + return err + } + cId, err := strconv.ParseInt(clusterId, 10, 64) + if err != nil { + return err + } + taskQueue := models.TClusterTaskQueue{ + AdapterId: aId, + ClusterId: cId, + QueueNum: queueNum, + } + tx := s.DbEngin.Create(&taskQueue) + if tx.Error != nil { + return tx.Error + } + return nil +} + +func (s *AiStorage) GetClusterTaskQueues(adapterId string, clusterId string) ([]*models.TClusterTaskQueue, error) { + var taskQueues []*models.TClusterTaskQueue + tx := s.DbEngin.Raw("select * from t_cluster_task_queue where `adapter_id` = ? and `cluster_id` = ?", adapterId, clusterId).Scan(&taskQueues) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return nil, tx.Error + } + return taskQueues, nil +} + func (s *AiStorage) GetAiTaskIdByClusterIdAndTaskId(clusterId string, taskId string) (string, error) { var aiTask models.TaskAi tx := s.DbEngin.Raw("select * from task_ai where `cluster_id` = ? and `task_id` = ?", clusterId, taskId).Scan(&aiTask) @@ -116,6 +159,16 @@ func (s *AiStorage) GetAiTaskIdByClusterIdAndTaskId(clusterId string, taskId str return aiTask.JobId, nil } +func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterResource, error) { + var clusterResource models.TClusterResource + tx := s.DbEngin.Raw("select * from t_cluster_resource where `cluster_id` = ?", clusterId).Scan(&clusterResource) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return nil, tx.Error + } + return &clusterResource, nil +} + func (s *AiStorage) UpdateTask() error { return nil }