diff --git a/api/client/types.go b/api/client/types.go index 672cc562..4bbce2e9 100644 --- a/api/client/types.go +++ b/api/client/types.go @@ -111,18 +111,17 @@ type HpcInfo struct { } type CloudInfo struct { - Participant int64 `json:"participant,omitempty"` - Id int64 `json:"id,omitempty"` - TaskId int64 `json:"taskId,omitempty"` - ApiVersion string `json:"apiVersion,omitempty"` - Kind string `json:"kind,omitempty"` - Namespace string `json:"namespace,omitempty"` - Name string `json:"name,omitempty"` - Status string `json:"status,omitempty"` - StartTime string `json:"startTime,omitempty"` - RunningTime int64 `json:"runningTime,omitempty"` - Result string `json:"result,omitempty"` - YamlString string `json:"yamlString,omitempty"` + Id uint `json:"id,omitempty"` + TaskId int64 `json:"taskId,omitempty"` + AdapterId uint `json:"adapterId,omitempty"` + ClusterId uint `json:"clusterId,omitempty"` + ClusterName string `json:"clusterName,omitempty"` + Kind string `json:"kind,omitempty"` + Status string `json:"status,omitempty"` + StartTime *time.Time `json:"startTime,omitempty"` + YamlString string `json:"yamlString,omitempty"` + Result string `json:"result,omitempty"` + Namespace string `json:"namespace,omitempty"` } type AiInfo struct { diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index be4782d6..64689eb8 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -118,6 +118,7 @@ type ( Strategy string `json:"strategy"` StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` ReqBody []string `json:"reqBody"` + Replicas int64 `json:"replicas,string"` } ) diff --git a/api/etc/pcm.yaml b/api/etc/pcm.yaml index 2a27ffba..ee51794e 100644 --- a/api/etc/pcm.yaml +++ b/api/etc/pcm.yaml @@ -5,8 +5,8 @@ Port: 8999 Timeout: 50000 DB: - DataSource: root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local - # DataSource: root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local + DataSource: root:uJpLd6u-J?HC1@(10.206.0.12:3306)/pcm?parseTime=true&loc=Local +# DataSource: root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local Redis: Host: 10.206.0.12:6379 Pass: redisPW123 diff --git a/api/internal/logic/ai/getcenterlistlogic.go b/api/internal/logic/ai/getcenterlistlogic.go index ce9db87d..2de25ad5 100644 --- a/api/internal/logic/ai/getcenterlistlogic.go +++ b/api/internal/logic/ai/getcenterlistlogic.go @@ -2,7 +2,6 @@ package ai import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,7 +23,21 @@ func NewGetCenterListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Get } func (l *GetCenterListLogic) GetCenterList() (resp *types.CenterListResp, err error) { - // todo: add your logic here and delete this line + resp = &types.CenterListResp{} - return + adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return nil, err + } + + for _, adapter := range adapterList { + a := &types.AiCenter{ + Name: adapter.Name, + StackName: adapter.Nickname, + Version: adapter.Version, + } + resp.List = append(resp.List, a) + } + + return resp, nil } diff --git a/api/internal/logic/ai/getcenteroverviewlogic.go b/api/internal/logic/ai/getcenteroverviewlogic.go index 0de00f18..9f76979d 100644 --- a/api/internal/logic/ai/getcenteroverviewlogic.go +++ b/api/internal/logic/ai/getcenteroverviewlogic.go @@ -2,7 +2,6 @@ package ai import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,7 +23,46 @@ func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) { - // todo: add your logic here and delete this line + resp = &types.CenterOverviewResp{} - return + var centerNum int32 + var taskNum int32 + var cardNum int32 + var totalTops float64 + + adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return nil, err + } + centerNum = int32(len(adapterList)) + resp.CenterNum = centerNum + + for _, adapter := range adapterList { + taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) + if err != nil { + continue + } + taskNum += int32(len(taskList)) + } + resp.TaskNum = taskNum + + for _, adapter := range adapterList { + clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id) + if err != nil { + continue + } + for _, cluster := range clusters.List { + clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id) + if err != nil { + continue + } + cardNum += int32(clusterResource.CardTotal) + totalTops += clusterResource.CardTopsTotal + } + } + + resp.CardNum = centerNum + resp.PowerInTops = totalTops + + return resp, nil } diff --git a/api/internal/logic/ai/getcenterqueueinglogic.go b/api/internal/logic/ai/getcenterqueueinglogic.go index 6ff23825..bd5e5e2b 100644 --- a/api/internal/logic/ai/getcenterqueueinglogic.go +++ b/api/internal/logic/ai/getcenterqueueinglogic.go @@ -2,6 +2,7 @@ package ai import ( "context" + "sort" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,7 +25,46 @@ func NewGetCenterQueueingLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *GetCenterQueueingLogic) GetCenterQueueing() (resp *types.CenterQueueingResp, err error) { - // todo: add your logic here and delete this line + resp = &types.CenterQueueingResp{} - return + adapters, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return nil, err + } + + for _, adapter := range adapters { + clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id) + if err != nil { + continue + } + for _, cluster := range clusters.List { + queues, err := l.svcCtx.Scheduler.AiStorages.GetClusterTaskQueues(adapter.Id, cluster.Id) + if err != nil { + continue + } + //todo sync current task queues + current := &types.CenterQueue{ + Name: cluster.Name, + QueueingNum: int32(queues[0].QueueNum), + } + history := &types.CenterQueue{ + Name: cluster.Name, + QueueingNum: int32(queues[0].QueueNum), + } + resp.Current = append(resp.Current, current) + resp.History = append(resp.History, history) + + } + } + + sortQueueingNum(resp.Current) + sortQueueingNum(resp.History) + + return resp, nil +} + +func sortQueueingNum(q []*types.CenterQueue) { + sort.Slice(q, func(i, j int) bool { + return q[i].QueueingNum > q[j].QueueingNum + }) } diff --git a/api/internal/logic/ai/getcentertasklistlogic.go b/api/internal/logic/ai/getcentertasklistlogic.go index 96242e9b..0a800630 100644 --- a/api/internal/logic/ai/getcentertasklistlogic.go +++ b/api/internal/logic/ai/getcentertasklistlogic.go @@ -2,6 +2,8 @@ package ai import ( "context" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "time" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -15,6 +17,8 @@ type GetCenterTaskListLogic struct { svcCtx *svc.ServiceContext } +const layout = "2006-01-02 15:04:05" + func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterTaskListLogic { return &GetCenterTaskListLogic{ Logger: logx.WithContext(ctx), @@ -24,7 +28,36 @@ func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) { - // todo: add your logic here and delete this line + resp = &types.CenterTaskListResp{} - return + adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return nil, err + } + + for _, adapter := range adapterList { + taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) + if err != nil { + continue + } + for _, task := range taskList { + var elapsed time.Duration + start, _ := time.Parse(layout, task.CommitTime) + if task.Status != constants.Completed { + elapsed = start.Sub(time.Now()) + } else { + end, _ := time.Parse(layout, task.EndTime) + elapsed = start.Sub(end) + } + + t := &types.AiTask{ + Name: task.Name, + Status: task.Status, + TimeElapsed: int32(elapsed.Seconds()), + } + resp.List = append(resp.List, t) + } + } + + return resp, nil } diff --git a/api/internal/logic/cloud/commitgeneraltasklogic.go b/api/internal/logic/cloud/commitgeneraltasklogic.go index 524d29cd..cf8842b9 100644 --- a/api/internal/logic/cloud/commitgeneraltasklogic.go +++ b/api/internal/logic/cloud/commitgeneraltasklogic.go @@ -4,6 +4,9 @@ import ( "bytes" "context" "github.com/pkg/errors" + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" @@ -13,7 +16,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" kyaml "k8s.io/apimachinery/pkg/util/yaml" - "sigs.k8s.io/yaml" + "strconv" "strings" "time" @@ -38,62 +41,102 @@ func NewCommitGeneralTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) error { - var yamlStr []string - for _, s := range req.ReqBody { - j2, err := yaml.YAMLToJSON([]byte(s)) - if err != nil { - logx.Errorf("Failed to convert yaml to JSON, err: %v", err) - return err + tx := l.svcCtx.DbEngin.Begin() + // 执行回滚或者提交操作 + defer func() { + if p := recover(); p != nil { + tx.Rollback() + logx.Error(p) + } else if tx.Error != nil { + logx.Info("rollback, error", tx.Error) + tx.Rollback() + } else { + tx = tx.Commit() + logx.Info("commit success") } - yamlStr = append(yamlStr, string(j2)) - } - result := strings.Join(yamlStr, ",") - //TODO The namespace is fixed to ns-admin for the time being. Later, the namespace is obtained based on the user - taskModel := models.Task{ - Status: constants.Saved, - Name: req.Name, - CommitTime: time.Now(), - YamlString: "[" + result + "]", - } - // Save the task data to the database - tx := l.svcCtx.DbEngin.Create(&taskModel) - if tx.Error != nil { - return tx.Error - } - + }() + //TODO adapter + adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64) var clusters []*models.CloudModel - err := l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error + err := tx.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error if err != nil { logx.Errorf("CommitGeneralTask() => sql execution error: %v", err) return errors.Errorf("the cluster does not match the drive resources. Check the data") } taskCloud := cloud.TaskCloudModel{} - //TODO 执行策略返回集群跟 Replica - for _, c := range clusters { + opt := &option.CloudOption{} + utils.Convert(&req, &opt) + sc, _ := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, tx, l.svcCtx.PromClient) + + results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc) + if err != nil { + logx.Errorf("AssignAndSchedule() => execution error: %v", err) + return err + } + + rs := (results).([]*schedulers.CloudResult) + + var synergyStatus int64 + if len(rs) > 1 { + synergyStatus = 1 + } + var strategy int64 + sqlStr := `select t_dict_item.item_value + from t_dict + left join t_dict_item on t_dict.id = t_dict_item.dict_id + where item_text = ? + and t_dict.dict_code = 'schedule_Strategy'` + //查询调度策略 + err = tx.Raw(sqlStr, req.Strategy).Scan(&strategy).Error + taskModel := models.Task{ + Id: utils.GenSnowflakeID(), + Status: constants.Pending, + Name: req.Name, + CommitTime: time.Now(), + YamlString: strings.Join(req.ReqBody, "\n---\n"), + TaskTypeDict: 0, + SynergyStatus: synergyStatus, + Strategy: strategy, + } + var taskClouds []cloud.TaskCloudModel + for _, r := range rs { for _, s := range req.ReqBody { - sStruct := UnMarshalK8sStruct(s) + sStruct := UnMarshalK8sStruct(s, int64(r.Replica)) unString, _ := sStruct.MarshalJSON() taskCloud.Id = utils.GenSnowflakeIDUint() taskCloud.TaskId = uint(taskModel.Id) - taskCloud.AdapterId = c.AdapterId - taskCloud.ClusterId = c.Id - taskCloud.ClusterName = c.Name - taskCloud.Status = "Pending" + clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64) + taskCloud.AdapterId = uint(adapterId) + taskCloud.ClusterId = uint(clusterId) + taskCloud.ClusterName = r.ClusterName + taskCloud.Status = constants.Pending taskCloud.YamlString = string(unString) taskCloud.Kind = sStruct.GetKind() taskCloud.Namespace = sStruct.GetNamespace() - tx = l.svcCtx.DbEngin.Create(&taskCloud) - if tx.Error != nil { - logx.Errorf("CommitGeneralTask() create taskCloud => sql execution error: %v", err) - return tx.Error - } + taskClouds = append(taskClouds, taskCloud) } } - + adapterName := "" + tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName) + noticeInfo := clientCore.NoticeInfo{ + AdapterId: int64(adapterId), + AdapterName: adapterName, + NoticeType: "create", + TaskName: req.Name, + Incident: "任务创建中", + CreatedTime: time.Now(), + } + db := tx.Table("task").Create(&taskModel) + db = tx.Table("task_cloud").Create(&taskClouds) + db = tx.Table("t_notice").Create(¬iceInfo) + if db.Error != nil { + logx.Errorf("Task creation failure, err: %v", db.Error) + return errors.New("task creation failure") + } return nil } -func UnMarshalK8sStruct(yamlString string) *unstructured.Unstructured { +func UnMarshalK8sStruct(yamlString string, replica int64) *unstructured.Unstructured { unstructuredObj := &unstructured.Unstructured{} d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) var err error @@ -116,11 +159,7 @@ func UnMarshalK8sStruct(yamlString string) *unstructured.Unstructured { } //设置副本数 if unstructuredObj.GetKind() == "Deployment" || unstructuredObj.GetKind() == "StatefulSet" { - unstructured.SetNestedField( - unstructuredObj.Object, - int64(6), - "spec", "replicas", - ) + unstructured.SetNestedField(unstructuredObj.Object, replica, "spec", "replicas") } } return unstructuredObj diff --git a/api/internal/logic/core/pulltaskinfologic.go b/api/internal/logic/core/pulltaskinfologic.go index ef9b86d9..9581659e 100644 --- a/api/internal/logic/core/pulltaskinfologic.go +++ b/api/internal/logic/core/pulltaskinfologic.go @@ -5,6 +5,7 @@ import ( "github.com/jinzhu/copier" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gorm.io/gorm" @@ -54,7 +55,7 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie } } case 0: - var cloudModelList []models.Cloud + var cloudModelList []cloud.TaskCloudModel err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &cloudModelList) if err != nil { return nil, err diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index 183699f2..e46ffe7d 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -29,7 +29,9 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type resp = &types.ScheduleResp{} opt := &option.AiOption{ AdapterId: req.AiOption.AdapterId, + TaskName: req.AiOption.TaskName, ResourceType: req.AiOption.ResourceType, + Replica: 1, Tops: req.AiOption.Tops, TaskType: req.AiOption.TaskType, DatasetsName: req.AiOption.Datasets, diff --git a/api/internal/mqs/ScheduleCloud.go b/api/internal/mqs/ScheduleCloud.go index a379ecbe..a74056ff 100644 --- a/api/internal/mqs/ScheduleCloud.go +++ b/api/internal/mqs/ScheduleCloud.go @@ -16,8 +16,6 @@ package mqs import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" ) @@ -38,28 +36,28 @@ func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq { func (l *CloudMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 - cloudScheduler := schedulers.NewCloudScheduler() - schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) - if err != nil { - return err - } - - //检测是否指定了集群列表 - schdl.SpecifyClusters() - - //检测是否指定了nsID - schdl.SpecifyNsID() - - //通过标签匹配筛选出集群范围 - schdl.MatchLabels() - - //todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度 - schdl.TempAssign() - - // 存储数据 - err = schdl.SaveToDb() - if err != nil { - return err - } + //cloudScheduler := schedulers.NewCloudScheduler() + //schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) + //if err != nil { + // return err + //} + // + ////检测是否指定了集群列表 + //schdl.SpecifyClusters() + // + ////检测是否指定了nsID + //schdl.SpecifyNsID() + // + ////通过标签匹配筛选出集群范围 + //schdl.MatchLabels() + // + ////todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度 + //schdl.TempAssign() + // + //// 存储数据 + //err = schdl.SaveToDb() + //if err != nil { + // return err + //} return nil } diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index 8efb98b8..c458c622 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -50,9 +50,20 @@ func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) { return ids, nil } -func (s *AiStorage) GetAiTasks() ([]*types.AiTaskDb, error) { +func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo, error) { + var list []*types.AdapterInfo + db := s.DbEngin.Model(&types.AdapterInfo{}).Table("t_adapter") + db = db.Where("type = ?", adapterType) + err := db.Order("create_time desc").Find(&list).Error + if err != nil { + return nil, err + } + return list, nil +} + +func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*types.AiTaskDb, error) { var resp []*types.AiTaskDb - tx := s.DbEngin.Raw("select * from task_ai").Scan(&resp) + tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp) if tx.Error != nil { logx.Errorf(tx.Error.Error()) return nil, tx.Error @@ -93,6 +104,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId Name: option.TaskName, Replica: option.Replica, JobId: jobId, + TaskType: option.TaskType, Strategy: option.StrategyName, Status: status, Msg: msg, @@ -106,6 +118,37 @@ func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId return nil } +func (s *AiStorage) SaveClusterTaskQueue(adapterId string, clusterId string, queueNum int64) error { + aId, err := strconv.ParseInt(adapterId, 10, 64) + if err != nil { + return err + } + cId, err := strconv.ParseInt(clusterId, 10, 64) + if err != nil { + return err + } + taskQueue := models.TClusterTaskQueue{ + AdapterId: aId, + ClusterId: cId, + QueueNum: queueNum, + } + tx := s.DbEngin.Create(&taskQueue) + if tx.Error != nil { + return tx.Error + } + return nil +} + +func (s *AiStorage) GetClusterTaskQueues(adapterId string, clusterId string) ([]*models.TClusterTaskQueue, error) { + var taskQueues []*models.TClusterTaskQueue + tx := s.DbEngin.Raw("select * from t_cluster_task_queue where `adapter_id` = ? and `cluster_id` = ?", adapterId, clusterId).Scan(&taskQueues) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return nil, tx.Error + } + return taskQueues, nil +} + func (s *AiStorage) GetAiTaskIdByClusterIdAndTaskId(clusterId string, taskId string) (string, error) { var aiTask models.TaskAi tx := s.DbEngin.Raw("select * from task_ai where `cluster_id` = ? and `task_id` = ?", clusterId, taskId).Scan(&aiTask) @@ -116,6 +159,16 @@ func (s *AiStorage) GetAiTaskIdByClusterIdAndTaskId(clusterId string, taskId str return aiTask.JobId, nil } +func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterResource, error) { + var clusterResource models.TClusterResource + tx := s.DbEngin.Raw("select * from t_cluster_resource where `cluster_id` = ?", clusterId).Scan(&clusterResource) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return nil, tx.Error + } + return &clusterResource, nil +} + func (s *AiStorage) UpdateTask() error { return nil } diff --git a/api/internal/scheduler/schedulers/cloudScheduler.go b/api/internal/scheduler/schedulers/cloudScheduler.go index e4035574..4f00aaba 100644 --- a/api/internal/scheduler/schedulers/cloudScheduler.go +++ b/api/internal/scheduler/schedulers/cloudScheduler.go @@ -15,106 +15,175 @@ package schedulers import ( - "bytes" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" + "context" + "errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" - "io" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" - kyaml "k8s.io/apimachinery/pkg/util/yaml" + "gorm.io/gorm" + "math" + "time" ) type CloudScheduler struct { - storage database.Storage + yamlString string + task *response.TaskInfo + *scheduler.Scheduler + option *option.CloudOption + ctx context.Context + dbEngin *gorm.DB + promClient tracker.Prometheus + svcCtx *svc.ServiceContext } -func NewCloudScheduler() *CloudScheduler { - return &CloudScheduler{} +type CloudResult struct { + TaskId string + ClusterId string + ClusterName string + Strategy string + Replica int32 + Msg string } -func (cs *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) { - //获取所有计算中心 - //调度算法 - strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{}) - return strategy, nil +func NewCloudScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.CloudOption, dbEngin *gorm.DB, promClient tracker.Prometheus) (*CloudScheduler, error) { + return &CloudScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option, dbEngin: dbEngin, promClient: promClient}, nil } -func (cs *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { - cloud := cs.UnMarshalK8sStruct(resource, task.TaskId, task.NsID) - cloud.Id = utils.GenSnowflakeID() - cloud.NsID = task.NsID - - cloud.ParticipantId = participantId - return cloud, nil -} - -func (cs *CloudScheduler) UnMarshalK8sStruct(yamlString string, taskId int64, nsID string) models.Cloud { - var cloud models.Cloud - d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) - var err error - for { - var rawObj runtime.RawExtension - err = d.Decode(&rawObj) - if err == io.EOF { - break - } - if err != nil { - } - obj := &unstructured.Unstructured{} - syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj) - if err != nil { - } - - unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) - if err != nil { - } - - unstructureObj := &unstructured.Unstructured{Object: unstructuredMap} - if len(nsID) != 0 { - unstructureObj.SetNamespace(nsID) - } - cloud = models.Cloud{ - TaskId: taskId, - ApiVersion: unstructureObj.GetAPIVersion(), - Name: unstructureObj.GetName(), - Kind: unstructureObj.GetKind(), - Namespace: unstructureObj.GetNamespace(), - Status: "Saved", - } - // 命名空间为空 设置默认值 - if len(unstructureObj.GetNamespace()) == 0 { - cloud.Namespace = "default" - } - //unstructureObj转成string - unString, _ := unstructureObj.MarshalJSON() - cloud.YamlString = string(unString) +func (as *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { + c := cloud.TaskCloudModel{ + AdapterId: uint(participantId), + TaskId: uint(task.TaskId), + Status: "Pending", + YamlString: as.yamlString, } - return cloud + utils.Convert(task.Metadata, &c) + return c, nil } -func (cs *CloudScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider, error) { - proParams, err := cs.storage.GetProviderParams() +func (as *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) { + if len(as.option.ClusterIds) == 1 { + return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil + } + + resources, err := as.findClustersWithResources() + if err != nil { - return nil, nil, nil + return nil, err } - var providerList []*providerPricing.Provider - for _, p := range proParams { - provider := providerPricing.NewProvider(p.Participant_id, p.Cpu_avail, p.Mem_avail, p.Disk_avail, 0.0, 0.0, 0.0) - providerList = append(providerList, provider) + if len(resources) == 0 { + return nil, errors.New("no cluster has resources") } - //replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) - //t := algorithm.NewTask(0, int(replicas), 2, 75120000, 301214500, 1200, 2, 6, 2000) + if len(resources) == 1 { + var cluster strategy.AssignedCluster + cluster.ClusterId = resources[0].ClusterId + cluster.Replicas = 1 + return &strategy.SingleAssignment{Cluster: &cluster}, nil + } - return nil, providerList, nil + params := ¶m.Params{Resources: resources} + + switch as.option.Strategy { + case strategy.REPLICATION: + var clusterIds []string + for _, resource := range resources { + clusterIds = append(clusterIds, resource.ClusterId) + } + strategy := strategy.NewReplicationStrategy(clusterIds, as.option.Replica) + return strategy, nil + case strategy.RESOURCES_PRICING: + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: as.option.Replica}) + return strategy, nil + case strategy.DYNAMIC_RESOURCES: + strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) + return strategy, nil + case strategy.STATIC_WEIGHT: + //todo resources should match cluster StaticWeightMap + strategy := strategy.NewStaticWeightStrategy(as.option.StaticWeightMap, as.option.Replica) + return strategy, nil + } + + return nil, errors.New("no strategy has been chosen") } -func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { - return nil, nil +func (as *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { + if clusters == nil { + return nil, errors.New("clusters is nil") + } + + for i := len(clusters) - 1; i >= 0; i-- { + if clusters[i].Replicas == 0 { + clusters = append(clusters[:i], clusters[i+1:]...) + } + } + + if len(clusters) == 0 { + return nil, errors.New("clusters is nil") + } + + var results []*CloudResult + + for _, cluster := range clusters { + cName := "" + as.dbEngin.Table("t_cluster").Select("name").Where("id=?", cluster.ClusterId).Find(&cName) + cr := CloudResult{ + ClusterId: cluster.ClusterId, + ClusterName: cName, + Replica: cluster.Replicas, + } + cr.ClusterId = cluster.ClusterId + cr.Replica = cluster.Replicas + + cr.ClusterName = cName + results = append(results, &cr) + } + + return results, nil +} + +func (as *CloudScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) { + resp := []*collector.ResourceStats{} + //查询集群资源信息 + var rMetrics []tracker.Metric + metrics := []string{"cluster_cpu_utilisation", "cluster_cpu_avail", "cluster_cpu_total", "cluster_memory_total", "cluster_memory_avail", "cluster_memory_utilisation", "cluster_disk_utilisation", "cluster_disk_avail", "cluster_disk_total", "cluster_pod_utilisation"} + var clusterNames []string + as.dbEngin.Table("t_cluster").Select("name").Where("id in ?", as.option.ClusterIds).Find(&clusterNames) + for _, c := range clusterNames { + rMetrics = as.promClient.GetNamedMetrics(metrics, time.Now(), tracker.ClusterOption{ClusterName: c}) + r := collector.ResourceStats{} + var cid string + as.dbEngin.Table("t_cluster").Select("id").Where("name = ?", c).Find(&cid) + r.ClusterId = cid + r.Name = c + for _, metric := range rMetrics { + if metric.MetricName == "cluster_cpu_total" { + r.CpuCoreTotal = int64(metric.MetricData.MetricValues[0].Sample.Value()) + } + if metric.MetricName == "cluster_cpu_avail" { + cpuAvail := metric.MetricData.MetricValues[0].Sample.Value() + r.CpuCoreAvail = int64(math.Round(cpuAvail)) + } + if metric.MetricName == "cluster_memory_total" { + r.MemTotal = metric.MetricData.MetricValues[0].Sample.Value() + } + if metric.MetricName == "cluster_memory_avail" { + r.MemAvail = metric.MetricData.MetricValues[0].Sample.Value() + } + if metric.MetricName == "cluster_disk_total" { + r.DiskTotal = metric.MetricData.MetricValues[0].Sample.Value() + } + if metric.MetricName == "cluster_disk_avail" { + r.DiskAvail = metric.MetricData.MetricValues[0].Sample.Value() + } + } + resp = append(resp, &r) + } + return resp, nil } diff --git a/api/internal/scheduler/schedulers/option/cloudOption.go b/api/internal/scheduler/schedulers/option/cloudOption.go index cf2df437..2654c755 100644 --- a/api/internal/scheduler/schedulers/option/cloudOption.go +++ b/api/internal/scheduler/schedulers/option/cloudOption.go @@ -7,6 +7,7 @@ type CloudOption struct { Strategy string `json:"strategy"` StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` ReqBody []string `json:"reqBody"` + Replica int32 `json:"replicas,string"` } func (c CloudOption) GetOptionType() string { diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 7e3ae940..321e6755 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -105,6 +105,7 @@ type GeneralTaskReq struct { Strategy string `json:"strategy"` StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` ReqBody []string `json:"reqBody"` + Replicas int64 `json:"replicas,string"` } type DeleteTaskReq struct { diff --git a/pkg/constants/task.go b/pkg/constants/task.go index daf8879f..0ec079f3 100644 --- a/pkg/constants/task.go +++ b/pkg/constants/task.go @@ -26,4 +26,5 @@ const ( WaitRestart = "WaitRestart" WaitPause = "WaitPause" WaitStart = "WaitStart" + Pending = "Pending" ) diff --git a/pkg/models/cloud/task_cloud.go b/pkg/models/cloud/task_cloud.go index 13e8c045..3dec32bc 100644 --- a/pkg/models/cloud/task_cloud.go +++ b/pkg/models/cloud/task_cloud.go @@ -6,18 +6,17 @@ import ( ) type TaskCloudModel struct { - Id uint `json:"id" gorm:"primarykey;not null;comment:id"` - TaskId uint `json:"taskId" gorm:"not null;comment:task表id"` - AdapterId uint `json:"adapterId" gorm:"not null;comment:适配器id"` - ClusterId uint `json:"clusterId" gorm:"not null;comment:集群id"` - ClusterName string `json:"clusterName" gorm:"not null;comment:集群名称"` - Kind string `json:"kind" gorm:"comment:种类"` - Status string `json:"status" gorm:"comment:状态"` - StartTime time.Time `json:"startTime" gorm:"comment:开始时间"` - YamlString string `json:"yamlString" gorm:"not null;comment:入参"` - Result string `json:"result" gorm:"comment:运行结果"` - Namespace string `json:"namespace" gorm:"comment:命名空间"` - Replica int `json:"replica" gorm:"not null;comment:副本数"` + Id uint `json:"id" gorm:"primarykey;not null;comment:id"` + TaskId uint `json:"taskId" gorm:"not null;comment:task表id"` + AdapterId uint `json:"adapterId" gorm:"not null;comment:适配器id"` + ClusterId uint `json:"clusterId" gorm:"not null;comment:集群id"` + ClusterName string `json:"clusterName" gorm:"not null;comment:集群名称"` + Kind string `json:"kind" gorm:"comment:种类"` + Status string `json:"status" gorm:"comment:状态"` + StartTime *time.Time `json:"startTime" gorm:"comment:开始时间"` + YamlString string `json:"yamlString" gorm:"not null;comment:入参"` + Result string `json:"result" gorm:"comment:运行结果"` + Namespace string `json:"namespace" gorm:"comment:命名空间"` base.BaseModel } diff --git a/pkg/models/taskaimodel_gen.go b/pkg/models/taskaimodel_gen.go index ab0c5502..34ecd67b 100644 --- a/pkg/models/taskaimodel_gen.go +++ b/pkg/models/taskaimodel_gen.go @@ -49,6 +49,7 @@ type ( CommitTime time.Time `db:"commit_time"` // 提交时间 StartTime string `db:"start_time"` // 开始时间 EndTime string `db:"end_time"` // 结束时间 + TaskType string `db:"task_type"` } ) @@ -87,14 +88,14 @@ func (m *defaultTaskAiModel) FindOne(ctx context.Context, id int64) (*TaskAi, er } func (m *defaultTaskAiModel) Insert(ctx context.Context, data *TaskAi) (sql.Result, error) { - query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime) + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.TaskType) return ret, err } func (m *defaultTaskAiModel) Update(ctx context.Context, data *TaskAi) error { query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskAiRowsWithPlaceHolder) - _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.Id) + _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.TaskType, data.Id) return err } diff --git a/pkg/models/tclusterresourcemodel_gen.go b/pkg/models/tclusterresourcemodel_gen.go index bf4068e6..1bb40107 100644 --- a/pkg/models/tclusterresourcemodel_gen.go +++ b/pkg/models/tclusterresourcemodel_gen.go @@ -35,17 +35,19 @@ type ( } TClusterResource struct { - ClusterId int64 `db:"cluster_id"` - ClusterName string `db:"cluster_name"` - ClusterType int64 `db:"cluster_type"` // 类型0->容器,1->智算,2->超算,3-虚拟机 - CpuAvail float64 `db:"cpu_avail"` - CpuTotal float64 `db:"cpu_total"` - MemAvail float64 `db:"mem_avail"` - MemTotal float64 `db:"mem_total"` - DiskAvail float64 `db:"disk_avail"` - DiskTotal float64 `db:"disk_total"` - GpuAvail float64 `db:"gpu_avail"` - GpuTotal float64 `db:"gpu_total"` + ClusterId int64 `db:"cluster_id"` + ClusterName string `db:"cluster_name"` + ClusterType int64 `db:"cluster_type"` // 类型0->容器,1->智算,2->超算,3-虚拟机 + CpuAvail float64 `db:"cpu_avail"` + CpuTotal float64 `db:"cpu_total"` + MemAvail float64 `db:"mem_avail"` + MemTotal float64 `db:"mem_total"` + DiskAvail float64 `db:"disk_avail"` + DiskTotal float64 `db:"disk_total"` + GpuAvail float64 `db:"gpu_avail"` + GpuTotal float64 `db:"gpu_total"` + CardTotal int64 `db:"card_total"` // 算力卡数量 + CardTopsTotal float64 `db:"card_tops_total"` // 算力总量tops } ) @@ -56,6 +58,13 @@ func newTClusterResourceModel(conn sqlx.SqlConn) *defaultTClusterResourceModel { } } +func (m *defaultTClusterResourceModel) withSession(session sqlx.Session) *defaultTClusterResourceModel { + return &defaultTClusterResourceModel{ + conn: sqlx.NewSqlConnFromSession(session), + table: "`t_cluster_resource`", + } +} + func (m *defaultTClusterResourceModel) Delete(ctx context.Context, clusterId int64) error { query := fmt.Sprintf("delete from %s where `cluster_id` = ?", m.table) _, err := m.conn.ExecCtx(ctx, query, clusterId) @@ -77,14 +86,14 @@ func (m *defaultTClusterResourceModel) FindOne(ctx context.Context, clusterId in } func (m *defaultTClusterResourceModel) Insert(ctx context.Context, data *TClusterResource) (sql.Result, error) { - query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tClusterResourceRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.ClusterId, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal) + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tClusterResourceRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.ClusterId, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal) return ret, err } func (m *defaultTClusterResourceModel) Update(ctx context.Context, data *TClusterResource) error { query := fmt.Sprintf("update %s set %s where `cluster_id` = ?", m.table, tClusterResourceRowsWithPlaceHolder) - _, err := m.conn.ExecCtx(ctx, query, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.ClusterId) + _, err := m.conn.ExecCtx(ctx, query, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal, data.ClusterId) return err } diff --git a/pkg/models/tclustertaskqueuemodel.go b/pkg/models/tclustertaskqueuemodel.go new file mode 100644 index 00000000..35e52522 --- /dev/null +++ b/pkg/models/tclustertaskqueuemodel.go @@ -0,0 +1,24 @@ +package models + +import "github.com/zeromicro/go-zero/core/stores/sqlx" + +var _ TClusterTaskQueueModel = (*customTClusterTaskQueueModel)(nil) + +type ( + // TClusterTaskQueueModel is an interface to be customized, add more methods here, + // and implement the added methods in customTClusterTaskQueueModel. + TClusterTaskQueueModel interface { + tClusterTaskQueueModel + } + + customTClusterTaskQueueModel struct { + *defaultTClusterTaskQueueModel + } +) + +// NewTClusterTaskQueueModel returns a model for the database table. +func NewTClusterTaskQueueModel(conn sqlx.SqlConn) TClusterTaskQueueModel { + return &customTClusterTaskQueueModel{ + defaultTClusterTaskQueueModel: newTClusterTaskQueueModel(conn), + } +} diff --git a/pkg/models/tclustertaskqueuemodel_gen.go b/pkg/models/tclustertaskqueuemodel_gen.go new file mode 100644 index 00000000..f40a857c --- /dev/null +++ b/pkg/models/tclustertaskqueuemodel_gen.go @@ -0,0 +1,95 @@ +// Code generated by goctl. DO NOT EDIT. + +package models + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + "github.com/zeromicro/go-zero/core/stores/builder" + "github.com/zeromicro/go-zero/core/stores/sqlc" + "github.com/zeromicro/go-zero/core/stores/sqlx" + "github.com/zeromicro/go-zero/core/stringx" +) + +var ( + tClusterTaskQueueFieldNames = builder.RawFieldNames(&TClusterTaskQueue{}) + tClusterTaskQueueRows = strings.Join(tClusterTaskQueueFieldNames, ",") + tClusterTaskQueueRowsExpectAutoSet = strings.Join(stringx.Remove(tClusterTaskQueueFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",") + tClusterTaskQueueRowsWithPlaceHolder = strings.Join(stringx.Remove(tClusterTaskQueueFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?" +) + +type ( + tClusterTaskQueueModel interface { + Insert(ctx context.Context, data *TClusterTaskQueue) (sql.Result, error) + FindOne(ctx context.Context, id int64) (*TClusterTaskQueue, error) + Update(ctx context.Context, data *TClusterTaskQueue) error + Delete(ctx context.Context, id int64) error + } + + defaultTClusterTaskQueueModel struct { + conn sqlx.SqlConn + table string + } + + TClusterTaskQueue struct { + Id int64 `db:"id"` // id + AdapterId int64 `db:"adapter_id"` // 适配器id + ClusterId int64 `db:"cluster_id"` // 集群id + QueueNum int64 `db:"queue_num"` // 任务排队数量 + Date time.Time `db:"date"` + } +) + +func newTClusterTaskQueueModel(conn sqlx.SqlConn) *defaultTClusterTaskQueueModel { + return &defaultTClusterTaskQueueModel{ + conn: conn, + table: "`t_cluster_task_queue`", + } +} + +func (m *defaultTClusterTaskQueueModel) withSession(session sqlx.Session) *defaultTClusterTaskQueueModel { + return &defaultTClusterTaskQueueModel{ + conn: sqlx.NewSqlConnFromSession(session), + table: "`t_cluster_task_queue`", + } +} + +func (m *defaultTClusterTaskQueueModel) Delete(ctx context.Context, id int64) error { + query := fmt.Sprintf("delete from %s where `id` = ?", m.table) + _, err := m.conn.ExecCtx(ctx, query, id) + return err +} + +func (m *defaultTClusterTaskQueueModel) FindOne(ctx context.Context, id int64) (*TClusterTaskQueue, error) { + query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", tClusterTaskQueueRows, m.table) + var resp TClusterTaskQueue + err := m.conn.QueryRowCtx(ctx, &resp, query, id) + switch err { + case nil: + return &resp, nil + case sqlc.ErrNotFound: + return nil, ErrNotFound + default: + return nil, err + } +} + +func (m *defaultTClusterTaskQueueModel) Insert(ctx context.Context, data *TClusterTaskQueue) (sql.Result, error) { + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, tClusterTaskQueueRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.AdapterId, data.ClusterId, data.QueueNum, data.Date) + return ret, err +} + +func (m *defaultTClusterTaskQueueModel) Update(ctx context.Context, data *TClusterTaskQueue) error { + query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, tClusterTaskQueueRowsWithPlaceHolder) + _, err := m.conn.ExecCtx(ctx, query, data.AdapterId, data.ClusterId, data.QueueNum, data.Date, data.Id) + return err +} + +func (m *defaultTClusterTaskQueueModel) tableName() string { + return m.table +}